"""Celery tasks that drive a Camoufox browser: a page-title probe and a keyword search."""
import asyncio
import sys
from typing import Any, Awaitable, Callable, Optional

import httpx
from camoufox.async_api import AsyncCamoufox

from worker.celery.app import app
from worker.search_engine.google_search import search_keyword
from worker.search_engine.camoufox_broswer import BrowserConfig
from mylib.logu import logger
from config.settings import PROXY_POOL_BASE_URL


def _run_in_fresh_loop(coro_factory: Callable[[], Awaitable[Any]]) -> Any:
    """Run the coroutine built by *coro_factory* on a brand-new event loop.

    The task bodies are synchronous functions, so each invocation explicitly
    creates, installs and closes its own loop instead of relying on an
    ambient one.

    Args:
        coro_factory: Zero-argument callable returning the awaitable to run.
            It is called only after the new loop has been installed.

    Returns:
        Whatever the awaitable returns.

    Raises:
        Any exception raised by the awaitable is propagated unchanged.
    """
    # Windows needs special event-loop handling: force the Proactor policy
    # (presumably because the browser driver spawns subprocesses - confirm).
    if sys.platform == 'win32':
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(coro_factory())
    finally:
        loop.close()
        # Restore the default policy so the override does not outlive the call.
        asyncio.set_event_loop_policy(None)


@app.task
def async_browser_task(url: str):
    """Open *url* in a Camoufox browser and report the page title.

    Args:
        url: Address to navigate to.

    Returns:
        A dict ``{"url": url, "title": <page title>}``.

    Raises:
        Exception: Any failure while launching the browser or loading the
            page is logged and re-raised.
    """
    logger.info(f"url {url}")

    async def _fetch_title():
        try:
            async with AsyncCamoufox() as browser:
                page = await browser.new_page()
                await page.goto(url)
                title = await page.title()
                logger.info(f"成功访问 {url} 页面标题: {title}")
                return {"url": url, "title": title}
        except Exception as e:
            logger.error(f"浏览器任务失败: {e!s}")
            raise

    return _run_in_fresh_loop(_fetch_title)


async def get_random_proxy():
    """Ask the proxy pool for a proxy and return its local HTTP address.

    Issues ``GET {PROXY_POOL_BASE_URL}/get`` (30 s timeout) and reads the
    ``"port"`` field of the JSON response.

    Returns:
        The proxy address as ``'http://127.0.0.1:<port>'``.

    Raises:
        httpx.HTTPStatusError: If the pool answers with an error status.
        KeyError: If the response JSON has no ``"port"`` key.
    """
    url = f"{PROXY_POOL_BASE_URL}/get"
    async with httpx.AsyncClient() as client:
        response = await client.get(url, timeout=30)
        response.raise_for_status()
        results = response.json()
        logger.info(f"results {results}")
        port = results["port"]
        addr = f'http://127.0.0.1:{port}'
        # Log a ready-to-paste command for checking the proxy by hand.
        logger.info(f"curl -i -x {addr} https://www.google.com")
        return addr


@app.task(name='search_worker.search')
def async_search_task(
    keyword: str,
    max_result_items: int = 200,
    skip_existing: bool = True,
    browser_config: Optional[BrowserConfig] = None,
):
    """Run a keyword search through a proxy obtained from the proxy pool.

    Args:
        keyword: Search phrase, forwarded to ``search_keyword``.
        max_result_items: Forwarded to ``search_keyword``.
        skip_existing: Forwarded to ``search_keyword``.
        browser_config: Browser settings. When omitted, a fresh
            ``BrowserConfig()`` is created for this call. Its ``proxy``
            mapping is updated in place with the selected proxy server.

    Returns:
        A dict ``{"keyword": keyword, "result": <search_keyword result>}``.

    Raises:
        Exception: Any failure while fetching the proxy or searching is
            logged and re-raised.
    """
    # Build the default per call: a ``BrowserConfig()`` default in the
    # signature would be created once at import time and its ``proxy`` dict
    # would then be mutated and shared by every invocation in this process.
    if browser_config is None:
        browser_config = BrowserConfig()

    async def _execute_search():
        try:
            proxy = {'server': await get_random_proxy()}
            browser_config.proxy.update(proxy)
            logger.info(f"{browser_config.proxy}")
            logger.info(f"开始处理关键词搜索任务: {keyword}")
            result = await search_keyword(
                keyword,
                max_result_items=max_result_items,
                skip_existing=skip_existing,
                config=browser_config,
            )
            return {"keyword": keyword, "result": result}
        except Exception as e:
            logger.error(f"关键词搜索任务失败: {e!s}")
            raise

    return _run_in_fresh_loop(_execute_search)


@app.task(name='search_worker.add')  # explicit task name
def add(x, y):
    """Return ``x + y``."""
    return x + y


def main():
    """Enqueue one sample keyword search and log the handle returned by ``delay``."""
    # Other tasks can be enqueued the same way, e.g.:
    #   async_browser_task.delay("https://www.baidu.com")
    #   add.delay(1, 2)
    res = async_search_task.delay("Acalypha psilostachya essential oil", 200, False)
    logger.info(f"{res}")


if __name__ == "__main__":
    main()