| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
- import httpx
- from worker.celery.app import app
- from camoufox.async_api import AsyncCamoufox
- from worker.search_engine.google_search import search_keyword
- from worker.search_engine.camoufox_broswer import BrowserConfig
- from mylib.logu import logger
- import asyncio
- import sys
- from config.settings import PROXY_POOL_BASE_URL
@app.task
def async_browser_task(url: str):
    """Open ``url`` in a Camoufox browser and report the page title.

    The coroutine that drives the browser is run to completion on a
    dedicated, freshly created event loop so the task can be invoked from
    synchronous Celery worker code.

    Args:
        url: Address of the page to visit.

    Returns:
        A dict ``{"url": url, "title": title}`` where ``title`` is the title
        reported by the loaded page.

    Raises:
        Exception: Any error raised while driving the browser is logged and
            then re-raised unchanged.
    """
    logger.info(f"url {url}")
    # Windows needs special handling of the event loop: force the proactor
    # policy before a loop is created.
    if sys.platform == 'win32':
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

    async def _execute_async():
        try:
            async with AsyncCamoufox() as browser:
                page = await browser.new_page()
                await page.goto(url)
                title = await page.title()
                logger.info(f"成功访问 {url} 页面标题: {title}")
                return {"url": url, "title": title}
        except Exception as e:
            # Log for the worker's records, then let the caller see the failure.
            logger.error(f"浏览器任务失败: {str(e)}")
            raise

    # Explicitly create and manage the event loop for this invocation.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(_execute_async())
    finally:
        loop.close()
        # Reset the process-wide policy to the default after use.
        asyncio.set_event_loop_policy(None)
async def get_random_proxy():
    """Ask the proxy pool for a proxy and return its local HTTP address.

    Sends a GET request to ``{PROXY_POOL_BASE_URL}/get`` with a 30 second
    timeout, reads the ``port`` entry of the JSON reply and builds the
    address ``http://127.0.0.1:<port>`` from it. A ready-to-paste ``curl``
    command using that proxy is logged for manual checking.

    Returns:
        The proxy address as a string, e.g. ``http://127.0.0.1:<port>``.

    Raises:
        httpx.HTTPStatusError: If the pool answers with an error status.
        KeyError: If the reply has no ``port`` entry (assumes the reply is a
            JSON object).
    """
    endpoint = f"{PROXY_POOL_BASE_URL}/get"
    async with httpx.AsyncClient() as http:
        reply = await http.get(endpoint, timeout=30)
        reply.raise_for_status()
        payload = reply.json()

    logger.info(f"results {payload}")
    proxy_addr = f'http://127.0.0.1:{payload["port"]}'
    logger.info(f"curl -i -x {proxy_addr} https://www.google.com")
    return proxy_addr
@app.task(name='search_worker.search')
def async_search_task(
    keyword: str,
    max_result_items: int = 200,
    skip_existing: bool = True,
    browser_config: "BrowserConfig | None" = None,
):
    """Run a keyword search through a proxy fetched from the proxy pool.

    A proxy address is obtained from ``get_random_proxy`` and written into
    ``browser_config.proxy`` under the ``server`` key before the search is
    delegated to ``search_keyword``. The coroutine is run to completion on a
    dedicated, freshly created event loop.

    Args:
        keyword: Search term to process.
        max_result_items: Forwarded to ``search_keyword``.
        skip_existing: Forwarded to ``search_keyword``.
        browser_config: Browser settings forwarded to ``search_keyword`` as
            ``config``. When omitted, a new ``BrowserConfig()`` is built for
            this call, so the proxy update below never leaks into a shared
            default instance. A config passed by the caller is still updated
            in place.

    Returns:
        A dict ``{"keyword": keyword, "result": result}`` where ``result`` is
        whatever ``search_keyword`` returned.

    Raises:
        Exception: Any error from the proxy lookup or the search is logged
            and then re-raised unchanged.
    """
    # Build the default per call: a default evaluated at definition time would
    # be one shared object whose ``proxy`` mapping every call mutates.
    if browser_config is None:
        browser_config = BrowserConfig()

    # Windows needs the proactor policy in place before a loop is created.
    if sys.platform == 'win32':
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

    async def _execute_search():
        try:
            proxy = {'server': await get_random_proxy()}
            browser_config.proxy.update(proxy)
            logger.info(f"{browser_config.proxy}")
            logger.info(f"开始处理关键词搜索任务: {keyword}")
            result = await search_keyword(
                keyword,
                max_result_items=max_result_items,
                skip_existing=skip_existing,
                config=browser_config,
            )
            return {"keyword": keyword, "result": result}
        except Exception as e:
            # Log for the worker's records, then let the caller see the failure.
            logger.error(f"关键词搜索任务失败: {str(e)}")
            raise

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(_execute_search())
    finally:
        loop.close()
        # Reset the process-wide policy to the default after use.
        asyncio.set_event_loop_policy(None)
@app.task(name='search_worker.add')  # give the task an explicit name
def add(x, y):
    """Return ``x + y``; a minimal task registered as ``search_worker.add``."""
    total = x + y
    return total
def main():
    """Enqueue one sample keyword-search task and log what ``.delay`` returns.

    The task is submitted with ``max_result_items=200`` and
    ``skip_existing=False`` (passed positionally).
    """
    # The other tasks in this module (async_browser_task, add) can be
    # submitted the same way through their ``.delay`` method.
    sample_keyword = "Acalypha psilostachya essential oil"
    pending = async_search_task.delay(sample_keyword, 200, False)
    logger.info(f"{pending}")


# Submit the sample task only when this file is executed as a script.
if __name__ == "__main__":
    main()
|