| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- import random
- from typing import List, Optional
- from pydantic import BaseModel
- from worker.celery.app import app
- from worker.search_engine.drission_google_search import search_keyword_drission
- from mylib.logu import logger
- import sys
- import asyncio
- import httpx
- from utils.proxy_pool import get_random_proxy
- from config.settings import PROXIES
- from worker.search_engine.search_result_db import SearchResultManager, KeywordTask, SearchResultItem, SearchPageResult
- from sqlmodel import select, Session, exists, distinct
- from celery import group
- import redis
- # redis_client = redis.Redis(host='127.0.0.1', port=6379, db=1)
class SearchTaskConfig(BaseModel):
    """Tunable settings for a keyword search task.

    Every field has a default, so ``SearchTaskConfig()`` is a complete config.
    """

    # Upper bound on result items; forwarded to the search call.
    max_result_items: int | None = 200
    # Forwarded as ``skip_existing`` to the search call.
    skip_existing: bool | None = True
    # Browser options for the search. A 'proxy' key may be added at runtime
    # when a proxy pool URL is configured. Pydantic copies non-hashable
    # defaults per instance, so this literal is not shared between models.
    browser_config: dict | None = {}
    # Optional proxy-pool endpoint; when set, a proxy is fetched per task.
    proxy_pool_url: str | None = None
    # When True (the default), the task skips the real search and returns
    # an empty result.
    dry_run: bool | None = True
class SearchTaskInput(SearchTaskConfig):
    """Payload for a single-keyword search task.

    Besides ``keyword`` and the nested ``config``, this model inherits every
    ``SearchTaskConfig`` field at the top level as well.
    """

    # The keyword to search for.
    keyword: str
    # Optional nested search settings for this keyword.
    config: SearchTaskConfig | None = None
# NOTE(review): this task accepts pickle-serialized messages; unpickling is
# only safe if every producer that can reach the broker is trusted.
@app.task(
    name='search_worker.search_all_uncompleted_keywords',
    serializer='pickle',
    accept=['pickle', 'json']
)
def search_all_uncompleted_keywords_task(config: dict | SearchTaskConfig | None = None):
    """Dispatch one ``drission_search_task`` per uncompleted keyword.

    Args:
        config: Search settings shared by every dispatched task, either as a
            ``SearchTaskConfig`` instance or as a dict of its fields. ``None``
            means all defaults.

    Returns:
        dict: ``{"status": "success", "message": ...}`` when there is nothing
        to search; otherwise it also carries ``task_id``, the id of the
        dispatched Celery group.

    Raises:
        Re-raises any exception from config validation, the keyword query or
        task dispatch, after logging it.
    """
    try:
        # The pickle serializer lets callers send a model instance, and
        # ``SearchTaskConfig(**instance)`` would raise TypeError, so accept
        # both forms (plus None for "use defaults").
        if config is None:
            config = SearchTaskConfig()
        elif not isinstance(config, SearchTaskConfig):
            config = SearchTaskConfig(**config)

        manager = SearchResultManager()
        uncompleted_keywords = manager.get_uncompleted_keywords()

        if not uncompleted_keywords:
            logger.info("没有未完成的关键词需要搜索。")
            return {"status": "success", "message": "没有未完成的关键词需要搜索。"}

        logger.info(f"找到 {len(uncompleted_keywords)} 个未完成的关键词,开始批量搜索...")

        # Each subtask receives a plain dict so it can travel with the
        # default (JSON) serializer of drission_search_task.
        task_group = group([
            drission_search_task.s(
                SearchTaskInput(
                    keyword=keyword,
                    config=config
                ).model_dump()
            ).set(queue='search_queue')
            for keyword in uncompleted_keywords
        ])

        # Dispatch the whole group.
        result = task_group.apply_async()

        return {"status": "success", "task_id": result.id, "message": f"已启动 {len(uncompleted_keywords)} 个关键词搜索任务。"}
    except Exception as e:
        logger.error(f"批量搜索任务失败: {str(e)}")
        raise
@app.task(name='search_worker.drission_search')
def drission_search_task(task_input: dict | SearchTaskInput):
    """Search a single keyword with ``search_keyword_drission``.

    Args:
        task_input: A ``SearchTaskInput`` instance or a dict of its fields
            (e.g. the output of ``SearchTaskInput(...).model_dump()``).
            Settings come from the nested ``config`` entry. When that entry
            is absent, the flat config fields that ``SearchTaskInput``
            inherits from ``SearchTaskConfig`` are used instead of being
            silently ignored.

    Returns:
        dict: ``{"keyword": <keyword>, "result": <search result>}``. In
        dry-run mode ``result`` is an empty list.

    Raises:
        Re-raises any exception raised during the search, after logging it.
    """
    # ``SearchTaskInput(**model_instance)`` raises TypeError, so accept both
    # forms. Deep-copy an instance so the proxy injection below does not
    # mutate the caller's object.
    if isinstance(task_input, SearchTaskInput):
        input_model = task_input.model_copy(deep=True)
    else:
        input_model = SearchTaskInput(**task_input)

    if input_model.config is not None:
        config = input_model.config
    else:
        # Fall back to the top-level (inherited) config fields rather than
        # plain defaults, so flat payloads are honoured.
        config = SearchTaskConfig(
            **input_model.model_dump(include=set(SearchTaskConfig.model_fields))
        )

    async def _execute_search():
        try:
            # Build the suffix outside the f-string: nesting the same quote
            # character inside an f-string is a SyntaxError before Python 3.12.
            mode_suffix = "( dry_run 模式)" if config.dry_run else ""
            logger.info(f"开始处理关键词搜索任务{mode_suffix}: {input_model.keyword}")
            if config.proxy_pool_url:
                # get_random_proxy is synchronous; run it off the event loop.
                proxy = await asyncio.to_thread(get_random_proxy, config.proxy_pool_url)
                # browser_config is Optional; avoid TypeError on None.
                if config.browser_config is None:
                    config.browser_config = {}
                config.browser_config['proxy'] = proxy
                logger.info(f"使用代理池: {config.proxy_pool_url} --> {proxy}")
            else:
                logger.info("使用代理: 跟随系统")
            logger.info(f"任务配置: {config.model_dump()}")
            if config.dry_run:
                await asyncio.sleep(3)
                result = []
            else:
                result = await search_keyword_drission(
                    input_model.keyword,
                    max_result_items=config.max_result_items,
                    skip_existing=config.skip_existing,
                    browser_config=config.browser_config)
            ret = {"keyword": input_model.keyword, "result": result}
            logger.info(f"关键词搜索任务完成: {ret}")
            return ret
        except Exception as e:
            logger.error(f"关键词搜索任务失败: {str(e)}")
            raise

    if sys.platform == 'win32':
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
    try:
        # asyncio.run creates a fresh loop from the active policy. On exit it
        # cancels leftover tasks and shuts down async generators and the
        # default executor (used by asyncio.to_thread) before closing.
        return asyncio.run(_execute_search())
    finally:
        # Always restore the default policy, even if the run failed.
        asyncio.set_event_loop_policy(None)
def main():
    """Manual smoke check: fetch one proxy from the proxy pool and log it.

    NOTE(review): ``get_random_proxy`` is called without arguments here,
    whereas the task code passes a pool URL. This presumably relies on a
    default inside ``utils.proxy_pool``; confirm.
    """
    proxy = get_random_proxy()
    # Previously the fetched proxy was discarded; log it so the check
    # produces visible output.
    logger.info(f"获取到的随机代理: {proxy}")


if __name__ == "__main__":
    main()
|