| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183 |
- import asyncio
- from fastapi import APIRouter, FastAPI, HTTPException, status
- from pydantic import BaseModel
- from typing import Dict, List, Optional, Any
- import DrissionPage
- from worker.search_engine.drission_google_search import search_keyword_drission
- from DrissionPage import ChromiumPage
- from mylib.drission_page import load_chrome_from_ini
- from mylib.logu import logger
- from worker.celery.app import app as celery_app
- from worker.celery.crawl_client import submit_page_crawl_tasks
- from worker.celery.client import get_uncompleted_keywords
# Router mounted by the main FastAPI application; all endpoints below attach here.
app = APIRouter()
class TaskResponse(BaseModel):
    """Uniform response for a single Celery task submission attempt."""
    task_id: str   # Celery task id; empty string when submission failed
    status: str    # "submitted" or "failed" (as emitted by the endpoints below)
    message: str   # human-readable detail or the stringified exception
class SearchRequest(BaseModel):
    """Payload for POST /search (Google search task submission)."""
    keywords: List[str]                       # ignored when from_db is True
    max_result_items: Optional[int] = 200     # cap on results per keyword — TODO confirm worker honors it
    skip_existing: Optional[bool] = True      # presumably skips already-crawled keywords; verify against worker
    browser_config: Optional[Dict] = {}       # NOTE(review): mutable default shared across instances — pydantic copies it, but field(default_factory=dict) is safer
    proxy_pool: Optional[List[str]] = None
    from_db: Optional[bool] = True            # True → use get_uncompleted_keywords() instead of `keywords`
class CrawlKeywordsRequest(BaseModel):
    """Payload for POST /crawl_keywords (per-keyword URL crawl tasks)."""
    keywords: List[str]
    browser_config: Optional[Dict] = {}  # forwarded to the crawl worker, merged with forced headless flags
    proxy_pool: Optional[List[str]] = None  # NOTE(review): not forwarded to the task payload — confirm intentional
class ConvertRequest(BaseModel):
    """Payload for POST /convert_single; only the first ID is used by that endpoint."""
    result_ids: List[int]
class BrowserTestRequest(BaseModel):
    """Payload for POST /browser/test (browser launch smoke test)."""
    browser_config: Dict = {}
    init_url: str = "https://www.google.com"  # page loaded after launch to verify the browser works
@app.post("/search", summary="执行Google搜索")
def search(request: SearchRequest) -> List[TaskResponse]:
    """Submit one Celery Google-search task per keyword.

    When ``request.from_db`` is true, keywords come from
    ``get_uncompleted_keywords()`` instead of ``request.keywords``.

    Returns:
        One TaskResponse per keyword; a failed submission is reported as a
        ``status="failed"`` item instead of aborting the whole batch.
    """
    responses: List[TaskResponse] = []
    keywords = get_uncompleted_keywords() if request.from_db else request.keywords
    for keyword in keywords:
        try:
            # Bug fix: previously kwargs=request.model_dump() was sent for
            # EVERY keyword, so each task received the full original keywords
            # list and DB-sourced keywords were never forwarded at all.
            # Send a per-keyword payload instead (same schema, one keyword).
            payload = request.model_dump()
            payload['keywords'] = [keyword]
            result = celery_app.send_task('search_worker.drission_search', kwargs=payload)
            logger.info(f"任务已提交: {keyword} (任务ID: {result.id})")
            responses.append(TaskResponse(
                task_id=result.id,
                status="submitted",
                message=f"Search task submitted for keyword: {keyword}"
            ))
        except Exception as e:
            logger.error(f"提交任务失败 [{keyword}]: {str(e)}")
            responses.append(TaskResponse(
                task_id="",
                status="failed",
                message=str(e)
            ))
    return responses
class CrawlItemRequest(BaseModel):
    """Payload for POST /crawl_item (crawl the URLs stored for given page IDs)."""
    page_ids: List[int]
    browser_config: Dict = {}  # forwarded unchanged to the crawl worker
@app.post("/crawl_item", summary="爬取单个搜索结果项")
def crawl_item(request: CrawlItemRequest) -> List[TaskResponse]:
    """Queue one page-crawl Celery task per page ID and report each outcome.

    A submission failure is recorded as a ``status="failed"`` entry rather
    than failing the whole request.
    """
    outcomes: List[TaskResponse] = []
    for page_id in request.page_ids:
        try:
            submission = celery_app.send_task(
                'crawl_worker.crawl_page_urls',
                kwargs={
                    'page_id': page_id,
                    'browser_config': request.browser_config,
                    'overwrite': False,
                },
            )
        except Exception as e:
            logger.error(f"提交页面爬取任务失败 [页面ID {page_id}]: {str(e)}")
            outcomes.append(TaskResponse(task_id="", status="failed", message=str(e)))
        else:
            outcomes.append(TaskResponse(
                task_id=submission.id,
                status="submitted",
                message=f"Crawl task submitted for page ID: {page_id}",
            ))
    return outcomes
