Bladeren bron

dp 完成一个关键词的搜索

mrh 1 jaar geleden
bovenliggende
commit
65a0744b7f
3 gewijzigde bestanden met toevoegingen van 183 en 3 verwijderingen
  1. 1 1
      CONVENTIONS.md
  2. 12 2
      mylib/drission_page.py
  3. 170 0
      worker/search_engine/drission_google_search.py

+ 1 - 1
CONVENTIONS.md

@@ -21,7 +21,7 @@
 
 - 当前环境是 python 3.12 ,务必要保持最新的接口来开发,例如 Fastapi 不再使用 app.event ,而是使用 lifespan 。pydantic.BaseModel 不再支持 dict() ,而是用 model_dump()
 
-重要:由于你是在 aider 开发环境中,如果你要编写任何文件的代码,都不能省略已有代码,必须完整写完不能"... [文件其余部分保持不变] ..." 这样简略那些代码
+重要:由于你是在 aider 开发环境中,如果你要编写任何文件的代码,都不能省略已有代码,必须完整写完
 
 # 项目说明:
 - 测试模块在 tests 的目录中, `tests\mytest` 是我私人的草稿,不用理会

+ 12 - 2
mylib/drission_page.py

@@ -1,10 +1,13 @@
+import os
 import time
 from DrissionPage import Chromium, ChromiumOptions, ChromiumPage
 from pathlib import Path
 from config.settings import OUTPUT_DIR, WORK_DIR, CONFIG_DIR
 from mylib.random_ua import get_random_user_agent
+from mylib.logu import logger
 from fake_useragent import UserAgent
 
+
 BROWSER_PATH=r"C:\Program Files\Google\Chrome\Application\chrome.exe"
 
 def genarate_chrome_ini(address="localhost:9321"):
@@ -12,12 +15,19 @@ def genarate_chrome_ini(address="localhost:9321"):
     chrome_options = ChromiumOptions().set_browser_path(BROWSER_PATH)
     chrome_options.set_address(address)
     chrome_options.set_user_data_path(str(OUTPUT_DIR / f'user_data_dir_{port}'))
-    chrome_options.no_imgs(True).mute(True)
+    # chrome_options.no_imgs(True).mute(True)
     # chrome_options.incognito(True)
     path = chrome_options.save(CONFIG_DIR / f'{port}.ini')
     return path
def load_chrome_from_ini(path=CONFIG_DIR / '9321.ini', headless=False, proxy=None):
    """Build a ChromiumPage from a saved ChromiumOptions .ini file.

    Args:
        path: options ini file produced by genarate_chrome_ini().
        headless: accepted for interface compatibility but currently unused —
            TODO(review): wire it into chrome_options or drop it.
        proxy: explicit proxy URL; when omitted, falls back to the HTTP_PROXY
            environment variable if set.

    Returns:
        A ChromiumPage connected to a freshly auto-ported browser.
    """
    chrome_options = ChromiumOptions(ini_path=path)
    # Explicit argument wins over the environment fallback.
    effective_proxy = proxy or os.environ.get('HTTP_PROXY')
    if effective_proxy:
        chrome_options.set_proxy(effective_proxy)
    chrome_options.auto_port(True)
    # FIX: log the proxy actually applied — the original logged only the
    # `proxy` argument, hiding the HTTP_PROXY fallback case.
    logger.info(f"proxy {effective_proxy}")
    page = ChromiumPage(chrome_options)
    return page
 

+ 170 - 0
worker/search_engine/drission_google_search.py

@@ -0,0 +1,170 @@
+from pathlib import Path
+import time
+import re
+from typing import Optional
+from DrissionPage import ChromiumPage
+from DrissionPage.common import Keys
+from pydantic import BaseModel
+from scrapling import Adaptor
+from mylib.logu import logger
+from mylib.base import save_to_file
+from worker.search_engine.search_result_db import SearchResultManager, SearchResultItem, KeywordTask
+from config.settings import OUTPUT_DIR
+from mylib.drission_page import load_chrome_from_ini
+
class SearchResultEle(BaseModel):
    """Parsed snapshot of a single Google search results page."""

    # True if the '#search' results container was found; None before parsing.
    search_div: bool | None = None
    # Absolute URL built from the '#pnnext' link, or None on the last page.
    next_page_url: str | None = None
    # 1-based page number derived from the '&start=' URL parameter.
    current_page: int | None = None
    # Result items extracted from this page (pydantic copies the [] default
    # per instance, so the mutable default is safe here).
    results: list[SearchResultItem] = []
