import random
from typing import List, Optional
from pydantic import BaseModel, Field
from worker.celery.app import app
from worker.search_engine.drission_google_search import search_keyword_drission
from mylib.logu import logger
import sys
import asyncio
import httpx
from utils.proxy_pool import get_random_proxy
from config.settings import PROXIES
from worker.search_engine.search_result_db import SearchResultManager, KeywordTask, SearchResultItem, SearchPageResult
from sqlmodel import select, Session, exists, distinct
from celery import group
import redis

# redis_client = redis.Redis(host='127.0.0.1', port=6379, db=1)


class SearchTaskConfig(BaseModel):
    """Options controlling a keyword search task."""

    # Forwarded to search_keyword_drission as ``max_result_items``.
    max_result_items: Optional[int] = 200
    # Forwarded to search_keyword_drission as ``skip_existing``.
    skip_existing: Optional[bool] = True
    # Forwarded to search_keyword_drission as ``browser_config``; a 'proxy'
    # key is added when proxy_pool_url is set. default_factory gives every
    # instance its own dict instead of a shared literal.
    browser_config: Optional[dict] = Field(default_factory=dict)
    # When set, a proxy is fetched with get_random_proxy(proxy_pool_url).
    proxy_pool_url: Optional[str] = None
    # When True no search runs: the task sleeps 3 seconds and returns [].
    dry_run: Optional[bool] = True


class SearchTaskInput(SearchTaskConfig):
    """Payload for drission_search_task: a keyword plus search options.

    Options may be given as a nested ``config`` or as the top-level fields
    inherited from SearchTaskConfig; a nested ``config`` takes precedence.
    """

    keyword: str
    config: Optional[SearchTaskConfig] = None


@app.task(
    name='search_worker.search_all_uncompleted_keywords',
    serializer='pickle',
    accept=['pickle', 'json']
)
def search_all_uncompleted_keywords_task(config: dict | SearchTaskConfig | None = None):
    """Dispatch one drission_search_task per uncompleted keyword as a Celery group.

    Args:
        config: Search options as a dict or a SearchTaskConfig instance
            (the pickle serializer can deliver either); ``None`` means defaults.

    Returns:
        A dict with "status" and "message"; when tasks were dispatched it also
        contains "task_id", the id of the group result.

    Raises:
        Any error raised while loading keywords or dispatching the group is
        logged and re-raised.
    """
    # Normalize the accepted input shapes; the original only handled dicts.
    if config is None:
        config = SearchTaskConfig()
    elif not isinstance(config, SearchTaskConfig):
        config = SearchTaskConfig(**config)
    try:
        manager = SearchResultManager()
        uncompleted_keywords = manager.get_uncompleted_keywords()
        if not uncompleted_keywords:
            logger.info("没有未完成的关键词需要搜索。")
            return {"status": "success", "message": "没有未完成的关键词需要搜索。"}

        logger.info(f"找到 {len(uncompleted_keywords)} 个未完成的关键词,开始批量搜索...")
        # Each subtask receives a plain dict (model_dump) so it serializes
        # with any Celery serializer.
        task_group = group([
            drission_search_task.s(
                SearchTaskInput(keyword=keyword, config=config).model_dump()
            ).set(queue='search_queue')
            for keyword in uncompleted_keywords
        ])
        # Run the task group.
        result = task_group.apply_async()
        return {"status": "success", "task_id": result.id, "message": f"已启动 {len(uncompleted_keywords)} 个关键词搜索任务。"}
    except Exception as e:
        logger.error(f"批量搜索任务失败: {str(e)}")
        raise


@app.task(name='search_worker.drission_search')
def drission_search_task(task_input: dict | SearchTaskInput):
    """Run a single keyword search inside a private asyncio event loop.

    Args:
        task_input: A SearchTaskInput, or its dict form (e.g. from
            ``model_dump()``), carrying ``keyword`` and the search options.

    Returns:
        ``{"keyword": <keyword>, "result": <result>}``, where result is what
        search_keyword_drission returned, or ``[]`` in dry-run mode.

    Raises:
        Any exception from the proxy lookup or the search is logged and
        re-raised.
    """
    if isinstance(task_input, SearchTaskInput):
        input_model = task_input
    else:
        input_model = SearchTaskInput(**task_input)

    if input_model.config is not None:
        config = input_model.config
    else:
        # No nested config: honour the option fields SearchTaskInput
        # inherits instead of silently replacing them with defaults.
        config = SearchTaskConfig(
            **input_model.model_dump(include=set(SearchTaskConfig.model_fields))
        )
    # Work on a private copy so setting the proxy never touches the caller's objects.
    config = config.model_copy(deep=True)

    async def _execute_search():
        try:
            logger.info(f"开始处理关键词搜索任务{'( dry_run 模式)' if config.dry_run else ''}: {input_model.keyword}")

            if config.proxy_pool_url:
                # get_random_proxy is a blocking call (main() calls it
                # directly), so run it off the event loop.
                proxy = await asyncio.to_thread(get_random_proxy, config.proxy_pool_url)
                # Rebind rather than mutate: browser_config may be None.
                config.browser_config = {**(config.browser_config or {}), 'proxy': proxy}
                logger.info(f"使用代理池: {config.proxy_pool_url} --> {proxy}")
            else:
                logger.info("使用代理: 跟随系统")

            logger.info(f"任务配置: {config.model_dump()}")
            if config.dry_run:
                await asyncio.sleep(3)
                result = []
            else:
                result = await search_keyword_drission(
                    input_model.keyword,
                    max_result_items=config.max_result_items,
                    skip_existing=config.skip_existing,
                    browser_config=config.browser_config)

            ret = {"keyword": input_model.keyword, "result": result}
            logger.info(f"关键词搜索任务完成: {ret}")
            return ret
        except Exception as e:
            logger.error(f"关键词搜索任务失败: {str(e)}")
            raise

    # Only override the global loop policy on Windows, and remember the
    # previous one so it can be restored instead of reset to the default.
    previous_policy = None
    if sys.platform == 'win32':
        previous_policy = asyncio.get_event_loop_policy()
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(_execute_search())
    finally:
        try:
            # Finalize async generators and join the default executor that
            # asyncio.to_thread used, before closing the loop.
            loop.run_until_complete(loop.shutdown_asyncgens())
            loop.run_until_complete(loop.shutdown_default_executor())
        finally:
            # Do not leave a closed loop installed as the thread's current loop.
            asyncio.set_event_loop(None)
            loop.close()
            if previous_policy is not None:
                asyncio.set_event_loop_policy(previous_policy)


def main():
    """Manual smoke check: fetch one proxy from the pool and log it."""
    proxy = get_random_proxy()
    logger.info(f"proxy: {proxy}")


if __name__ == "__main__":
    main()