# search_tasks.py — Celery tasks for batch Google keyword search via DrissionPage.
import asyncio
import random
import sys
from typing import List, Optional

import httpx
import redis
from celery import group
from pydantic import BaseModel, Field
from sqlmodel import Session, distinct, exists, select

from config.settings import PROXIES
from mylib.logu import logger
from utils.proxy_pool import get_random_proxy
from worker.celery.app import app
from worker.search_engine.drission_google_search import search_keyword_drission
from worker.search_engine.search_result_db import (
    KeywordTask,
    SearchPageResult,
    SearchResultItem,
    SearchResultManager,
)

# redis_client = redis.Redis(host='127.0.0.1', port=6379, db=1)
  17. class SearchTaskConfig(BaseModel):
  18. max_result_items: Optional[int] = 200
  19. skip_existing: Optional[bool] = True
  20. browser_config: Optional[dict] = {}
  21. proxy_pool_url: Optional[str] = None
  22. dry_run: Optional[bool] = True
  23. class SearchTaskInput(SearchTaskConfig):
  24. keyword: str
  25. config: Optional[SearchTaskConfig] = None
  26. @app.task(
  27. name='search_worker.search_all_uncompleted_keywords',
  28. serializer='pickle',
  29. accept=['pickle', 'json']
  30. )
  31. def search_all_uncompleted_keywords_task(config: dict|SearchTaskConfig):
  32. """异步任务:搜索所有未完成的关键词"""
  33. config = SearchTaskConfig(**config)
  34. try:
  35. manager = SearchResultManager()
  36. uncompleted_keywords = manager.get_uncompleted_keywords()
  37. if not uncompleted_keywords:
  38. logger.info("没有未完成的关键词需要搜索。")
  39. return {"status": "success", "message": "没有未完成的关键词需要搜索。"}
  40. logger.info(f"找到 {len(uncompleted_keywords)} 个未完成的关键词,开始批量搜索...")
  41. task_group = group([
  42. drission_search_task.s(
  43. SearchTaskInput(
  44. keyword=keyword,
  45. config=config
  46. ).model_dump()
  47. ).set(queue='search_queue')
  48. for keyword in uncompleted_keywords
  49. ])
  50. # 执行任务组
  51. result = task_group.apply_async()
  52. return {"status": "success", "task_id": result.id, "message": f"已启动 {len(uncompleted_keywords)} 个关键词搜索任务。"}
  53. except Exception as e:
  54. logger.error(f"批量搜索任务失败: {str(e)}")
  55. raise
  56. @app.task(name='search_worker.drission_search')
  57. def drission_search_task(task_input: SearchTaskInput):
  58. """异步关键词搜索任务
  59. Args:
  60. task_input: 包含keyword和config的字典参数
  61. """
  62. if sys.platform == 'win32':
  63. asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
  64. # 统一配置解析
  65. input_model = SearchTaskInput(**task_input)
  66. config = input_model.config or SearchTaskConfig()
  67. async def _execute_search():
  68. try:
  69. # 使用配置模型的参数
  70. logger.info(f"开始处理关键词搜索任务{"( dry_run 模式)" if config.dry_run else ""}: {input_model.keyword}")
  71. if config.proxy_pool_url:
  72. proxy = await asyncio.to_thread(get_random_proxy, config.proxy_pool_url)
  73. config.browser_config['proxy'] = proxy
  74. logger.info(f"使用代理池: {config.proxy_pool_url} --> {proxy}")
  75. else:
  76. logger.info(f"使用代理: 跟随系统")
  77. logger.info(f"任务配置: {config.model_dump()}")
  78. if config.dry_run:
  79. await asyncio.sleep(3)
  80. result = []
  81. else:
  82. result = await search_keyword_drission(
  83. input_model.keyword,
  84. max_result_items=config.max_result_items,
  85. skip_existing=config.skip_existing,
  86. browser_config=config.browser_config)
  87. ret = {"keyword": input_model.keyword, "result": result}
  88. logger.info(f"关键词搜索任务完成: {ret}")
  89. return ret
  90. except Exception as e:
  91. logger.error(f"关键词搜索任务失败: {str(e)}")
  92. raise
  93. loop = asyncio.new_event_loop()
  94. asyncio.set_event_loop(loop)
  95. try:
  96. return loop.run_until_complete(_execute_search())
  97. finally:
  98. loop.close()
  99. asyncio.set_event_loop_policy(None)
  100. def main():
  101. proxy = get_random_proxy()
  102. if __name__ == "__main__":
  103. main()