@app.post("/crawl_keywords", response_model=List[TaskResponse], summary="提交关键词URL爬取任务")
def crawl_keywords(request: CrawlKeywordsRequest) -> List[TaskResponse]:
    """Queue a keyword-URL crawl task per keyword (see worker.celery.crawl_client.submit_crawl_tasks).

    The caller's browser_config is merged with forced headless/container
    flags before being handed to the worker; failures are reported per item.
    """
    outcomes: List[TaskResponse] = []
    for keyword in request.keywords:
        merged_config = dict(request.browser_config)
        merged_config.update(
            proxy=request.browser_config.get('proxy'),
            headless=True,
            verbose=False,
            extra_args=["--disable-gpu", "--disable-dev-shm-usage", "--no-sandbox"],
        )
        try:
            submission = celery_app.send_task(
                'crawl_worker.crawl_keyword_urls',
                kwargs={
                    'keyword': keyword.strip(),
                    'browser_config': merged_config,
                    'overwrite': False,
                },
            )
        except Exception as e:
            logger.error(f"提交关键词URL爬取任务失败 [{keyword}]: {str(e)}")
            outcomes.append(TaskResponse(task_id="", status="failed", message=str(e)))
        else:
            outcomes.append(TaskResponse(
                task_id=submission.id,
                status="submitted",
                message=f"Keyword crawl task submitted: {keyword}",
            ))
    return outcomes
@app.post("/convert_single", response_model=TaskResponse, summary="转换单个HTML结果")
def convert_single(request: ConvertRequest) -> TaskResponse:
    """Submit an HTML-conversion task for the FIRST result ID in the request.

    Raises:
        HTTPException 400: when ``result_ids`` is empty (previously this was
            an unhandled IndexError, and the except-handler indexed
            ``result_ids[0]`` again, raising a second time).
        HTTPException 500: when the Celery submission itself fails.
    """
    if not request.result_ids:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="result_ids must contain at least one ID"
        )
    # Hoisted so the except branch cannot re-raise on the same lookup.
    result_id = request.result_ids[0]
    try:
        # NOTE(review): task name 'html_convert_tasks_worker....' differs from
        # /convert_all's 'html_convert_tasks....' — confirm both are correct.
        result = celery_app.send_task('html_convert_tasks_worker.convert_single_result', args=[result_id])
        return TaskResponse(
            task_id=result.id,
            status="submitted",
            message=f"Conversion task submitted for result ID: {result_id}"
        )
    except Exception as e:
        logger.error(f"提交HTML转换任务失败 [结果ID {result_id}]: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e)
        )
@app.post("/convert_all", response_model=TaskResponse, summary="转换所有HTML结果")
def convert_all() -> TaskResponse:
    """Queue the bulk HTML-conversion task for every stored result.

    Raises:
        HTTPException 500: when the Celery submission fails.
    """
    try:
        submission = celery_app.send_task('html_convert_tasks.convert_all_results')
    except Exception as e:
        logger.error(f"提交批量HTML转换任务失败: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e)
        )
    return TaskResponse(
        task_id=submission.id,
        status="submitted",
        message="Bulk conversion task submitted for all results"
    )
@app.post("/browser/test", summary="测试浏览器启动")
async def test_browser(request: BrowserTestRequest):
    """Smoke-test a browser launch and report the URL the page landed on."""
    logger.info(f"Testing browser launch with parameters: {request.model_dump()}")
    try:
        page = await test_browser_launch(request.browser_config, request.init_url)
    except Exception as e:
        logger.error(f"Error during browser test: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
    return {"status": "success", "url": page.url}
async def test_browser_launch(browser_config: Dict, init_url: str) -> ChromiumPage:
    """Launch a Chromium instance from ``browser_config`` and open ``init_url``.

    NOTE(review): declared async but contains no awaits — the browser launch
    and page load run blocking on the event loop; consider run_in_executor.

    Returns:
        The ChromiumPage after navigating to ``init_url``.
    """
    # Bug fix: load_chrome_from_ini(**browser_config, proxy=browser_config.get('proxy'))
    # raised TypeError ("got multiple values for keyword argument 'proxy'")
    # whenever 'proxy' was present in the config dict. Pop it out first so
    # the explicit keyword is the only source of the value.
    config = dict(browser_config)
    proxy = config.pop('proxy', None)
    page = load_chrome_from_ini(**config, proxy=proxy)
    page.get(init_url)
    return page
|