+
class GoogleSearchHandlerDrission:
    """Crawl Google result pages for a keyword with DrissionPage.

    Every visited results page is saved as an HTML snapshot under
    OUTPUT_DIR/results/<keyword>/ and its parsed items are persisted
    through SearchResultManager.
    """

    def __init__(self, page: ChromiumPage):
        self.page = page
        self.db_manager = SearchResultManager()
        self.save_dir = OUTPUT_DIR / 'results'

    def save_current_page(self, keyword: str, filename: str | None = None) -> Path:
        """Save the current page HTML to <save_dir>/<keyword>/<filename>.html.

        FIX: the original default (`filename: str = time.strftime(...)`) was
        evaluated once at import time, freezing one timestamp for the whole
        process; it is now computed per call when omitted.
        """
        if filename is None:
            filename = time.strftime("%Y%m%d_%H%M%S")
        html_dir = self.save_dir / keyword
        html_dir.mkdir(parents=True, exist_ok=True)
        # FIX: the body previously did not use `filename` at all; restore the
        # obviously intended name (callers pass filename=<page number>).
        html_path = save_to_file(self.page.html, html_dir / f"{filename}.html")
        logger.info(f"save_to_file {html_path}")
        return html_path

    def _process_single_page(self, keyword: str) -> SearchResultEle:
        """Parse the page currently loaded in the browser and persist it.

        Returns the parsed SearchResultEle. If no '#search' container is found
        (e.g. a CAPTCHA interstitial) nothing is written to the database.
        """
        content = self.page.html
        result_ele = self.get_search_result_ele(content)

        if not result_ele.search_div:
            logger.warning(f"未找到搜索结果容器,可能遇到验证页面 keyword: {keyword}")
            return result_ele

        html_path = self.save_current_page(keyword, filename=f"{result_ele.current_page}")
        page_result = self.db_manager.save_page_results(
            keyword=keyword,
            page_number=result_ele.current_page,
            results_count=len(result_ele.results) if result_ele.results else 0,
            has_next_page=bool(result_ele.next_page_url),
            html_path=html_path
        )

        if result_ele.results:
            self.db_manager.save_result_items(
                keyword=keyword,
                page_id=page_result.id,
                items=result_ele.results,
                html_path=html_path,
            )

        return result_ele

    def process_keyword(self, keyword: str, max_result_items: int = 200, skip_existing: bool = False) -> KeywordTask:
        """Search `keyword` and walk result pages until exhausted or capped.

        Args:
            keyword: query string to search for.
            max_result_items: stop paging once the cumulative number of
                collected items exceeds this value.
            skip_existing: return the stored task instead of re-crawling when
                the keyword already has a completed task.

        Returns:
            The completed KeywordTask record.
        """
        if skip_existing:
            key_model = self.db_manager.get_keyword_task(keyword)
            if key_model:
                logger.info(f"关键词任务已完成,跳过处理: {keyword}")
                return key_model

        self.db_manager.create_keyword_task(keyword)
        self.search(keyword)

        search_result_item_count = 0

        # FIX: the original `has_next` flag was never reassigned; the loop
        # only ever exits via break, so make that explicit.
        while True:
            result_ele = self._process_single_page(keyword)
            search_result_item_count += len(result_ele.results) if result_ele.results else 0

            # FIX: the count compared here is cumulative across pages, not
            # per-page; the log message now says so (was "单页").
            if search_result_item_count > max_result_items:
                logger.info(f"关键词 {keyword} 累计结果数量超过 {max_result_items} ,跳过处理下一页")
                break

            if not result_ele.next_page_url:
                break

            self.page.scroll.to_bottom()
            time.sleep(3)
            next_btn = self.page.ele('#pnnext')
            if not next_btn:
                break
            next_btn.click()
            # Give the next results page a moment to load before the next
            # iteration reads self.page.html (the original read immediately).
            time.sleep(2)
            logger.info(f"跳转到下一页: {self.page.url}")

        key_model = self.db_manager.mark_task_completed(keyword)
        logger.info(f"完成关键词处理: {keyword}")
        return key_model

    def goto_home_page(self):
        """Navigate to the Google home page (no-op if already there).

        Raises:
            Exception: when Google redirects to its 'sorry/' CAPTCHA page.
        """
        url = "https://www.google.com"
        if self.page.url != url:
            self.page.get(url)
        if 'sorry/' in self.page.url:
            raise Exception("出现人机验证,需要人工干预")

    def search(self, query: str):
        """Type `query` into the Google search box and submit with ENTER."""
        self.goto_home_page()
        search_box = self.page.ele('textarea')
        search_box.input(query)
        self.page.actions.type(Keys.ENTER)

    def get_current_page_num(self) -> int:
        """Return the 1-based results-page number parsed from '&start=' in the URL.

        FIX: the original implicitly returned None when the URL was not a
        search results page, violating the `int` annotation; default to 1.
        """
        if '/search?q=' in self.page.url:
            match = re.search(r'&start=(\d+)', self.page.url)
            # Google pages results 10 per page: start=10 -> page 2, etc.
            return int(match.group(1)) // 10 + 1 if match else 1
        return 1

    def get_search_result_ele(self, html_content: str) -> SearchResultEle:
        """Parse a Google results page into a SearchResultEle.

        Extracts the '#search' container, the '#pnnext' next-page link, and
        one SearchResultItem per '[data-snc]' result block.
        """
        res = SearchResultEle(
            search_div=None,
            next_page_url=None,
            current_page=self.get_current_page_num(),
            results=[]
        )

        page = Adaptor(html_content)
        body = Adaptor(page.body)
        search_div = body.xpath('//div[@id="search"]')
        next_page_url = body.xpath_first('//a[@id="pnnext"]/@href')
        res.search_div = bool(search_div)
        res.next_page_url = f"https://www.google.com{next_page_url}" if next_page_url else None

        if not search_div:
            return res

        # NOTE(review): '//*[@data-snc]' is an absolute XPath, so it matches
        # anywhere in the document rather than only under #search — confirm
        # this is intended.
        result_list = search_div.xpath('//*[@data-snc]')
        logger.info(f"当前页结果数量: {len(result_list)}")

        for result_item in result_list:
            # Expect at least a title block and a snippet block.
            if len(result_item.children) < 2:
                continue

            result = SearchResultItem()
            title_ele = result_item.children[0]
            if title_ele:
                result.url = title_ele.xpath_first('.//a/@href')
                result.title = title_ele.xpath_first('.//h3/text()')

            content_ele = result_item.children[1]
            if content_ele:
                content_list = content_ele.xpath('.//span/text()')
                result.content = ''.join(content_list) if content_list else None

            # Keep only items that yielded at least one field.
            if any([result.url, result.title, result.content]):
                res.results.append(result)
        return res
+
def search_keyword_drission(keyword: str, max_result_items: int = 1, skip_existing: bool = False):
    """Crawl Google for `keyword` in a fresh browser session.

    Args:
        keyword: query string to search for.
        max_result_items: cap on cumulatively collected items (note the
            conservative default of 1 — effectively first page only).
        skip_existing: skip keywords that already have a completed task.

    Returns:
        The KeywordTask produced by GoogleSearchHandlerDrission.process_keyword.

    Raises:
        Exception: re-raised after logging when the crawl fails.
    """
    # TODO(review): proxy is hard-coded; consider a parameter or env override.
    # (A duplicated commented-out copy of this call was removed.)
    page = load_chrome_from_ini(proxy='http://localhost:1881')
    try:
        handler = GoogleSearchHandlerDrission(page)
        return handler.process_keyword(keyword, max_result_items, skip_existing)
    except Exception:
        # FIX: the bound name `e` was unused; logger.exception already
        # captures the active exception and traceback.
        logger.exception(f"关键词处理失败: {keyword}")
        raise
+
def main():
    """Ad-hoc entry point: crawl a single sample keyword."""
    search_keyword_drission("drission")

if __name__ == "__main__":
    main()