import asyncio
from fastapi import APIRouter, FastAPI, HTTPException, status
from pydantic import BaseModel
from typing import Dict, List, Optional, Any
import DrissionPage
from worker.search_engine.drission_google_search import search_keyword_drission
from DrissionPage import ChromiumPage
from mylib.drission_page import load_chrome_from_ini
from mylib.logu import logger
from worker.celery.app import app as celery_app
from worker.celery.crawl_client import submit_page_crawl_tasks

# Router holding every task-submission endpoint defined in this module.
app = APIRouter()


# Outcome of one task submission: the Celery task id (empty string when the
# submission failed), a status of "submitted" or "failed", and a message.
class TaskResponse(BaseModel):
    task_id: str
    status: str
    message: str


# Request body for POST /search.
class SearchRequest(BaseModel):
    keywords: List[str]
    max_result_items: Optional[int] = 200
    skip_existing: Optional[bool] = True
    browser_config: Optional[Dict] = {}
    proxy_pool: Optional[List[str]] = None


# Request body for POST /crawl_keywords.
class CrawlKeywordsRequest(BaseModel):
    keywords: List[str]
    browser_config: Optional[Dict] = {}
    proxy_pool: Optional[List[str]] = None


# Request body for POST /convert_single.
class ConvertRequest(BaseModel):
    result_ids: List[int]


# Request body for POST /browser/test.
class BrowserTestRequest(BaseModel):
    browser_config: Dict = {}
    init_url: str = "https://www.google.com"


@app.post("/search", summary="执行Google搜索")
def search(request: SearchRequest) -> List[TaskResponse]:
    """Submit one search task per keyword.

    Each keyword is sent to the ``search_worker.drission_search`` Celery task.
    A failure to submit one keyword is logged and reported as a ``failed``
    entry; it does not abort the remaining submissions.

    Returns:
        One ``TaskResponse`` per requested keyword, in request order.
    """
    responses = []
    for keyword in request.keywords:
        try:
            # The payload keeps the request's schema but narrows ``keywords``
            # to the keyword this task is reported for. Previously the full
            # list was sent on every iteration, so N keywords produced N tasks
            # that each carried all N keywords.
            # NOTE(review): assumes the worker accepts ``keywords`` as a list,
            # as the original payload implied — confirm against the worker.
            task_kwargs = {**request.model_dump(), 'keywords': [keyword]}
            result = celery_app.send_task('search_worker.drission_search', kwargs=task_kwargs)
            logger.info(f"任务已提交: {keyword} (任务ID: {result.id})")
            responses.append(TaskResponse(
                task_id=result.id,
                status="submitted",
                message=f"Search task submitted for keyword: {keyword}"
            ))
        except Exception as e:
            logger.error(f"提交任务失败 [{keyword}]: {str(e)}")
            responses.append(TaskResponse(
                task_id="",
                status="failed",
                message=str(e)
            ))
    return responses


# Request body for POST /crawl_item.
class CrawlItemRequest(BaseModel):
    page_ids: List[int]
    browser_config: Dict = {}


@app.post("/crawl_item", summary="爬取单个搜索结果项")
def crawl_item(request: CrawlItemRequest) -> List[TaskResponse]:
    """Submit one page-crawl task per page ID.

    Each page ID is sent to the ``crawl_worker.crawl_page_urls`` Celery task
    together with the request's browser config and ``overwrite=False``.
    Submission failures are logged and reported per page ID.

    Returns:
        One ``TaskResponse`` per requested page ID, in request order.
    """
    responses = []
    for page_id in request.page_ids:
        try:
            task_data = {
                'page_id': page_id,
                'browser_config': request.browser_config,
                'overwrite': False
            }
            result = celery_app.send_task('crawl_worker.crawl_page_urls', kwargs=task_data)
            responses.append(TaskResponse(
                task_id=result.id,
                status="submitted",
                message=f"Crawl task submitted for page ID: {page_id}"
            ))
        except Exception as e:
            logger.error(f"提交页面爬取任务失败 [页面ID {page_id}]: {str(e)}")
            responses.append(TaskResponse(
                task_id="",
                status="failed",
                message=str(e)
            ))
    return responses


