import random
from typing import List
from worker.celery.app import app
from worker.search_engine.drission_google_search import search_keyword_drission
from mylib.logu import logger
import sys
import asyncio
import httpx
from utils.proxy_pool import get_random_proxy
from config.settings import PROXIES
from worker.search_engine.search_result_db import SearchResultManager, KeywordTask, SearchResultItem, SearchPageResult
from sqlmodel import select, Session, exists, distinct
from celery import group
import redis

# redis_client = redis.Redis(host='127.0.0.1', port=6379, db=1)

@app.task(name='search_worker.search_all_uncompleted_keywords')
def search_all_uncompleted_keywords_task(max_result_items: int = 200, skip_existing: bool = True, browser_config: dict = None, proxy_pool: List[str] = None, dry_run: bool = False):
    """Async task: search every keyword that is not yet completed."""
    try:
        # redis_client.delete('search_queue')
        # logger.info("Deleted the old search_queue queue")
        # Fetch all uncompleted keywords
        manager = SearchResultManager()
        uncompleted_keywords = manager.get_uncompleted_keywords()

        if not uncompleted_keywords:
            logger.info("No uncompleted keywords to search.")
            return {"status": "success", "message": "No uncompleted keywords to search."}

        logger.info(f"Found {len(uncompleted_keywords)} uncompleted keywords, starting batch search...")

        # Build a task group: one drission_search_task per keyword
        task_group = group([
            drission_search_task.s(
                keyword, max_result_items, skip_existing, browser_config, proxy_pool, dry_run
            ).set(queue='search_queue')
            for keyword in uncompleted_keywords
        ])

        # Dispatch the whole group asynchronously
        result = task_group.apply_async()

        return {"status": "success", "task_id": result.id, "message": f"Started search tasks for {len(uncompleted_keywords)} keywords."}
    except Exception as e:
        logger.error(f"Batch search task failed: {str(e)}")
        raise


@app.task(name='search_worker.drission_search')
def drission_search_task(keyword: str, max_result_items: int = 200, skip_existing: bool = True, browser_config: dict = None, proxy_pool: List[str] = None, dry_run: bool = False):
    """Async single-keyword search task."""
    if sys.platform == 'win32':
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
    # Copy before updating: the original mutable default ({}) would be shared across calls.
    browser_config = dict(browser_config or {})
    # NOTE: this only rebinds a local name; the imported config.settings.PROXIES is unchanged.
    PROXIES = proxy_pool if proxy_pool else None

    async def _execute_search():
        try:
            if proxy_pool:
                browser_config.update({'proxy': get_random_proxy()})
            logger.info(f"browser_config: {browser_config}")
            logger.info(f"Starting keyword search task: {keyword}")
            if dry_run:
                await asyncio.sleep(3)
                result = []
            else:
                result = await search_keyword_drission(
                    keyword, max_result_items=max_result_items, skip_existing=skip_existing, browser_config=browser_config)
            return {"keyword": keyword, "result": result}
        except Exception as e:
            logger.error(f"Keyword search task failed: {str(e)}")
            raise

    # Run the coroutine on a dedicated event loop so the task works inside a Celery worker process.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(_execute_search())
    finally:
        loop.close()
        asyncio.set_event_loop_policy(None)
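

# A minimal usage sketch, not part of the original module: how these tasks might be
# dispatched from another process, assuming a Celery worker is consuming the
# 'search_queue' queue. The keyword string, the argument values, and the helper
# _example_dispatch itself are illustrative assumptions.
def _example_dispatch():
    # Fan out one search per uncompleted keyword (dry run, no browser launched).
    batch = search_all_uncompleted_keywords_task.delay(dry_run=True)
    logger.info(f"batch task id: {batch.id}")

    # Or enqueue a single-keyword search directly on the search queue.
    single = drission_search_task.apply_async(
        args=["example keyword"],
        kwargs={"max_result_items": 50, "dry_run": True},
        queue="search_queue",
    )
    logger.info(f"single search task id: {single.id}")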


def main():
    # Local smoke test: fetch a random proxy from the pool.
    proxy = get_random_proxy()


if __name__ == "__main__":
    main()