"""Celery tasks that run async browser / keyword-search coroutines."""

import asyncio
import sys

from camoufox.async_api import AsyncCamoufox

from worker.celery.app import app
from worker.search_engine.google_search import search_keyword
from worker.search_engine.camoufox_broswer import BrowserConfig
from mylib.logu import logger


def _run_in_fresh_loop(coro_factory):
    """Run ``coro_factory()`` to completion on a new, private event loop.

    Celery task bodies are synchronous, so each task drives its coroutine on
    an event loop that is created for the call and closed afterwards.

    On Windows the proactor event loop policy is installed for the duration
    of the call and the previously active policy is restored afterwards. On
    other platforms the global policy is left untouched.

    Args:
        coro_factory: Zero-argument callable returning the coroutine to run.
            It is called only after the new loop has been installed.

    Returns:
        Whatever the coroutine returns.

    Raises:
        Any exception raised by the coroutine is propagated unchanged.
    """
    on_windows = sys.platform == 'win32'
    previous_policy = None
    if on_windows:
        # The proactor loop is the Windows loop with subprocess support;
        # presumably required for launching the browser -- TODO confirm.
        previous_policy = asyncio.get_event_loop_policy()
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

    try:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            return loop.run_until_complete(coro_factory())
        finally:
            loop.close()
            # Do not leave a closed loop registered as the current loop.
            asyncio.set_event_loop(None)
    finally:
        if on_windows:
            # Put back whatever policy was active before this task ran.
            asyncio.set_event_loop_policy(previous_policy)


@app.task
def async_browser_task(url: str):
    """Open ``url`` in a Camoufox browser and report the page title.

    Args:
        url: Address passed to ``page.goto``.

    Returns:
        A dict ``{"url": url, "title": title}``.

    Raises:
        Any exception raised while driving the browser; it is logged first
        and then re-raised.
    """
    logger.info(f"url {url}")

    async def _execute_async():
        try:
            async with AsyncCamoufox() as browser:
                page = await browser.new_page()
                await page.goto(url)
                title = await page.title()
                logger.info(f"成功访问 {url} 页面标题: {title}")
                return {"url": url, "title": title}
        except Exception as e:
            logger.error(f"浏览器任务失败: {str(e)}")
            raise

    return _run_in_fresh_loop(_execute_async)


@app.task(name='search_worker.search')
def async_search_task(keyword: str, max_result_items: int = 200, skip_existing: bool = True):
    """Run ``search_keyword`` for ``keyword`` and return its result.

    Args:
        keyword: Search phrase forwarded to ``search_keyword``.
        max_result_items: Forwarded to ``search_keyword`` unchanged.
        skip_existing: Forwarded to ``search_keyword`` unchanged.

    Returns:
        A dict ``{"keyword": keyword, "result": result}`` where ``result`` is
        whatever ``search_keyword`` returned.

    Raises:
        Any exception raised by ``search_keyword``; it is logged first and
        then re-raised.
    """

    async def _execute_search():
        try:
            logger.info(f"开始处理关键词搜索任务: {keyword}")
            result = await search_keyword(
                keyword,
                max_result_items=max_result_items,
                skip_existing=skip_existing,
            )
            return {"keyword": keyword, "result": result}
        except Exception as e:
            logger.error(f"关键词搜索任务失败: {str(e)}")
            raise

    return _run_in_fresh_loop(_execute_search)


@app.task(name='search_worker.add')  # explicit task name
def add(x, y):
    """Return ``x + y``."""
    return x + y


def main():
    """Enqueue one sample search task and log the handle returned by ``delay``."""
    # Other tasks can be enqueued the same way, e.g.
    # async_browser_task.delay("https://www.baidu.com") or add.delay(1, 2).
    res = async_search_task.delay("Acalypha psilostachya essential oil", 200, False)
    logger.info(f"{res}")


if __name__ == "__main__":
    main()