| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273 |
- from worker.celery.app import app
- from camoufox.async_api import AsyncCamoufox
- from worker.search_engine.google_search import search_keyword
- from worker.search_engine.camoufox_broswer import BrowserConfig
- from mylib.logu import logger
- import asyncio
- import sys
- @app.task
- def async_browser_task(url: str):
- logger.info(f"url {url}")
- """异步浏览器测试任务"""
- # Windows平台需要特殊处理事件循环
- if sys.platform == 'win32':
- asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
-
- async def _execute_async():
- try:
- async with AsyncCamoufox() as browser:
- page = await browser.new_page()
- await page.goto(url)
- title = await page.title()
- logger.info(f"成功访问 {url} 页面标题: {title}")
- return {"url": url, "title": title}
- except Exception as e:
- logger.error(f"浏览器任务失败: {str(e)}")
- raise
-
- # 显式创建并管理事件循环
- loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
- try:
- return loop.run_until_complete(_execute_async())
- finally:
- loop.close()
- asyncio.set_event_loop_policy(None) # Cleanup policy after use
- @app.task(name='search_worker.search')
- def async_search_task(keyword: str, max_result_items: int=200, skip_existing: bool=True):
- """异步关键词搜索任务"""
- if sys.platform == 'win32':
- asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
- async def _execute_search():
- try:
- logger.info(f"开始处理关键词搜索任务: {keyword}")
- result = await search_keyword(keyword, max_result_items=max_result_items, skip_existing=skip_existing)
- return {"keyword": keyword, "result": result}
- except Exception as e:
- logger.error(f"关键词搜索任务失败: {str(e)}")
- raise
- loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
- try:
- return loop.run_until_complete(_execute_search())
- finally:
- loop.close()
- asyncio.set_event_loop_policy(None)
- @app.task(name='search_worker.add') # 添加明确的task name
- def add(x, y):
- return x + y
- def main():
- # logger.info(f"url")
- # res = async_browser_task.delay("https://www.baidu.com")
- # res = add.delay(1,2)
- res = async_search_task.delay("Acalypha psilostachya essential oil",200,False)
- logger.info(f"{res}")
- if __name__ == "__main__":
- main()
|