import asyncio
import sys
from typing import Optional

from celery import group
from worker.celery.app import app
from worker.crawl_pages.crawl_urls import URLCrawler
from mylib.logu import get_logger
from config.settings import PROXIES
from crawl4ai import BrowserConfig
from pydantic import BaseModel
from mylib.drission_page import load_random_ua_chrome, load_chrome_from_ini
from utils.proxy_pool import get_random_proxy
from worker.search_engine.search_result_db import SearchResultItem

logger = get_logger('crawl_worker')


class SearchBrowserConfig(BaseModel):
    """Options forwarded to load_chrome_from_ini()."""
    # Mirrors: load_chrome_from_ini(path=CONFIG_DIR / '9321.ini', headless=False, proxy=None, browser_path=None, no_imgs=True)
    headless: Optional[bool] = False
    proxy: Optional[str] = None
    browser_path: Optional[str] = None
    no_imgs: Optional[bool] = True


class CrawlBrowserConfig(BaseModel):
    headless: bool = True
    verbose: bool = False
    extra_args: list[str] = []
    proxy: str | None = None


class CrawlTaskConfig(BaseModel):
    """Configuration payload shared by the crawl tasks below."""
    browser_config: Optional[SearchBrowserConfig] = None
    overwrite: Optional[bool] = False
    dry_run: Optional[bool] = False
    proxy_pool_url: Optional[str] = None
    queue_name: str = 'crawl_queue'  # queue that page-level crawl tasks are routed to
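
# Example (illustrative only): the payload shape the tasks below accept. Callers
# typically pass a plain dict, which each task validates via CrawlTaskConfig(**config).
# The proxy_pool_url value here is a hypothetical placeholder.
#
#   config = CrawlTaskConfig(
#       browser_config=SearchBrowserConfig(headless=True, no_imgs=True),
#       overwrite=False,
#       dry_run=True,
#       proxy_pool_url='http://127.0.0.1:5010/get',  # hypothetical endpoint
#       queue_name='crawl_queue',
#   ).model_dump()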


@app.task(
    name='crawl_worker.crawl_all_unprocessed_pages',
    serializer='pickle',
    accept=['pickle', 'json']
)
def crawl_all_unprocessed_pages_task(config: dict | CrawlTaskConfig):
    """Celery task: submit URL-crawl tasks for every page that still has unprocessed URLs."""
    from worker.search_engine.search_result_db import db_manager

    if isinstance(config, dict):
        config = CrawlTaskConfig(**config)
    try:
        page_ids = db_manager.get_pages_with_unprocessed_urls()

        if not page_ids:
            logger.info("No unprocessed pages to crawl")
            return {"status": "success", "message": "No unprocessed pages to crawl"}

        logger.info(f"Found {len(page_ids)} unprocessed pages, submitting tasks in batches...")

        # Build one task signature per page, routed to the configured queue
        task_signatures = [
            crawl_page_urls_task.s(
                page_id=page_id,
                config=config.model_dump()
            ).set(queue=config.queue_name)
            for page_id in page_ids
        ]

        # Submit in chunks (100 signatures per group)
        chunk_size = 100
        results = []
        for i in range(0, len(task_signatures), chunk_size):
            chunk = task_signatures[i:i + chunk_size]
            task_group = group(chunk)
            group_result = task_group.apply_async()
            results.append(group_result.id)
            logger.info(f"Submitted batch {i // chunk_size + 1} with {len(chunk)} tasks")

        return {
            "status": "success",
            "task_ids": results,
            "message": f"Started crawl tasks for {len(page_ids)} pages in {len(results)} batches"
        }
    except Exception as e:
        logger.error(f"Failed to submit page crawl tasks: {str(e)}")
        raise


@app.task(name='crawl_worker.crawl_keyword_urls')
def crawl_keyword_urls_task(keyword: str, config: dict):
    """Celery task to crawl all URLs for a specific keyword."""
    crawl_task_config = CrawlTaskConfig(**config)
    if sys.platform == 'win32':
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

    async def _execute_crawl():
        try:
            # Build a crawl4ai BrowserConfig; forward only the fields it shares
            # with SearchBrowserConfig (headless, proxy)
            browser_options = (
                crawl_task_config.browser_config.model_dump()
                if crawl_task_config.browser_config else {}
            )
            browser_config = BrowserConfig(
                headless=browser_options.get('headless', True),
                proxy=browser_options.get('proxy'),
            )

            logger.info(f"Starting URL crawl for keyword: {keyword}")
            crawler = URLCrawler()
            if crawl_task_config.proxy_pool_url:
                browser_config.proxy = await crawler.get_random_proxy(crawl_task_config.proxy_pool_url)
                logger.info(f"BrowserConfig proxy: {browser_config.proxy}")
            await crawler.crawl_keyword_urls(keyword, browser_config, crawl_task_config.overwrite, crawl_task_config.dry_run)
            logger.info(f"{'*' * 20} URL crawl completed for keyword: {keyword}")
            return {"keyword": keyword, "status": "completed"}
        except Exception as e:
            logger.exception(f"URL crawl task failed for keyword: {keyword}: {str(e)}")
            raise

    # Run the coroutine on a dedicated event loop (Celery prefork workers are synchronous)
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(_execute_crawl())
    finally:
        loop.close()
        asyncio.set_event_loop_policy(None)
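
# Usage sketch (illustrative): dispatch a keyword crawl to a worker; assumes a running
# broker and a worker consuming the target queue. The keyword value is a placeholder.
#
#   crawl_keyword_urls_task.apply_async(
#       kwargs={'keyword': 'example keyword', 'config': CrawlTaskConfig(dry_run=True).model_dump()},
#       queue='crawl_queue',
#   )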


@app.task(name='crawl_worker.crawl_page_urls')
def crawl_page_urls_task(page_id: int, config: dict):
    """Celery task to crawl all URLs for a specific search page."""
    crawl_task_config = CrawlTaskConfig(**config)
    if sys.platform == 'win32':
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

    async def _execute_crawl():
        page = None
        try:
            search_browser_config = SearchBrowserConfig(
                **(crawl_task_config.browser_config.model_dump()
                   if crawl_task_config.browser_config else {})
            )
            if crawl_task_config.proxy_pool_url:
                proxy = await asyncio.to_thread(get_random_proxy, crawl_task_config.proxy_pool_url)
                search_browser_config.proxy = proxy
                logger.info(f"Using proxy pool: {crawl_task_config.proxy_pool_url} --> {proxy}")
            else:
                logger.info("Using proxy: system default")
            page = load_chrome_from_ini(**search_browser_config.model_dump())

            # Attach crawl4ai to the already-running browser via its CDP endpoint
            crawl_browser_config = BrowserConfig(
                headless=search_browser_config.headless,
                use_managed_browser=True,
                cdp_url=page.browser._driver._websocket_url
            )
            logger.info(f"{'(dry run) ' if crawl_task_config.dry_run else ''}Extracting search result page: {page_id}")
            crawler = URLCrawler()
            save_dir, list_res = await crawler.crawl_page_urls(page_id, crawl_browser_config, crawl_task_config.overwrite, crawl_task_config.dry_run)
            files = []
            # logger.info(f"list_res {list_res}")
            if list_res:
                for crawl_ret in list_res:
                    if not isinstance(crawl_ret, dict):
                        continue
                    search_result_model: SearchResultItem = crawl_ret.get("search_result_model")
                    if search_result_model:
                        files.append(search_result_model.html_path)
            ret = {"page_id": page_id, "status": "completed", "save_dir": save_dir, "files": files}
            logger.info(f"Save directory: {save_dir}")
            logger.info(f"Extracted {len(files)} results: {files}")
            logger.info(f"{'*' * 20} URL crawl completed for page ID: {page_id} {'*' * 20}")
            return ret
        except Exception as e:
            logger.error(f"URL crawl task failed for page ID {page_id}: {str(e)}")
            raise
        finally:
            if page is not None:
                page.quit()

    # Run the coroutine on a dedicated event loop, then restore the default policy
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(_execute_crawl())
    finally:
        loop.close()
        asyncio.set_event_loop_policy(None)


def main():
    # Example usage
    pass
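
    # Minimal sketch (assumes the Celery broker is reachable and a worker consumes the
    # 'crawl_queue' queue); dry_run=True keeps the example from doing real crawling.
    config = CrawlTaskConfig(dry_run=True, queue_name='crawl_queue')
    async_result = crawl_all_unprocessed_pages_task.apply_async(
        kwargs={'config': config.model_dump()},
        queue=config.queue_name,
    )
    logger.info(f"Submitted batch task: {async_result.id}")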


if __name__ == "__main__":
    main()