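"""Google search crawler built on DrissionPage.

Drives a Chromium browser through Google search for a keyword, parses each
result page with scrapling, and persists pages and result items through
SearchResultManager. Project-internal helpers (mylib, config, worker.*) are
assumed to provide the logger, file saving, browser setup, and storage.
"""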
import time
import re
from pathlib import Path

from DrissionPage import ChromiumPage
from DrissionPage.common import Keys
from pydantic import BaseModel
from scrapling import Adaptor

from mylib.logu import logger
from mylib.base import save_to_file
from config.settings import OUTPUT_DIR
from mylib.drission_page import load_chrome_from_ini
from worker.search_engine.search_result_db import SearchResultManager, SearchResultItem, KeywordTask
from worker.search_engine.smart_selector import get_search_ele


class SearchResultEle(BaseModel):
    """Parsed state of one search result page."""
    search_div: bool | None = None
    next_page_url: str | None = None
    current_page: int | None = None
    results: list[SearchResultItem] = []


class GoogleSearchHandlerDrission:
    """Runs a Google search in a DrissionPage browser and persists the results."""

    def __init__(self, page: ChromiumPage):
        self.page = page
        self.db_manager = SearchResultManager()
        self.save_dir = OUTPUT_DIR / 'results'

    def save_current_page(self, keyword: str, filename: str | None = None) -> Path:
        # Default to a per-call timestamp; a time.strftime(...) default
        # argument would be evaluated only once, at import time.
        if filename is None:
            filename = time.strftime("%Y%m%d_%H%M%S")
        html_dir = self.save_dir / keyword
        html_dir.mkdir(parents=True, exist_ok=True)
        html_path = save_to_file(self.page.html, html_dir / f"{filename}.html")
        logger.info(f"save_to_file {html_path}")
        return html_path

    def _process_single_page(self, keyword: str) -> SearchResultEle:
        """Parse the current page, save its HTML, and persist the results."""
        content = self.page.html
        result_ele = self.get_search_result_ele(content)

        if not result_ele.search_div:
            logger.warning(f"Search result container not found; likely a CAPTCHA page. keyword: {keyword}")
            return result_ele
        html_path = self.save_current_page(keyword, filename=f"{result_ele.current_page}")
        page_result = self.db_manager.save_page_results(
            keyword=keyword,
            page_number=result_ele.current_page,
            results_count=len(result_ele.results) if result_ele.results else 0,
            has_next_page=bool(result_ele.next_page_url),
            html_path=html_path,
        )
        if result_ele.results:
            self.db_manager.save_result_items(
                keyword=keyword,
                page_id=page_result.id,
                items=result_ele.results,
                html_path=html_path,
            )

        return result_ele

    def check_keyword(self, keyword: str, skip_existing: bool) -> tuple[bool, KeywordTask]:
        """Return (True, task) if the keyword is already done; otherwise recreate the task."""
        key_model = self.db_manager.get_keyword_task(keyword)
        # Guard against a missing task before touching is_completed.
        if key_model and skip_existing and key_model.is_completed:
            logger.info(f"Keyword task already completed, skipping: id={key_model.id} {keyword}")
            return True, key_model
        if key_model:
            self.db_manager.delete_keyword_task(keyword)
        key_model = self.db_manager.create_keyword_task(keyword)
        return False, key_model
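
    # process_keyword drives the crawl loop: run the search, then repeatedly
    # parse and persist the current page and click through to the next one
    # until max_result_items is reached, there is no next page, or paging fails.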
    def process_keyword(self, keyword: str, max_result_items: int = 200, skip_existing: bool = False):
        exist, key_model = self.check_keyword(keyword, skip_existing)
        if exist:
            # Task already completed and skip_existing is set; nothing to do.
            return key_model
        self.search(keyword)
        search_result_item_count = 0
        should_complete = False

        while True:
            result_ele = self._process_single_page(keyword)
            if not result_ele.search_div:
                break
            search_result_item_count += len(result_ele.results) if result_ele.results else 0
            if search_result_item_count >= max_result_items or not result_ele.next_page_url:
                should_complete = True
                break
            try:
                self.page.scroll.to_bottom()
                time.sleep(3)
                next_btn = self.page.ele('#pnnext')
                if next_btn:
                    next_btn.click()
                    logger.info(f"Navigating to next page: {self.page.url}")
                    self.page.wait.load_start()
                else:
                    break
            except Exception as e:
                logger.warning(f"Pagination failed: {e}")
                break
        if should_complete:
            key_model = self.db_manager.mark_task_completed(keyword)
            logger.info(f"Keyword processed successfully: {keyword}")
        else:
            logger.warning(f"Keyword processing interrupted: {keyword}")
        return key_model

    def goto_home_page(self):
        url = "https://www.google.com"
        if self.page.url != url:
            self.page.get(url)
            self.page.wait.load_start()
        if 'sorry/' in self.page.url:
            raise Exception(f"CAPTCHA page detected; retrying with a new identity. {self.page.url}")
    def search(self, query: str):
        self.goto_home_page()
        search_ele_dict = get_search_ele(self.page.html)
        if not search_ele_dict:
            raise Exception("Search box not found")
        search_box = self.page.ele(f'xpath:{search_ele_dict["xpath"]}')
        search_box.input(query)
        self.page.actions.type(Keys.ENTER)
        self.page.wait.load_start()

    def get_current_page_num(self) -> int:
        # Google encodes the result offset in the `start` query parameter
        # (10 results per page), so &start=20 corresponds to page 3.
        if '/search?q=' in self.page.url:
            match = re.search(r'&start=(\d+)', self.page.url)
            return int(match.group(1)) // 10 + 1 if match else 1
        return 1

    def get_search_result_ele(self, html_content: str) -> SearchResultEle:
        res = SearchResultEle(
            search_div=None,
            next_page_url=None,
            current_page=self.get_current_page_num(),
            results=[],
        )
        page = Adaptor(html_content)
        body = Adaptor(page.body)
        search_div = body.xpath_first('//div[@id="search"]')
        next_page_url = body.xpath_first('//a[@id="pnnext"]/@href')
        res.search_div = bool(search_div)
        res.next_page_url = f"https://www.google.com{next_page_url}" if next_page_url else None
        if not search_div:
            return res
        # Each organic result block carries a data-snc attribute; search
        # relative to the results container rather than the whole document.
        result_list = search_div.xpath('.//*[@data-snc]')
        logger.info(f"Results on current page: {len(result_list)}, next_page_url: {next_page_url}")

        for result_item in result_list:
            # Expect at least a title block and a snippet block.
            if len(result_item.children) < 2:
                continue

            result = SearchResultItem()
            title_ele = result_item.children[0]
            if title_ele:
                result.url = title_ele.xpath_first('.//a/@href')
                result.title = title_ele.xpath_first('.//h3/text()')
            content_ele = result_item.children[1]
            if content_ele:
                content_list = content_ele.xpath('.//span/text()')
                result.content = ''.join(content_list) if content_list else None
            if any([result.url, result.title, result.content]):
                res.results.append(result)
        return res
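

# Entry point for one keyword. Returns {'error': 0|1, 'msg': str, 'data': ...}:
# on success, data is the completed KeywordTask dump; on failure or
# interruption, data is the path of the saved HTML snapshot for debugging.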
def search_keyword_drission(keyword, max_result_items=200, skip_existing=True):
    ret = {'error': 0, 'msg': '', 'data': None}
    logger.info(f"keyword {keyword} max_result_items: {max_result_items} skip_existing: {skip_existing}")
    # The task-status check needs no browser, so build the handler without one.
    search_handler = GoogleSearchHandlerDrission(None)
    exist, keyword_model = search_handler.check_keyword(keyword, skip_existing)
    if exist and keyword_model.is_completed:
        ret['data'] = keyword_model.model_dump()
        return ret

    page = load_chrome_from_ini(proxy='http://127.0.0.1:1881')
    try:
        search_handler = GoogleSearchHandlerDrission(page)
        kw = search_handler.process_keyword(keyword, max_result_items=max_result_items, skip_existing=skip_existing)
        if not kw:
            ret['error'] = 1
            html_path = search_handler.save_current_page(keyword, filename=f"warning_{time.strftime('%Y%m%d_%H%M%S')}")
            logger.warning(f"Keyword task not completed: {keyword} html_path: {html_path}")
            ret['msg'] = f"Keyword task not completed: {keyword}"
            ret['data'] = html_path
            return ret
        ret['data'] = kw.model_dump()
        return ret
    except Exception as e:
        html_path = search_handler.save_current_page(keyword, filename=f"error_{time.strftime('%Y%m%d_%H%M%S')}")
        logger.exception(f"Failed: {e} html_path: {html_path}")
        ret['error'] = 1
        ret['msg'] = f"Failed: {e}"
        ret['data'] = html_path
        return ret
    finally:
        # Release the browser on success and on error alike.
        page.quit()


def main():
    search_keyword_drission("drission", max_result_items=15)


if __name__ == "__main__":
    main()