# crawl_tasks.py

import asyncio
import sys
from typing import Optional

from celery import group
from worker.celery.app import app
from worker.crawl_pages.crawl_urls import URLCrawler
from mylib.logu import get_logger
from config.settings import PROXIES
from crawl4ai import BrowserConfig
from pydantic import BaseModel
from mylib.drission_page import load_random_ua_chrome, load_chrome_from_ini
from utils.proxy_pool import get_random_proxy
from worker.search_engine.search_result_db import SearchResultItem

logger = get_logger('crawl_worker')


class SearchBrowserConfig(BaseModel):
    """Options forwarded to load_chrome_from_ini().

    Reference signature:
    load_chrome_from_ini(path=CONFIG_DIR / '9321.ini', headless=False, proxy=None, browser_path=None, no_imgs=True)
    """
    headless: Optional[bool] = False
    proxy: Optional[str] = None
    browser_path: Optional[str] = None
    no_imgs: Optional[bool] = True


class CrawlBrowserConfig(BaseModel):
    """Mirrors a subset of crawl4ai BrowserConfig options."""
    headless: bool = True
    verbose: bool = False
    extra_args: list[str] = []
    proxy: str | None = None


class CrawlTaskConfig(BaseModel):
    """Configuration passed to the crawl tasks (serialized via model_dump())."""
    browser_config: Optional[SearchBrowserConfig] = None
    overwrite: Optional[bool] = False
    dry_run: Optional[bool] = False
    proxy_pool_url: Optional[str] = None
    queue_name: str = 'crawl_queue'  # newly added queue name parameter
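
# A minimal sketch of the config payload the tasks expect (field values are
# illustrative; the proxy pool URL is a hypothetical endpoint):
#
#   CrawlTaskConfig(
#       browser_config=SearchBrowserConfig(headless=True, no_imgs=True),
#       overwrite=False,
#       dry_run=True,
#       proxy_pool_url="http://127.0.0.1:5010/get",  # hypothetical pool API
#       queue_name="crawl_queue",
#   ).model_dump()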


@app.task(
    name='crawl_worker.crawl_all_unprocessed_pages',
    serializer='pickle',
    accept=['pickle', 'json']
)
def crawl_all_unprocessed_pages_task(config: dict | CrawlTaskConfig):
    """Async task: submit crawl tasks for all pages with unprocessed URLs."""
    from worker.search_engine.search_result_db import db_manager

    if isinstance(config, dict):
        config = CrawlTaskConfig(**config)
    try:
        page_ids = db_manager.get_pages_with_unprocessed_urls()
        if not page_ids:
            logger.info("No unprocessed pages to crawl")
            return {"status": "success", "message": "No unprocessed pages to crawl"}

        logger.info(f"Found {len(page_ids)} unprocessed pages, submitting tasks in batches...")

        # Build all task signatures up front
        task_signatures = [
            crawl_page_urls_task.s(
                page_id=page_id,
                config=config.model_dump()
            ).set(queue=config.queue_name)
            for page_id in page_ids
        ]

        # Submit in chunks of 100 tasks per group
        chunk_size = 100
        results = []
        for i in range(0, len(task_signatures), chunk_size):
            chunk = task_signatures[i:i + chunk_size]
            task_group = group(chunk)
            group_result = task_group.apply_async()
            results.append(group_result.id)
            logger.info(f"Submitted batch {i // chunk_size + 1} with {len(chunk)} tasks")

        return {
            "status": "success",
            "task_ids": results,
            "message": f"Started crawl tasks for {len(page_ids)} pages in {len(results)} batches"
        }
    except Exception as e:
        logger.error(f"Failed to submit page tasks in bulk: {str(e)}")
        raise


@app.task(name='crawl_worker.crawl_keyword_urls')
def crawl_keyword_urls_task(keyword: str, config: dict):
    """Celery task to crawl all URLs for a specific keyword."""
    crawl_task_config = CrawlTaskConfig(**config)
    if sys.platform == 'win32':
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

    async def _execute_crawl():
        try:
            # Forward only the fields BrowserConfig understands (headless, proxy)
            browser_config = BrowserConfig(
                **(crawl_task_config.browser_config.model_dump(include={'headless', 'proxy'})
                   if crawl_task_config.browser_config else {})
            )
            logger.info(f"Starting URL crawl for keyword: {keyword}")
            crawler = URLCrawler()
            if crawl_task_config.proxy_pool_url:
                browser_config.proxy = await crawler.get_random_proxy(crawl_task_config.proxy_pool_url)
                logger.info(f"BrowserConfig proxy: {browser_config.proxy}")
            await crawler.crawl_keyword_urls(keyword, browser_config, crawl_task_config.overwrite, crawl_task_config.dry_run)
            logger.info(f"{'*' * 20} URL crawl completed for keyword: {keyword}")
            return {"keyword": keyword, "status": "completed"}
        except Exception as e:
            logger.exception(f"URL crawl task failed for keyword: {keyword}: {str(e)}")
            raise

    # Run the coroutine on a fresh event loop (the worker process has no running loop)
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(_execute_crawl())
    finally:
        loop.close()
        asyncio.set_event_loop_policy(None)
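
# Dispatch sketch (assumptions: a reachable broker and a worker consuming
# 'crawl_queue'; the keyword is illustrative):
#
#   crawl_keyword_urls_task.apply_async(
#       args=["example keyword", CrawlTaskConfig(dry_run=True).model_dump()],
#       queue="crawl_queue",
#   )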


@app.task(name='crawl_worker.crawl_page_urls')
def crawl_page_urls_task(page_id: int, config: dict):
    """Celery task to crawl all URLs for a specific search page."""
    crawl_task_config = CrawlTaskConfig(**config)
    if sys.platform == 'win32':
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

    async def _execute_crawl():
        page = None
        try:
            search_browser_config = SearchBrowserConfig(
                **(crawl_task_config.browser_config.model_dump()
                   if crawl_task_config.browser_config else {})
            )
            if crawl_task_config.proxy_pool_url:
                proxy = await asyncio.to_thread(get_random_proxy, crawl_task_config.proxy_pool_url)
                search_browser_config.proxy = proxy
                logger.info(f"Using proxy pool: {crawl_task_config.proxy_pool_url} --> {proxy}")
            else:
                logger.info("Proxy: following system settings")

            # Launch a DrissionPage-managed Chrome and hand its CDP endpoint to crawl4ai
            page = load_chrome_from_ini(**search_browser_config.model_dump())
            crawl_browser_config = BrowserConfig(
                headless=search_browser_config.headless,
                use_managed_browser=True,
                cdp_url=page.browser._driver._websocket_url
            )
            logger.info(f"{'(dry run) ' if crawl_task_config.dry_run else ''}Extracting search result page: {page_id}")
            crawler = URLCrawler()
            save_dir, list_res = await crawler.crawl_page_urls(page_id, crawl_browser_config, crawl_task_config.overwrite, crawl_task_config.dry_run)

            # Collect the HTML path of every saved search result
            files = []
            if list_res:
                for crawl_ret in list_res:
                    if not isinstance(crawl_ret, dict):
                        continue
                    search_result_model: SearchResultItem = crawl_ret.get("search_result_model")
                    if search_result_model:
                        files.append(search_result_model.html_path)

            ret = {"page_id": page_id, "status": "completed", "save_dir": save_dir, "files": files}
            logger.info(f"Save directory: {save_dir}")
            logger.info(f"Extracted {len(files)} results: {files}")
            logger.info(f"{'*' * 20} URL crawl completed for page ID: {page_id} {'*' * 20}")
            return ret
        except Exception as e:
            logger.error(f"URL crawl task failed for page ID {page_id}: {str(e)}")
            raise
        finally:
            # Always close the managed browser, even on failure
            if page:
                page.quit()

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(_execute_crawl())
    finally:
        loop.close()
        asyncio.set_event_loop_policy(None)


def main():
    # Example usage
    pass
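
# One way main() could dispatch the batch task: a minimal sketch, assuming a
# reachable broker and a worker consuming 'crawl_queue'.
#
#   def main():
#       config = CrawlTaskConfig(dry_run=True)
#       result = crawl_all_unprocessed_pages_task.apply_async(
#           args=[config.model_dump()], queue=config.queue_name
#       )
#       print("submitted:", result.id)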


if __name__ == "__main__":
    main()