|
|
@@ -11,6 +11,7 @@ from playwright.sync_api import sync_playwright
|
|
|
from mylib.logu import logger
|
|
|
from mylib.base import save_to_file
|
|
|
from config.settings import OUTPUT_DIR
|
|
|
+
|
|
|
# ------------------- Search Engine Implementation -------------------
|
|
|
class GoogleSearchHandler():
|
|
|
"""搜索引擎专用处理器(通过CDP连接)"""
|
|
|
@@ -34,64 +35,77 @@ class GoogleSearchHandler():
|
|
|
except Exception as e:
|
|
|
logger.exception(f"Search failed: {str(e)}")
|
|
|
return {"status": "error", "message": str(e)}
|
|
|
+
|
|
|
def get_search_result_ele(self, html_content:str):
|
|
|
- include = {
|
|
|
- 'search_div': '//div[@id="search"]',
|
|
|
- 'cite': './/cite'
|
|
|
- }
|
|
|
- exclude = {
|
|
|
- 'people_also_ask': './/*[@data-initq]'
|
|
|
- }
|
|
|
- selector_xpath = {
|
|
|
- 'include': include,
|
|
|
- 'exclude': exclude
|
|
|
- }
|
|
|
- res = {}
|
|
|
+ class SearchResultItem(BaseModel):
|
|
|
+ url: str | None = None
|
|
|
+ title: str | None = None
|
|
|
+ content: str | None = None
|
|
|
+
|
|
|
+ class SearchResult(BaseModel):
|
|
|
+ total_count: int = 0
|
|
|
+ results: list[SearchResultItem] = []
|
|
|
+
|
|
|
+ class SearchResultEle(BaseModel):
|
|
|
+ search_div: bool | None = None
|
|
|
+ next_ele: bool | None = None
|
|
|
+ results: SearchResult | None = None
|
|
|
+
|
|
|
+ res = SearchResultEle(
|
|
|
+ search_div=None,
|
|
|
+ next_ele=None,
|
|
|
+ results=None
|
|
|
+ )
|
|
|
+
|
|
|
page = Adaptor(html_content)
|
|
|
body = Adaptor(page.body)
|
|
|
- search_div = body.xpath(selector_xpath['include']['search_div'])
|
|
|
- res['search_div'] = True if search_div else False
|
|
|
+ search_div = body.xpath('//div[@id="search"]')
|
|
|
+ next_ele = body.xpath('//a[@id="pnnext"]/@href')
|
|
|
+
|
|
|
+ res.search_div = bool(search_div)
|
|
|
+ res.next_ele = bool(next_ele)
|
|
|
+
|
|
|
if search_div:
|
|
|
- # 获取所有 a 标签
|
|
|
- # result_list = search_div.xpath('.//span/a/h3')
|
|
|
result_list = search_div.xpath('//*[@data-snc]')
|
|
|
logger.info(f"result_list {len(result_list)}")
|
|
|
- # h3_list = [item for item in result_list if item.xpath('//h3')]
|
|
|
- search_res = {'total_count': len(result_list), 'results': []}
|
|
|
+
|
|
|
+ search_res = SearchResult(total_count=len(result_list))
|
|
|
+
|
|
|
for result_item in result_list:
|
|
|
- # logger.info(f"result_item {type(result_item)} {result_item}")
|
|
|
- result = {}
|
|
|
if len(result_item.children) < 2:
|
|
|
continue
|
|
|
+
|
|
|
+ result = SearchResultItem()
|
|
|
title_ele = result_item.children[0]
|
|
|
if title_ele:
|
|
|
- url = title_ele.xpath_first('.//a/@href')
|
|
|
- result['url'] = url
|
|
|
- title = title_ele.xpath_first('.//h3/text()')
|
|
|
- result['title'] = title
|
|
|
+ result.url = title_ele.xpath_first('.//a/@href')
|
|
|
+ result.title = title_ele.xpath_first('.//h3/text()')
|
|
|
|
|
|
content_ele = result_item.children[1]
|
|
|
- logger.info(f"content_ele {content_ele}")
|
|
|
if content_ele:
|
|
|
content_list = content_ele.xpath('.//span/text()')
|
|
|
- result['content'] = ''.join(content_list)
|
|
|
- logger.info(f"result {result}")
|
|
|
- if result:
|
|
|
- search_res['results'].append(result)
|
|
|
- return search_res
|
|
|
+ result.content = ''.join(content_list) if content_list else None
|
|
|
+
|
|
|
+ if any([result.url, result.title, result.content]):
|
|
|
+ search_res.results.append(result)
|
|
|
+
|
|
|
+ res.results = search_res
|
|
|
+
|
|
|
+ return res
|
|
|
+
|
|
|
async def aio_main(config: BrowserConfig = BrowserConfig()):
|
|
|
try:
|
|
|
core = await BrowserCore.get_instance(config)
|
|
|
search_handler = GoogleSearchHandler(core.page)
|
|
|
|
|
|
# 测试搜索功能
|
|
|
- content = await search_handler.search('python playwright')
|
|
|
+ content = await search_handler.search('Acampe carinata essential oil')
|
|
|
save_path = save_to_file(content, OUTPUT_DIR /'analyze'/ 'test.html')
|
|
|
logger.info(f"save_path {save_path}")
|
|
|
logger.info(f"当前页面: {search_handler.page.url}")
|
|
|
res = search_handler.get_search_result_ele(content)
|
|
|
# 漂亮输出
|
|
|
- logger.info(f"{json.dumps(res, indent=4, ensure_ascii=False)}")
|
|
|
+ logger.info(f"{json.dumps(res.model_dump(), indent=4, ensure_ascii=False)}")
|
|
|
# html_save =
|
|
|
# 保持连接活跃
|
|
|
while True:
|
|
|
@@ -100,6 +114,7 @@ async def aio_main(config: BrowserConfig = BrowserConfig()):
|
|
|
except Exception as e:
|
|
|
logger.error(f"失败: {str(e)}")
|
|
|
raise
|
|
|
+
|
|
|
def connet_ws():
|
|
|
with sync_playwright() as p:
|
|
|
ws_file = Path(r'K:\code\upwork\zhang_crawl_bio\output\ws.txt')
|
|
|
@@ -117,7 +132,7 @@ def analyze():
|
|
|
html_file = Path(r"K:\code\upwork\zhang_crawl_bio\output\analyze\test.html")
|
|
|
search_handler = GoogleSearchHandler(None)
|
|
|
res = search_handler.get_search_result_ele(html_file.read_text())
|
|
|
- logger.info(f"{json.dumps(res, indent=4, ensure_ascii=False)}")
|
|
|
+ logger.info(f"{json.dumps(res.model_dump(), indent=4, ensure_ascii=False)}")
|
|
|
|
|
|
def main():
|
|
|
analyze()
|