import asyncio
import re
import time
from pathlib import Path

from camoufox import AsyncCamoufox
from playwright.async_api import Page
from pydantic import BaseModel
from scrapling import Adaptor

from config.settings import OUTPUT_DIR
from mylib.base import save_to_file
from mylib.logu import logger
from worker.search_engine.camoufox_broswer import BrowserConfig, BrowserCore
from worker.search_engine.search_result_db import SearchResultManager, SearchResultItem
from worker.search_engine.smart_selector import get_search_ele


async def async_input(prompt: str) -> str:
    """Run blocking input() in a worker thread so the event loop stays responsive."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, input, prompt)
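

# Parse result for a single Google results page: whether the results container
# was present, the absolute URL of the next page (if any), the current page
# number, and the extracted result items.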
class SearchResultEle(BaseModel):
    search_div: bool | None = None
    next_page_url: str | None = None
    current_page: int | None = None
    results: list[SearchResultItem] = []
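

# Drives one Playwright page through the full flow: open google.com, submit a
# query, parse and persist each results page, and follow the "next" link.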
class GoogleSearchHandler:
    def __init__(self, page: Page):
        self.page = page
        self.db_manager = SearchResultManager()
        self.save_dir = OUTPUT_DIR / 'results'

    async def save_current_page(self, keyword: str, filename: str | None = None):
        # A time.strftime(...) default in the signature would be evaluated only
        # once at import time, so the timestamp is computed per call instead.
        filename = filename or time.strftime("%Y%m%d_%H%M%S")
        html_dir = self.save_dir / keyword
        html_dir.mkdir(parents=True, exist_ok=True)
        html_path = save_to_file(await self.page.content(), html_dir / f"{filename}.html")
        logger.info(f"save_to_file {html_path}")
        return html_path
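
    # Parse the currently loaded results page, archive its HTML, and persist
    # the page record plus any extracted items; the caller inspects the
    # returned SearchResultEle to decide whether to keep paginating.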
    async def _process_single_page(self, keyword: str) -> SearchResultEle:
        content = await self.page.content()
        result_ele = self.get_search_result_ele(content)

        if not result_ele.search_div:
            logger.warning(f"Search result container not found; possibly a verification page. keyword: {keyword}")
            return result_ele
        html_path = await self.save_current_page(keyword, filename=f"{result_ele.current_page}")
        page_result = self.db_manager.save_page_results(
            keyword=keyword,
            page_number=result_ele.current_page,
            results_count=len(result_ele.results) if result_ele.results else 0,
            has_next_page=bool(result_ele.next_page_url),
            html_path=html_path,
        )
        if result_ele.results:
            self.db_manager.save_result_items(
                keyword=keyword,
                page_id=page_result.id,
                items=result_ele.results,
                html_path=html_path,
            )

        return result_ele
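
    # End-to-end handling of one keyword: (re)create the task record, run the
    # search, then loop over result pages until max_result_items is reached,
    # the next-page link disappears, or a verification page interrupts the run.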
    async def process_keyword(self, keyword: str, max_result_items: int = 200, skip_existing: bool = False):
        key_model = self.db_manager.get_keyword_task(keyword)
        if skip_existing and key_model:
            logger.info(f"Keyword task already completed, skipping: {keyword}")
            return key_model
        # Drop stale data and recreate the task
        if key_model:
            self.db_manager.delete_keyword_task(keyword)
        key_model = self.db_manager.create_keyword_task(keyword)
        await self.search(keyword)
        search_result_item_count = 0
        should_complete = False  # whether the completion criteria were met

        while True:
            result_ele = await self._process_single_page(keyword)
            # Bail out on verification pages and similar anomalies
            if not result_ele.search_div:
                break
            search_result_item_count += len(result_ele.results) if result_ele.results else 0
            # Mark completion once enough results were collected or there is no next page
            if search_result_item_count >= max_result_items or not result_ele.next_page_url:
                should_complete = True
                break
            try:
                await self.page.evaluate("window.scrollTo(0, document.body.scrollHeight);")
                await asyncio.sleep(3)
                await self.page.click("//a[@id='pnnext']", timeout=10000)
                logger.info(f"self.page.url {self.page.url}")
                await self.page.wait_for_load_state('load', timeout=10000)
            except Exception as e:
                logger.warning(f"Pagination failed: {e}")
                break
        # Only mark the task completed on a normal finish
        if should_complete:
            key_model = self.db_manager.mark_task_completed(keyword)
            logger.info(f"Keyword processed to completion: {keyword}")
        else:
            logger.warning(f"Keyword processing was interrupted: {keyword}")
        return key_model
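
    # Navigate to the Google home page; a URL containing 'sorry/' means Google
    # served its human-verification (CAPTCHA) interstitial.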
    async def goto_home_page(self):
        url = "https://www.google.com"
        if self.page.url != url:
            await self.page.goto(url)
            await self.page.wait_for_load_state('load', timeout=10000)
        if 'sorry/' in self.page.url:
            user_input = await async_input("Human verification page detected. Continue after solving it manually? (y/n): ")
            if user_input.lower() == 'y':
                # The user cleared the verification by hand; reload and re-check.
                await self.goto_home_page()
                return
            raise Exception(f"User chose to exit after human verification; aborting so a new identity can be tried. {self.page.url}")

    def find_search_div(self, html_content: str) -> bool:
        return bool(Adaptor(html_content).xpath_first('//div[@id="search"]'))

    async def search(self, query: str) -> str:
        await self.goto_home_page()
        search_ele_dict = get_search_ele(await self.page.content())
        if not search_ele_dict:
            raise Exception("Search box not found")
        textarea = self.page.locator(search_ele_dict['xpath'])
        await textarea.fill(query, timeout=10000)
        await textarea.press('Enter')
        await self.page.wait_for_load_state(state='load', timeout=10000)
        return await self.page.content()
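
    # Google encodes pagination as an offset: &start=10 is page 2, &start=20 is
    # page 3 (10 results per page); no start parameter means page 1.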
    def get_current_page_num(self) -> int | None:
        if '/search?q=' in self.page.url:
            match = re.search(r'&start=(\d+)', self.page.url)
            return int(match.group(1)) // 10 + 1 if match else 1
        return None

    def get_search_result_ele(self, html_content: str) -> SearchResultEle:
        res = SearchResultEle(
            search_div=None,
            next_page_url=None,
            current_page=self.get_current_page_num(),
            results=[]
        )
        page = Adaptor(html_content)
        body = Adaptor(page.body)
        search_div = body.xpath('//div[@id="search"]')
        next_page_url = body.xpath_first('//a[@id="pnnext"]/@href')
        res.search_div = bool(search_div)
        res.next_page_url = f"https://www.google.com{next_page_url}" if next_page_url else None
        if not search_div:
            return res
        result_list = search_div.xpath('//*[@data-snc]')
        logger.info(f"Results on current page: {len(result_list)}, next_page_url: {next_page_url}")

        for result_item in result_list:
            if len(result_item.children) < 2:
                continue

            result = SearchResultItem()
            title_ele = result_item.children[0]
            if title_ele:
                result.url = title_ele.xpath_first('.//a/@href')
                result.title = title_ele.xpath_first('.//h3/text()')
            content_ele = result_item.children[1]
            if content_ele:
                content_list = content_ele.xpath('.//span/text()')
                result.content = ''.join(content_list) if content_list else None
            if any([result.url, result.title, result.content]):
                res.results.append(result)
        return res
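

# Module entry point: runs one keyword in a fresh Camoufox browser and returns
# a {'error', 'msg', 'data'} dict; on failure the current page's HTML is
# archived for diagnosis.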
async def search_keyword(keyword, max_result_items=200, skip_existing=False, config: BrowserConfig | None = None):
    # A BrowserConfig() default in the signature would be created once and
    # shared across calls; build a fresh one per call instead.
    config = config or BrowserConfig()
    ret = {'error': 0, 'msg': '', 'data': None}
    config_dict = config.model_dump()
    logger.info(f"BrowserConfig {config_dict}")
    logger.info(f"keyword {keyword} max_result_items: {max_result_items} skip_existing: {skip_existing}")
    async with AsyncCamoufox(**config_dict) as browser:
        # Create the handler before the try block so the except clause can
        # always reference it when saving the failure page.
        search_handler = GoogleSearchHandler(await browser.new_page())
        try:
            kw = await search_handler.process_keyword(keyword, max_result_items=max_result_items, skip_existing=skip_existing)
            if not kw:
                ret['error'] = 1
                html_path = await search_handler.save_current_page(keyword, filename=f"warning_{time.strftime('%Y%m%d_%H%M%S')}")
                logger.warning(f"Keyword task not completed: {keyword} html_path: {html_path}")
                ret['msg'] = f"Keyword task not completed: {keyword}"
                ret['data'] = html_path
                return ret
            ret['data'] = kw.model_dump()
            return ret
        except Exception as e:
            html_path = await search_handler.save_current_page(keyword, filename=f"error_{time.strftime('%Y%m%d_%H%M%S')}")
            logger.exception(f"Failed: {e} html_path: {html_path}")
            ret['error'] = 1
            ret['msg'] = f"Failed: {e}"
            ret['data'] = html_path
            return ret
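

# Interactive debug entry: opens a browser session and idles so the page can
# be driven by hand.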
async def aio_main(config: BrowserConfig | None = None):
    try:
        async with BrowserCore(config or BrowserConfig()) as core:
            search_handler = GoogleSearchHandler(core.page)
            await search_handler.goto_home_page()
            keywords = [
                'Acampe carinata essential oil',
            ]
            # Keep the browser open for manual inspection.
            while True:
                await asyncio.sleep(1)

    except Exception as e:
        logger.error(f"Failed: {e}")
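

# Offline check: re-parses a previously saved results page without a browser,
# using a stub object that only provides the .url attribute a Page would have.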
def analyze():
    html_file = Path(r"K:\code\upwork\zhang_crawl_bio\output\results\Acampe_rigida_essential_oil\page_1.html")

    class TestPage:
        url = html_file.as_uri()

    search_handler = GoogleSearchHandler(page=TestPage())
    res = search_handler.get_search_result_ele(html_file.read_text())
    logger.info(f"{res.model_dump_json(indent=4)}")


def main():
    asyncio.run(aio_main())


if __name__ == "__main__":
    main()
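

# Usage sketch (assumption: the default BrowserConfig is sufficient for a
# local run):
#
#   asyncio.run(search_keyword("Acampe rigida essential oil", max_result_items=50))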