search_tasks.py 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041
  1. import random
  2. from worker.celery.app import app
  3. from worker.search_engine.drission_google_search import search_keyword_drission
  4. from mylib.logu import logger
  5. import sys
  6. import asyncio
  7. import httpx
  8. from utils.proxy_pool import get_random_proxy
  9. from config.settings import PROXIES
  10. @app.task(name='search_worker.drission_search')
  11. def drission_search_task(keyword: str, max_result_items: int=200, skip_existing: bool=True, browser_config: dict={}, proxy_pool:list[str]=None):
  12. """异步关键词搜索任务"""
  13. if sys.platform == 'win32':
  14. asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
  15. PROXIES = proxy_pool if proxy_pool else None
  16. async def _execute_search():
  17. try:
  18. if proxy_pool:
  19. browser_config.update({'proxy': get_random_proxy()})
  20. logger.info(f"browser_config: {browser_config}")
  21. logger.info(f"开始处理关键词搜索任务: {keyword}")
  22. result = search_keyword_drission(keyword, max_result_items=max_result_items, skip_existing=skip_existing, browser_config=browser_config)
  23. return {"keyword": keyword, "result": result}
  24. except Exception as e:
  25. logger.error(f"关键词搜索任务失败: {str(e)}")
  26. raise
  27. loop = asyncio.new_event_loop()
  28. asyncio.set_event_loop(loop)
  29. try:
  30. return loop.run_until_complete(_execute_search())
  31. finally:
  32. loop.close()
  33. asyncio.set_event_loop_policy(None)
  34. def main():
  35. proxy = get_random_proxy()
  36. if __name__ == "__main__":
  37. main()