# async_tasks.py (2.5 KB)

  1. from worker.celery.app import app
  2. from camoufox.async_api import AsyncCamoufox
  3. from worker.search_engine.google_search import search_keyword
  4. from worker.search_engine.camoufox_broswer import BrowserConfig
  5. from mylib.logu import logger
  6. import asyncio
  7. import sys
  8. @app.task
  9. def async_browser_task(url: str):
  10. logger.info(f"url {url}")
  11. """异步浏览器测试任务"""
  12. # Windows平台需要特殊处理事件循环
  13. if sys.platform == 'win32':
  14. asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
  15. async def _execute_async():
  16. try:
  17. async with AsyncCamoufox() as browser:
  18. page = await browser.new_page()
  19. await page.goto(url)
  20. title = await page.title()
  21. logger.info(f"成功访问 {url} 页面标题: {title}")
  22. return {"url": url, "title": title}
  23. except Exception as e:
  24. logger.error(f"浏览器任务失败: {str(e)}")
  25. raise
  26. # 显式创建并管理事件循环
  27. loop = asyncio.new_event_loop()
  28. asyncio.set_event_loop(loop)
  29. try:
  30. return loop.run_until_complete(_execute_async())
  31. finally:
  32. loop.close()
  33. asyncio.set_event_loop_policy(None) # Cleanup policy after use
  34. @app.task(name='search_worker.search')
  35. def async_search_task(keyword: str, max_result_items: int=200, skip_existing: bool=True):
  36. """异步关键词搜索任务"""
  37. if sys.platform == 'win32':
  38. asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
  39. async def _execute_search():
  40. try:
  41. logger.info(f"开始处理关键词搜索任务: {keyword}")
  42. result = await search_keyword(keyword, max_result_items=max_result_items, skip_existing=skip_existing)
  43. return {"keyword": keyword, "result": result}
  44. except Exception as e:
  45. logger.error(f"关键词搜索任务失败: {str(e)}")
  46. raise
  47. loop = asyncio.new_event_loop()
  48. asyncio.set_event_loop(loop)
  49. try:
  50. return loop.run_until_complete(_execute_search())
  51. finally:
  52. loop.close()
  53. asyncio.set_event_loop_policy(None)
  54. @app.task(name='search_worker.add') # 添加明确的task name
  55. def add(x, y):
  56. return x + y
  57. def main():
  58. # logger.info(f"url")
  59. # res = async_browser_task.delay("https://www.baidu.com")
  60. # res = add.delay(1,2)
  61. res = async_search_task.delay("Acalypha psilostachya essential oil",200,False)
  62. logger.info(f"{res}")
  63. if __name__ == "__main__":
  64. main()