Quellcode durchsuchen

完成搜索列表解析

mrh vor 1 Jahr
Ursprung
Commit
8ea0ebcfc6
2 geänderte Dateien mit 94 neuen und 9 gelöschten Zeilen
  1. 27 3
      tests/mytest/scrapling_t.py
  2. 67 6
      worker/search_engine/google_search.py

+ 27 - 3
tests/mytest/scrapling_t.py

@@ -8,6 +8,7 @@ import requests
 from scrapling import Adaptor
 from pathlib import Path
 from mylib.base import save_to_file
+from trafilatura import fetch_url, extract
 
 def stackoverflow_demo():
     response = requests.get('https://stackoverflow.com/questions/tagged/web-scraping?sort=MostVotes&filters=NoAcceptedAnswer&edited=true&pagesize=50&page=2')
@@ -27,13 +28,36 @@ def stackoverflow_demo():
         for i, (title, author) in enumerate(zip(first_question_title.find_similar(), first_question_author.find_similar()), start=1):
             print(i, title.text, author.text)
 
+def analyze_html(html_content: str) -> dict:
+    output_dir = Path(r'K:\code\upwork\zhang_crawl_bio\output\analyze')
+    output_format_list = ["csv", "html", "json", "markdown", "txt", "xml", "xmltei"]
+    for output_format in output_format_list:
+        result = extract(html_content, output_format=output_format, with_metadata=True)
+        save_path = save_to_file(result, Path(output_dir / 'ext').with_suffix(f'.{output_format}'))
+        print(f"save_path: {save_path}")
+
+
 def google_search_demo():
     file = Path(r'K:\code\upwork\zhang_crawl_bio\output\google_search\Acalypha manniana essential oil\10.html')
+    # file = Path(r'K:\code\upwork\zhang_crawl_bio\output\analyze\search_result.html')
     html_content = file.read_text(encoding='utf-8')
     page = Adaptor(html_content)
-    page.has_class('quote')
-    print(page.find_by_text('Medicinal plants from the genus Acalypha'))
-
+    search_div = page.xpath('//div[@id="search"]')
+    print("search_div:", search_div)
+    if search_div:
+        print("找到 search div:")
+        result_list = search_div.xpath('//*[@data-rpos]')
+        # 从 result_list 中过滤掉任何没有 href 属性的所有子元素,xpath 语法筛选
+        result_list = [item for item in result_list if item.xpath('.//cite')]
+        result_list = [item for item in result_list if not item.xpath('.//*[@data-initq]')]
+        print("result_list:", result_list)
+        print("实际 7 ,result_list len:", len(result_list))
+        for result in result_list:
+            result.attrib
+            print("result.attrib:", result.attrib)
+        # print(search_div[0].html_content)  # 打印 div 中的文本内容
+    else:
+        print("未找到 search div")
 def main():
     google_search_demo()
 

+ 67 - 6
worker/search_engine/google_search.py

@@ -1,12 +1,16 @@
 import asyncio
+import json
 import logging
 from pathlib import Path
 import re
+from pydantic import BaseModel
+from scrapling import Adaptor
 from worker.search_engine.camoufox_broswer import BrowserConfig, BrowserCore
 from playwright.async_api import Browser, Page, async_playwright
 from playwright.sync_api import sync_playwright
 from mylib.logu import logger
-
+from mylib.base import save_to_file
+from config.settings import OUTPUT_DIR
 # ------------------- Search Engine Implementation -------------------
 class GoogleSearchHandler():
     """搜索引擎专用处理器(通过CDP连接)"""
@@ -22,29 +26,79 @@ class GoogleSearchHandler():
         """执行搜索操作"""
         try:
             await self.goto_home_page()
-            await self.page.fill('text1area[aria-label="Search"]', query, timeout=3000)
+            await self.page.fill('textarea[aria-label="Search"]', query, timeout=10000)
             await self.page.press('textarea[aria-label="Search"]', 'Enter')
+            # 等待加载完成
+            await self.page.wait_for_load_state(state='load', timeout=10000)
             return await self.page.content()
         except Exception as e:
-            logger.error(f"Search failed: {str(e)}")
+            logger.exception(f"Search failed: {str(e)}")
             return {"status": "error", "message": str(e)}
+    def get_search_result_ele(self, html_content:str):
+        include = {
+            'search_div': '//div[@id="search"]',
+            'cite': './/cite'
+        }
+        exclude = {
+            'people_also_ask': './/*[@data-initq]'
+        }
+        selector_xpath = {
+            'include': include,
+            'exclude': exclude
+        }
+        res = {}
+        page = Adaptor(html_content)
+        body = Adaptor(page.body)
+        search_div = body.xpath(selector_xpath['include']['search_div'])
+        res['search_div'] = True if search_div else False
+        if search_div:
+            # 获取所有 a 标签
+            # result_list = search_div.xpath('.//span/a/h3')
+            result_list = search_div.xpath('//*[@data-snc]')
+            logger.info(f"result_list {len(result_list)}")
+            # h3_list = [item for item in result_list if item.xpath('//h3')]
+            search_res = {'total_count': len(result_list), 'results': []}
+            for result_item in result_list:
+                # logger.info(f"result_item {type(result_item)} {result_item}")
+                result = {}
+                if len(result_item.children) < 2:
+                    continue
+                title_ele = result_item.children[0]
+                if title_ele:
+                    url = title_ele.xpath_first('.//a/@href')
+                    result['url'] = url
+                    title = title_ele.xpath_first('.//h3/text()')
+                    result['title'] = title
 
+                content_ele = result_item.children[1]
+                logger.info(f"content_ele {content_ele}")
+                if content_ele:
+                    content_list = content_ele.xpath('.//span/text()')
+                    result['content'] = ''.join(content_list)
+                logger.info(f"result {result}")
+                if result:
+                    search_res['results'].append(result)
+        return search_res
 async def aio_main(config: BrowserConfig = BrowserConfig()):
-    """通过CDP连接浏览器实例"""
     try:
         core = await BrowserCore.get_instance(config)
         search_handler = GoogleSearchHandler(core.page)
         
         # 测试搜索功能
         content = await search_handler.search('python playwright')
+        save_path = save_to_file(content, OUTPUT_DIR /'analyze'/ 'test.html')
+        logger.info(f"save_path {save_path}")
         logger.info(f"当前页面: {search_handler.page.url}")
+        res = search_handler.get_search_result_ele(content)
+        # 漂亮输出
+        logger.info(f"{json.dumps(res, indent=4, ensure_ascii=False)}")
         # html_save = 
         # 保持连接活跃
         while True:
             await asyncio.sleep(5)
             
     except Exception as e:
-        logger.error(f"CDP连接失败: {str(e)}")
+        logger.error(f"失败: {str(e)}")
         raise
 def connet_ws():
     with sync_playwright() as p:
@@ -59,8 +113,15 @@ def connet_ws():
         print(page.url)
     return
 
+def analyze():
+    html_file = Path(r"K:\code\upwork\zhang_crawl_bio\output\analyze\test.html")
+    search_handler = GoogleSearchHandler(None)
+    res = search_handler.get_search_result_ele(html_file.read_text())
+    logger.info(f"{json.dumps(res, indent=4, ensure_ascii=False)}")
+
 def main():
-    asyncio.run(aio_main())
+    analyze()
+    # asyncio.run(aio_main())
 
 if __name__ == "__main__":
     main()