# search_cli.py

from typing import Dict, List, Optional

from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel

from DrissionPage import ChromiumPage

from mylib.drission_page import load_chrome_from_ini
from mylib.logu import logger
from worker.celery.app import app as celery_app
from worker.celery.client import get_uncompleted_keywords

app = APIRouter()
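
# This module exposes an APIRouter rather than a full application, so it is
# meant to be mounted into a parent FastAPI app. A minimal sketch (the import
# path is an assumption, not part of this project):
#
#   from fastapi import FastAPI
#   from search_cli import app as search_router
#
#   api = FastAPI()
#   api.include_router(search_router)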


class TaskResponse(BaseModel):
    task_id: str
    status: str
    message: str


class SearchRequest(BaseModel):
    keywords: List[str]
    max_result_items: Optional[int] = 200
    skip_existing: Optional[bool] = True
    browser_config: Optional[Dict] = {}
    proxy_pool: Optional[List[str]] = None
    from_db: Optional[bool] = True


class CrawlKeywordsRequest(BaseModel):
    keywords: List[str]
    browser_config: Optional[Dict] = {}
    proxy_pool: Optional[List[str]] = None


class ConvertRequest(BaseModel):
    result_ids: List[int]


class BrowserTestRequest(BaseModel):
    browser_config: Dict = {}
    init_url: str = "https://www.google.com"


@app.post("/search", summary="Run Google searches")
def search(request: SearchRequest) -> List[TaskResponse]:
    """Submit one search task per keyword."""
    responses = []
    if request.from_db:
        keywords = get_uncompleted_keywords()
    else:
        keywords = request.keywords
    for keyword in keywords:
        try:
            # Narrow the payload to the current keyword. The original sent the
            # full request on every iteration, so each task received the whole
            # keyword list; keeping the same payload shape (assumption: the
            # worker accepts it) but with a single keyword fixes that.
            task_kwargs = {**request.model_dump(), 'keywords': [keyword]}
            result = celery_app.send_task('search_worker.drission_search', kwargs=task_kwargs)
            logger.info(f"Task submitted: {keyword} (task ID: {result.id})")
            responses.append(TaskResponse(
                task_id=result.id,
                status="submitted",
                message=f"Search task submitted for keyword: {keyword}"
            ))
        except Exception as e:
            logger.error(f"Failed to submit task [{keyword}]: {e}")
            responses.append(TaskResponse(
                task_id="",
                status="failed",
                message=str(e)
            ))
    return responses
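
# Example request (a sketch; host and port are assumptions). With from_db left
# at its default of true, the keywords in the body are ignored and uncompleted
# keywords are pulled from the database instead:
#
#   curl -X POST http://localhost:8000/search \
#        -H "Content-Type: application/json" \
#        -d '{"keywords": ["fastapi celery"], "from_db": false}'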


class CrawlItemRequest(BaseModel):
    page_ids: List[int]
    browser_config: Dict = {}


@app.post("/crawl_item", summary="Crawl individual search result items")
def crawl_item(request: CrawlItemRequest) -> List[TaskResponse]:
    """Submit one page-crawl task per page ID."""
    responses = []
    for page_id in request.page_ids:
        try:
            task_data = {
                'page_id': page_id,
                'browser_config': request.browser_config,
                'overwrite': False
            }
            result = celery_app.send_task('crawl_worker.crawl_page_urls', kwargs=task_data)
            responses.append(TaskResponse(
                task_id=result.id,
                status="submitted",
                message=f"Crawl task submitted for page ID: {page_id}"
            ))
        except Exception as e:
            logger.error(f"Failed to submit page crawl task [page ID {page_id}]: {e}")
            responses.append(TaskResponse(
                task_id="",
                status="failed",
                message=str(e)
            ))
    return responses


@app.post("/crawl_keywords", response_model=List[TaskResponse], summary="Submit keyword URL crawl tasks")
def crawl_keywords(request: CrawlKeywordsRequest) -> List[TaskResponse]:
    """Submit keyword URL crawl tasks (mirrors worker.celery.crawl_client.submit_crawl_tasks)."""
    responses = []
    for keyword in request.keywords:
        try:
            task_data = {
                'keyword': keyword.strip(),
                'browser_config': {
                    **request.browser_config,
                    # Keep the proxy key present even when unset, matching the
                    # original behavior.
                    'proxy': request.browser_config.get('proxy'),
                    'headless': True,
                    'verbose': False,
                    'extra_args': ["--disable-gpu", "--disable-dev-shm-usage", "--no-sandbox"]
                },
                'overwrite': False
            }
            result = celery_app.send_task('crawl_worker.crawl_keyword_urls', kwargs=task_data)
            responses.append(TaskResponse(
                task_id=result.id,
                status="submitted",
                message=f"Keyword crawl task submitted: {keyword}"
            ))
        except Exception as e:
            logger.error(f"Failed to submit keyword URL crawl task [{keyword}]: {e}")
            responses.append(TaskResponse(
                task_id="",
                status="failed",
                message=str(e)
            ))
    return responses
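
# All submission endpoints return Celery task IDs, which can be polled with
# Celery's standard result API (sketch; assumes a result backend is
# configured for celery_app):
#
#   res = celery_app.AsyncResult(task_id)
#   res.status   # e.g. PENDING, STARTED, SUCCESS, FAILURE
#   res.result   # the task's return value once it has finished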


@app.post("/convert_single", response_model=TaskResponse, summary="Convert a single HTML result")
def convert_single(request: ConvertRequest) -> TaskResponse:
    """Submit a conversion task for the first result ID in the request."""
    if not request.result_ids:
        # Guard against an empty list; indexing result_ids[0] below would
        # otherwise raise IndexError.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="result_ids must contain at least one ID"
        )
    try:
        result = celery_app.send_task('html_convert_tasks_worker.convert_single_result', args=[request.result_ids[0]])
        return TaskResponse(
            task_id=result.id,
            status="submitted",
            message=f"Conversion task submitted for result ID: {request.result_ids[0]}"
        )
    except Exception as e:
        logger.error(f"Failed to submit HTML conversion task [result ID {request.result_ids[0]}]: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e)
        )


@app.post("/convert_all", response_model=TaskResponse, summary="Convert all HTML results")
def convert_all() -> TaskResponse:
    """Submit a bulk HTML conversion task."""
    try:
        result = celery_app.send_task('html_convert_tasks.convert_all_results')
        return TaskResponse(
            task_id=result.id,
            status="submitted",
            message="Bulk conversion task submitted for all results"
        )
    except Exception as e:
        logger.error(f"Failed to submit bulk HTML conversion task: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e)
        )


@app.post("/browser/test", summary="Test browser launch")
def test_browser(request: BrowserTestRequest):
    # DrissionPage is synchronous, so this endpoint is a plain `def`; FastAPI
    # runs it in a threadpool instead of blocking the event loop inside an
    # `async def` (which the original did).
    try:
        logger.info(f"Testing browser launch with parameters: {request.model_dump()}")
        page = test_browser_launch(request.browser_config, request.init_url)
        url = page.url
        page.quit()  # close the browser so repeated tests don't leak Chromium processes
        return {"status": "success", "url": url}
    except Exception as e:
        logger.error(f"Error during browser test: {e}")
        raise HTTPException(status_code=500, detail=str(e))


def test_browser_launch(browser_config: Dict, init_url: str) -> ChromiumPage:
    """Launch a Chromium instance from the INI config and open init_url."""
    # load_chrome_from_ini reads the proxy from browser_config itself; the
    # original also passed proxy=browser_config.get('proxy') alongside
    # **browser_config, which raises TypeError whenever that key is present.
    page = load_chrome_from_ini(**browser_config)
    page.get(init_url)
    return page
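

if __name__ == "__main__":
    # Minimal local harness, a sketch only: assumes uvicorn is installed and
    # that mounting the router at the root matches the deployed layout.
    import uvicorn
    from fastapi import FastAPI

    api = FastAPI(title="search_cli harness")
    api.include_router(app)
    uvicorn.run(api, host="127.0.0.1", port=8000)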