| 1234567891011121314151617181920212223242526272829303132333435363738394041 |
- import random
- from worker.celery.app import app
- from worker.search_engine.drission_google_search import search_keyword_drission
- from mylib.logu import logger
- import sys
- import asyncio
- import httpx
- from utils.proxy_pool import get_random_proxy
- from config.settings import PROXIES
- @app.task(name='search_worker.drission_search')
- def drission_search_task(keyword: str, max_result_items: int=200, skip_existing: bool=True, browser_config: dict={}, proxy_pool:list[str]=None):
- """异步关键词搜索任务"""
- if sys.platform == 'win32':
- asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
- PROXIES = proxy_pool if proxy_pool else None
- async def _execute_search():
- try:
- if proxy_pool:
- browser_config.update({'proxy': get_random_proxy()})
- logger.info(f"browser_config: {browser_config}")
- logger.info(f"开始处理关键词搜索任务: {keyword}")
- result = search_keyword_drission(keyword, max_result_items=max_result_items, skip_existing=skip_existing, browser_config=browser_config)
- return {"keyword": keyword, "result": result}
- except Exception as e:
- logger.error(f"关键词搜索任务失败: {str(e)}")
- raise
- loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
- try:
- return loop.run_until_complete(_execute_search())
- finally:
- loop.close()
- asyncio.set_event_loop_policy(None)
- def main():
- proxy = get_random_proxy()
- if __name__ == "__main__":
- main()
|