# async_tasks.py (3.3 KB)
import asyncio
import sys
from typing import Optional

import httpx
from camoufox.async_api import AsyncCamoufox

from worker.celery.app import app
from worker.search_engine.google_search import search_keyword
from worker.search_engine.camoufox_broswer import BrowserConfig
from mylib.logu import logger
from config.settings import PROXY_POOL_BASE_URL
  10. @app.task
  11. def async_browser_task(url: str):
  12. logger.info(f"url {url}")
  13. """异步浏览器测试任务"""
  14. # Windows平台需要特殊处理事件循环
  15. if sys.platform == 'win32':
  16. asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
  17. async def _execute_async():
  18. try:
  19. async with AsyncCamoufox() as browser:
  20. page = await browser.new_page()
  21. await page.goto(url)
  22. title = await page.title()
  23. logger.info(f"成功访问 {url} 页面标题: {title}")
  24. return {"url": url, "title": title}
  25. except Exception as e:
  26. logger.error(f"浏览器任务失败: {str(e)}")
  27. raise
  28. # 显式创建并管理事件循环
  29. loop = asyncio.new_event_loop()
  30. asyncio.set_event_loop(loop)
  31. try:
  32. return loop.run_until_complete(_execute_async())
  33. finally:
  34. loop.close()
  35. asyncio.set_event_loop_policy(None) # Cleanup policy after use
  36. async def get_random_proxy():
  37. """测试所有运行中代理的延迟并启动可用代理"""
  38. url = f"{PROXY_POOL_BASE_URL}/get"
  39. async with httpx.AsyncClient() as client:
  40. response = await client.get(url, timeout=30)
  41. response.raise_for_status()
  42. results = response.json()
  43. logger.info(f"results {results}")
  44. port = results["port"]
  45. addr = f'http://127.0.0.1:{port}'
  46. logger.info(f"curl -i -x {addr} https://www.google.com")
  47. return addr
  48. @app.task(name='search_worker.search')
  49. def async_search_task(keyword: str, max_result_items: int=200, skip_existing: bool=True, browser_config: BrowserConfig=BrowserConfig()):
  50. """异步关键词搜索任务"""
  51. if sys.platform == 'win32':
  52. asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
  53. async def _execute_search():
  54. try:
  55. proxy = {'server': await get_random_proxy()}
  56. browser_config.proxy.update(proxy)
  57. logger.info(f"{browser_config.proxy}")
  58. logger.info(f"开始处理关键词搜索任务: {keyword}")
  59. result = await search_keyword(keyword, max_result_items=max_result_items, skip_existing=skip_existing, config=browser_config)
  60. return {"keyword": keyword, "result": result}
  61. except Exception as e:
  62. logger.error(f"关键词搜索任务失败: {str(e)}")
  63. raise
  64. loop = asyncio.new_event_loop()
  65. asyncio.set_event_loop(loop)
  66. try:
  67. return loop.run_until_complete(_execute_search())
  68. finally:
  69. loop.close()
  70. asyncio.set_event_loop_policy(None)
  71. @app.task(name='search_worker.add') # 添加明确的task name
  72. def add(x, y):
  73. return x + y
  74. def main():
  75. # logger.info(f"url")
  76. # res = async_browser_task.delay("https://www.baidu.com")
  77. # res = add.delay(1,2)
  78. res = async_search_task.delay("Acalypha psilostachya essential oil",200,False)
  79. logger.info(f"{res}")
  80. if __name__ == "__main__":
  81. main()