@app.post("/crawl_keywords", response_model=List[TaskResponse], summary="提交关键词URL爬取任务")
def crawl_keywords(request: CrawlKeywordsRequest) -> List[TaskResponse]:
    """Submit one keyword-URL crawl task per keyword.

    Corresponds to ``worker.celery.crawl_client.submit_crawl_tasks``. Each
    keyword is stripped and sent to the ``crawl_worker.crawl_keyword_urls``
    Celery task. The caller's browser config is extended with fixed values for
    ``headless``, ``verbose`` and ``extra_args``, which override any values
    supplied under those keys.

    Returns:
        One ``TaskResponse`` per requested keyword, in request order.
    """
    # ``browser_config`` is Optional, so an explicit null must be treated as
    # an empty config; unpacking None would fail for every keyword.
    base_config = request.browser_config or {}
    responses = []
    for keyword in request.keywords:
        try:
            task_data = {
                'keyword': keyword.strip(),
                'browser_config': {
                    **base_config,
                    # Guarantees the 'proxy' key exists (None when not given).
                    'proxy': base_config.get('proxy'),
                    'headless': True,
                    'verbose': False,
                    'extra_args': ["--disable-gpu", "--disable-dev-shm-usage", "--no-sandbox"]
                },
                'overwrite': False
            }
            result = celery_app.send_task('crawl_worker.crawl_keyword_urls', kwargs=task_data)
            responses.append(TaskResponse(
                task_id=result.id,
                status="submitted",
                message=f"Keyword crawl task submitted: {keyword}"
            ))
        except Exception as e:
            logger.error(f"提交关键词URL爬取任务失败 [{keyword}]: {str(e)}")
            responses.append(TaskResponse(
                task_id="",
                status="failed",
                message=str(e)
            ))
    return responses


@app.post("/convert_single", response_model=TaskResponse, summary="转换单个HTML结果")
def convert_single(request: ConvertRequest) -> TaskResponse:
    """Submit a single HTML conversion task.

    Only the first entry of ``result_ids`` is used; any further IDs are
    ignored.

    Raises:
        HTTPException: 400 when ``result_ids`` is empty, 500 when the task
            cannot be submitted.
    """
    # Validate before the try block: indexing an empty list used to raise
    # IndexError there, and the except handler's log message indexed the list
    # again, so a second IndexError escaped the handler.
    if not request.result_ids:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="result_ids must contain at least one ID"
        )
    result_id = request.result_ids[0]
    try:
        result = celery_app.send_task('html_convert_tasks_worker.convert_single_result', args=[result_id])
        return TaskResponse(
            task_id=result.id,
            status="submitted",
            message=f"Conversion task submitted for result ID: {result_id}"
        )
    except Exception as e:
        logger.error(f"提交HTML转换任务失败 [结果ID {result_id}]: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e)
        )


@app.post("/convert_all", response_model=TaskResponse, summary="转换所有HTML结果")
def convert_all() -> TaskResponse:
    """Submit the bulk HTML conversion task.

    Raises:
        HTTPException: 500 when the task cannot be submitted.
    """
    try:
        # NOTE(review): this task name uses the prefix 'html_convert_tasks'
        # while convert_single uses 'html_convert_tasks_worker' — confirm both
        # match the names registered by the worker.
        result = celery_app.send_task('html_convert_tasks.convert_all_results')
        return TaskResponse(
            task_id=result.id,
            status="submitted",
            message="Bulk conversion task submitted for all results"
        )
    except Exception as e:
        logger.error(f"提交批量HTML转换任务失败: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e)
        )


@app.post("/browser/test", summary="测试浏览器启动")
async def test_browser(request: BrowserTestRequest):
    """Launch a browser with the given config and open ``init_url``.

    Returns:
        A dict with ``status`` set to ``"success"`` and the page's ``url``.

    Raises:
        HTTPException: 500 when launching the browser or loading the page
            fails.
    """
    try:
        logger.info(f"Testing browser launch with parameters: {request.model_dump()}")
        # This endpoint does not close the browser it launches.
        page = await test_browser_launch(request.browser_config, request.init_url)
        return {"status": "success", "url": page.url}
    except Exception as e:
        logger.error(f"Error during browser test: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


async def test_browser_launch(browser_config: Dict, init_url: str) -> ChromiumPage:
    """Create a browser page from ``browser_config`` and navigate to ``init_url``.

    Args:
        browser_config: Keyword arguments forwarded to ``load_chrome_from_ini``.
            A ``proxy`` entry is always passed (``None`` when absent).
        init_url: URL to load once the browser is up.

    Returns:
        The page object returned by ``load_chrome_from_ini`` after loading
        ``init_url``.
    """
    # Merge into one dict before unpacking. Passing ``**browser_config``
    # alongside an explicit ``proxy=`` raised "got multiple values for keyword
    # argument 'proxy'" whenever the config already contained a proxy.
    launch_kwargs = {**browser_config, 'proxy': browser_config.get('proxy')}
    # NOTE(review): both calls below are synchronous inside an async function,
    # so they presumably block the event loop while running — confirm.
    page = load_chrome_from_ini(**launch_kwargs)
    page.get(init_url)
    return page