# search_tasks.py
  1. import random
  2. from typing import List
  3. from worker.celery.app import app
  4. from worker.search_engine.drission_google_search import search_keyword_drission
  5. from mylib.logu import logger
  6. import sys
  7. import asyncio
  8. import httpx
  9. from utils.proxy_pool import get_random_proxy
  10. from config.settings import PROXIES
  11. from worker.search_engine.search_result_db import SearchResultManager, KeywordTask, SearchResultItem, SearchPageResult
  12. from sqlmodel import select, Session, exists, distinct
  13. from celery import group
  14. import redis
  15. # redis_client = redis.Redis(host='127.0.0.1', port=6379, db=1)
  16. @app.task(name='search_worker.search_all_uncompleted_keywords')
  17. def search_all_uncompleted_keywords_task(max_result_items: int = 200, skip_existing: bool = True, browser_config: dict = {}, proxy_pool: List[str] = None, dry_run:bool=False):
  18. """异步任务:搜索所有未完成的关键词"""
  19. try:
  20. # redis_client.delete('search_queue')
  21. # logger.info(f"删除旧有search_queue队列")
  22. # 获取所有未完成的关键词
  23. manager = SearchResultManager()
  24. uncompleted_keywords = manager.get_uncompleted_keywords()
  25. if not uncompleted_keywords:
  26. logger.info("没有未完成的关键词需要搜索。")
  27. return {"status": "success", "message": "没有未完成的关键词需要搜索。"}
  28. logger.info(f"找到 {len(uncompleted_keywords)} 个未完成的关键词,开始批量搜索...")
  29. # 创建任务组,每个关键词对应一个 drission_search_task
  30. task_group = group([
  31. drission_search_task.s(
  32. keyword, max_result_items, skip_existing, browser_config, proxy_pool, dry_run
  33. ).set(queue='search_queue')
  34. for keyword in uncompleted_keywords
  35. ])
  36. # 执行任务组
  37. result = task_group.apply_async()
  38. return {"status": "success", "task_id": result.id, "message": f"已启动 {len(uncompleted_keywords)} 个关键词搜索任务。"}
  39. except Exception as e:
  40. logger.error(f"批量搜索任务失败: {str(e)}")
  41. raise
  42. @app.task(name='search_worker.drission_search')
  43. def drission_search_task(keyword: str, max_result_items: int=200, skip_existing: bool=True, browser_config: dict={}, proxy_pool:list[str]=None, dry_run:bool=False):
  44. """异步关键词搜索任务"""
  45. if sys.platform == 'win32':
  46. asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
  47. PROXIES = proxy_pool if proxy_pool else None
  48. async def _execute_search():
  49. try:
  50. if proxy_pool:
  51. browser_config.update({'proxy': get_random_proxy()})
  52. logger.info(f"browser_config: {browser_config}")
  53. logger.info(f"开始处理关键词搜索任务: {keyword}")
  54. if dry_run:
  55. await asyncio.sleep(3)
  56. result = []
  57. else:
  58. result = await search_keyword_drission(
  59. keyword, max_result_items=max_result_items, skip_existing=skip_existing, browser_config=browser_config)
  60. return {"keyword": keyword, "result": result}
  61. except Exception as e:
  62. logger.error(f"关键词搜索任务失败: {str(e)}")
  63. raise
  64. loop = asyncio.new_event_loop()
  65. asyncio.set_event_loop(loop)
  66. try:
  67. return loop.run_until_complete(_execute_search())
  68. finally:
  69. loop.close()
  70. asyncio.set_event_loop_policy(None)
  71. def main():
  72. proxy = get_random_proxy()
# Allow running this module directly for a quick manual check.
if __name__ == "__main__":
    main()