from pathlib import Path
import time
import re

from DrissionPage import ChromiumPage
from DrissionPage.common import Keys
from pydantic import BaseModel
from scrapling import Adaptor

from mylib.logu import logger
from mylib.base import save_to_file
from mylib.drission_page import load_chrome_from_ini
from worker.search_engine.search_result_db import SearchResultManager, SearchResultItem, KeywordTask
from config.settings import OUTPUT_DIR
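
# Google SERP scraper: DrissionPage drives the browser, Scrapling parses the
# HTML, and pages plus parsed items are persisted through SearchResultManager.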


class SearchResultEle(BaseModel):
    search_div: bool | None = None
    next_page_url: str | None = None
    current_page: int | None = None
    results: list[SearchResultItem] = []
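

# Drives a Chromium instance through Google results pages for a keyword,
# saving each page's raw HTML and persisting parsed items page by page.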
class GoogleSearchHandlerDrission:
    def __init__(self, page: ChromiumPage):
        self.page = page
        self.db_manager = SearchResultManager()
        self.save_dir = OUTPUT_DIR / 'results'

    def save_current_page(self, keyword: str, filename: str | None = None) -> Path:
        # Compute the default timestamp per call; a strftime() default in the
        # signature would be evaluated only once, at import time.
        if filename is None:
            filename = time.strftime("%Y%m%d_%H%M%S")
        html_dir = self.save_dir / keyword
        html_dir.mkdir(parents=True, exist_ok=True)
        html_path = save_to_file(self.page.html, html_dir / f"{filename}.html")
        logger.info(f"save_to_file {html_path}")
        return html_path

    def _process_single_page(self, keyword: str) -> SearchResultEle:
        """Parse, save, and persist the results page currently loaded in the browser."""
        content = self.page.html
        result_ele = self.get_search_result_ele(content)

        if not result_ele.search_div:
            logger.warning(f"Search result container not found; possibly a verification page. keyword: {keyword}")
            return result_ele

        html_path = self.save_current_page(keyword, filename=f"{result_ele.current_page}")
        page_result = self.db_manager.save_page_results(
            keyword=keyword,
            page_number=result_ele.current_page,
            results_count=len(result_ele.results) if result_ele.results else 0,
            has_next_page=bool(result_ele.next_page_url),
            html_path=html_path,
        )
        if result_ele.results:
            self.db_manager.save_result_items(
                keyword=keyword,
                page_id=page_result.id,
                items=result_ele.results,
                html_path=html_path,
            )

        return result_ele
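
    # Pagination strategy: click Google's "Next" link (#pnnext) until there is
    # no next page, the accumulated item count exceeds max_result_items, or the
    # link disappears (e.g. a CAPTCHA interstitial replaced the results page).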
    def process_keyword(self, keyword: str, max_result_items: int = 200, skip_existing: bool = False) -> KeywordTask:
        if skip_existing:
            key_model = self.db_manager.get_keyword_task(keyword)
            if key_model:
                logger.info(f"Keyword task already completed, skipping: {keyword}")
                return key_model

        key_model = self.db_manager.create_keyword_task(keyword)  # assumed to return the created KeywordTask
        self.search(keyword)

        search_result_item_count = 0
        while True:
            result_ele = self._process_single_page(keyword)
            search_result_item_count += len(result_ele.results) if result_ele.results else 0

            if search_result_item_count > max_result_items:
                logger.info(f"Keyword {keyword}: accumulated results exceed {max_result_items}, stopping pagination")
                break

            if not result_ele.next_page_url:
                break

            self.page.scroll.to_bottom()
            time.sleep(3)
            next_btn = self.page.ele('#pnnext')
            if not next_btn:
                break
            next_btn.click()
            time.sleep(3)  # give the next results page time to load before parsing
            logger.info(f"Moved to next page: {self.page.url}")

        return key_model

    def goto_home_page(self):
        url = "https://www.google.com"
        if self.page.url != url:
            self.page.get(url)
        if 'sorry/' in self.page.url:
            raise Exception("Human verification page encountered; manual intervention required")

    def search(self, query: str):
        self.goto_home_page()
        search_box = self.page.ele('textarea')
        search_box.input(query)
        self.page.actions.type(Keys.ENTER)
        time.sleep(3)  # wait for the first results page to render before parsing

    def get_current_page_num(self) -> int:
        # Google paginates with &start=<offset>, 10 results per page.
        if '/search?q=' in self.page.url:
            match = re.search(r'&start=(\d+)', self.page.url)
            return int(match.group(1)) // 10 + 1 if match else 1
        return 1
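
    # These selectors (#search, #pnnext, [data-snc]) target Google's current
    # SERP DOM, which changes without notice; expect to revise them when
    # parsing stops matching.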
    def get_search_result_ele(self, html_content: str) -> SearchResultEle:
        res = SearchResultEle(
            search_div=None,
            next_page_url=None,
            current_page=self.get_current_page_num(),
            results=[],
        )
        page = Adaptor(html_content)
        body = Adaptor(page.body)
        search_div = body.xpath_first('//div[@id="search"]')
        next_page_url = body.xpath_first('//a[@id="pnnext"]/@href')
        res.search_div = search_div is not None
        res.next_page_url = f"https://www.google.com{next_page_url}" if next_page_url else None

        if search_div is None:
            return res

        # Relative path, so only items inside the results container match.
        result_list = search_div.xpath('.//*[@data-snc]')
        logger.info(f"Results on current page: {len(result_list)}")

        for result_item in result_list:
            if len(result_item.children) < 2:
                continue

            result = SearchResultItem()
            title_ele = result_item.children[0]
            if title_ele:
                result.url = title_ele.xpath_first('.//a/@href')
                result.title = title_ele.xpath_first('.//h3/text()')
            content_ele = result_item.children[1]
            if content_ele:
                content_list = content_ele.xpath('.//span/text()')
                result.content = ''.join(content_list) if content_list else None
            if any([result.url, result.title, result.content]):
                res.results.append(result)
        return res
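

# Convenience wrapper: launch Chromium via the project's load_chrome_from_ini
# helper (note the hardcoded local proxy) and run one keyword end to end.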
def search_keyword_drission(keyword: str, max_result_items: int = 1, skip_existing: bool = False):
    page = load_chrome_from_ini(proxy='http://localhost:1881')
    try:
        handler = GoogleSearchHandlerDrission(page)
        return handler.process_keyword(keyword, max_result_items, skip_existing)
    except Exception:
        logger.exception(f"Keyword processing failed: {keyword}")
        raise


def main():
    search_keyword_drission("drission")


if __name__ == "__main__":
    main()
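
# Batch usage sketch (hypothetical keyword list; assumes the local proxy used
# by load_chrome_from_ini is reachable):
#   for kw in ["drission", "scrapling"]:
#       search_keyword_drission(kw, max_result_items=50, skip_existing=True)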