
Finish the long-tail keyword search; pass parameters via a data model

mrh 1 year ago
commit d2951798c5

+ 62 - 23
src/browser/crawl_amz_search_key.py

@@ -1,5 +1,6 @@
 import asyncio
-import datetime
+import base64
+from datetime import datetime
 import json
 import os
 import re
@@ -7,15 +8,16 @@ import sys
 import time
 import asyncio
 import asyncio
-from pathlib import Path
-from typing import List
+from pathlib import Path,PurePosixPath
+from typing import List, Optional
+from pydantic import BaseModel, Field
 from sqlmodel import select, Session
 from DrissionPage._pages.chromium_tab import ChromiumTab
 from DrissionPage._units.listener import DataPacket
 from utils.logu import get_logger
 from config.settings import OUTPUT_DIR,TEMP_PAGE_DIR
 from utils.drission_page import load_chrome_from_ini,ChromeOptions
-from utils.file import save_to_file,check_exists,s3,read_file
+from utils.file import save_to_file,check_exists,s3,read_file,s3_uri_to_http_url
 from config.settings import CFG
 from src.browser.crawl_base import CrawlerBase
 
@@ -23,7 +25,24 @@ logger = get_logger('browser')
 AMZ_HTML_DIR = OUTPUT_DIR / 'page' / 'amz'
 AMZ_HTML_DIR.mkdir(parents=True, exist_ok=True)
 
+class SearchKeyResult(BaseModel):
+    suggestions:List[str] = []
+    search_key:Optional[str] = None
+    mhtml_path:Optional[str] = None
+    screenshot:Optional[str] = None
+    error:Optional[int] = None
+    msg:Optional[str] = None
+    created_at:Optional[datetime] = Field(default_factory=datetime.now)
+
+class CrawlerSearchKeyInput(BaseModel):
+    search_key:str
+    mhtml_path:Optional[str] = None
+    screenshot_path:Optional[str] = None
+    overwrite:Optional[bool] = False
+
+
 class CrawlerAmzSearchKey(CrawlerBase):
+    s3_prefix = f"{CFG.s3_prefix}/output/amz/"
     def __init__(self, chrome_options:ChromeOptions):
         super().__init__(chrome_options)
         tabs = self.page.get_tabs(url='amazon')
@@ -44,22 +63,22 @@ class CrawlerAmzSearchKey(CrawlerBase):
         # self.tab.listen.start(method='GET', )
         input_box.input(search_key)
         suggestion_ele_list = self.tab.s_ele('xpath://input[@id="sac-autocomplete-results-container"]', timeout=3)
-        return self.tab.save()
 
-    async def crawl_suggestion(self, search_key:str, save_path:str=None, overwrite:bool=False):
-        if not check_exists(save_path) or overwrite:
-            mhtml_str = await asyncio.to_thread(self.search_key_and_save_page, search_key)
-            self.tab.save(save_path)
-            screenshot = self.tab.get_screenshot(as_bytes=True)
-            save_img_path = Path(save_path).parent / f"{Path(save_path).stem}.png"
-            save_to_file(screenshot, save_img_path)
-            logger.info(f"{save_path}")
+    async def crawl_suggestion(self, search_input:CrawlerSearchKeyInput) -> SearchKeyResult:
+        if not check_exists(search_input.mhtml_path) or search_input.overwrite:
+            await asyncio.to_thread(self.search_key_and_save_page, search_input.search_key)
+            save_mhtml_path,temp_mhtml_path = self.save_current_page(self.tab, search_input.mhtml_path, after_unlink=False)
+            if search_input.screenshot_path:
+                screenshot = self.tab.get_screenshot(as_bytes=True)
+                save_to_file(screenshot, str(search_input.screenshot_path))
+            logger.info(f"{search_input.mhtml_path}")
         else:
-            logger.info(f"exists {save_path}")
-        self.tab.get(save_path)
+            temp_mhtml_path = self.download_s3(search_input.mhtml_path, temp_dir=TEMP_PAGE_DIR, overwrite=search_input.overwrite)
+            logger.info(f"exists {search_input.mhtml_path}, download {temp_mhtml_path}")
+            
+        self.tab.get(temp_mhtml_path)
         html_str = self.tab.html
         logger.info(f"{html_str[:150]}")
-        return
         schema = {
             "name": "Autocomplete Suggestions",
             "baseSelector": '//div[@id="sac-autocomplete-results-container"]',
@@ -67,13 +86,11 @@ class CrawlerAmzSearchKey(CrawlerBase):
                 {
                     "name": "suggestions",
                     "type": "list",
-                    "selector": ".//div[contains(@class, 's-suggestion') and contains(@class, 's-suggestion-ellipsis-direction')]",
+                    "selector": './/div[@role="button"]',
                     "fields": [
                         {
                             "name": "text",
                             "type": "text",
-                            "selector": "./text() | .//span/text()",  # explicitly extract every text node
-                            "transform": ["join"],  # join the text nodes into a single string
                         }
                     ]
                 }
@@ -81,7 +98,23 @@ class CrawlerAmzSearchKey(CrawlerBase):
         }
         result = await self.excra_strategy_raw_html(html_str, schema=schema)
         data = json.loads(result.extracted_content)
-        logger.info(f"{result.extracted_content}")
+        logger.debug(f"{result.extracted_content}")
+        search_key_result = SearchKeyResult(search_key=search_input.search_key, mhtml_path=search_input.mhtml_path, screenshot=str(search_input.screenshot_path) if search_input.screenshot_path else None)
+        suggestions = []
+        if len(data) == 0:
+            msg = f"{search_input.search_key} has no suggestions, temp_mhtml_path {temp_mhtml_path}"
+            logger.error(msg)
+            search_key_result.msg = msg
+            search_key_result.error = 1
+            return search_key_result
+        data = data[0]
+        for item in data['suggestions']:
+            suggestions.append(item['text'])
+        search_key_result.suggestions = suggestions
+        logger.info(f"crawl succeeded: {search_key_result.model_dump_json(indent=4)}")
+        if temp_mhtml_path:
+            Path(temp_mhtml_path).unlink()
+        return search_key_result
     def suggestion_listen_package(self):
         # package = self.tab.listen.wait(1)
         search_suggestion_package_list:List[DataPacket] = []
@@ -100,10 +133,16 @@ class CrawlerAmzSearchKey(CrawlerBase):
         # logger.info(f"{search_suggestion_package.response.body}")
 async def main():
     crawler = CrawlerAmzSearchKey(ChromeOptions())
-    search_key = 'パソコン'
+    search_key = 'コードカバー'
     # search_key = '1'
-    save_path = f"{AMZ_HTML_DIR}/{search_key}.mhtml"
-    crawler.tab.get('https://www.odoo.com/documentation/18.0/administration/upgrade.html')
+    # save_path = f"{AMZ_HTML_DIR}/{search_key}.mhtml"
+    save_path = crawler.s3_prefix + f"{search_key}.mhtml"
+    search_input = CrawlerSearchKeyInput(
+        search_key=search_key, 
+        mhtml_path=save_path, 
+        screenshot_path=crawler.s3_prefix + f"{search_key}.png",)
+    await crawler.crawl_suggestion(search_input)
+    # crawler.tab.get('https://www.odoo.com/documentation/18.0/administration/upgrade.html')
     # res = crawler.save_current_page(crawler.tab, f"{AMZ_HTML_DIR}/test.mhtml")
     # res = crawler.save_current_page(crawler.tab, f"s3://public/amazone/copywriting_production/output/test.mhtml")
     # logger.info(f"{res}")
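
The core of this commit is the parameter-passing change named in the commit message: crawl_suggestion now takes a single CrawlerSearchKeyInput and always hands back a SearchKeyResult, whether the crawl succeeded or not. A minimal, self-contained sketch of how those two pydantic models behave on their own, assuming pydantic v2 (which the diff's model_dump_json call implies); the field values below are invented examples, not real crawl output:

from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel, Field

class SearchKeyResult(BaseModel):
    # trimmed copy of the model in the diff; mhtml_path/screenshot omitted for brevity
    suggestions: List[str] = []
    search_key: Optional[str] = None
    error: Optional[int] = None
    msg: Optional[str] = None
    created_at: Optional[datetime] = Field(default_factory=datetime.now)

# a successful crawl fills suggestions; every other field keeps its default
ok = SearchKeyResult(search_key="コードカバー", suggestions=["コードカバー usb", "コードカバー 白"])
print(ok.model_dump_json(indent=4))

# a failed crawl reuses the same model and flags the failure instead of raising,
# mirroring the "has no suggestions" branch above
failed = SearchKeyResult(search_key="xyz", error=1, msg="no suggestions")
assert failed.suggestions == [] and failed.created_at is not None

Returning a result object with error/msg fields, rather than raising, lets the caller persist failed lookups the same way as successful ones.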

+ 15 - 5
src/browser/crawl_base.py

@@ -8,7 +8,7 @@ import asyncio
 import signal
 import asyncio
 import pickle
-from pathlib import Path
+from pathlib import Path,PurePosixPath
 import random
 from typing import List
 import httpx
@@ -42,7 +42,7 @@ class CrawlerBase():
             self.get(init_url)
         return self.page.latest_tab
 
-    def save_current_page(self, tab=None, save_path:str=None, overwrite:bool=False):
+    def save_current_page(self, tab=None, save_path:str=None, overwrite:bool=False, after_unlink:bool=True):
         if not overwrite and check_exists(save_path):
             logger.info(f"exists {save_path} ")
             return save_path
@@ -54,9 +54,19 @@ class CrawlerBase():
         logger.info(f"{type(mhtml)}")
         if str(save_path).startswith('s3://'):
             upload_file_to_s3(temp_mhtml_path, str(save_path))
-            Path(temp_mhtml_path).unlink()
-            return save_path
-        return temp_mhtml_path
+            if after_unlink:
+                Path(temp_mhtml_path).unlink()
+                temp_mhtml_path = None
+        return save_path,temp_mhtml_path
+
+    def download_s3(self, s3_uri:str, temp_dir:str, overwrite:bool=False):
+        if not s3_uri.startswith('s3://'):
+            raise ValueError("Invalid S3 URI. Must start with 's3://'")
+        temp_file = Path(temp_dir) / PurePosixPath(s3_uri).name
+        if not overwrite and check_exists(str(temp_file)):
+            return temp_file
+        content = read_file(s3_uri)
+        return save_to_file(content, temp_file)
 
     def run_browser(self):
         page = load_chrome_from_ini(
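
save_current_page now returns a (save_path, temp_mhtml_path) pair so the caller can keep working on the local copy after the S3 upload (after_unlink=False), and download_s3 is the reverse direction: re-hydrate a previously uploaded page into a temp directory unless a cached copy is already there. A standalone sketch of that same cache-then-download pattern with plain boto3; fetch_s3_to_temp is illustrative, not part of the repo:

from pathlib import Path, PurePosixPath
import boto3

def fetch_s3_to_temp(s3_client, s3_uri: str, temp_dir: str, overwrite: bool = False) -> Path:
    # same idea as download_s3 above: reuse the cached local copy unless overwrite is requested
    if not s3_uri.startswith("s3://"):
        raise ValueError("Invalid S3 URI. Must start with 's3://'")
    bucket, key = s3_uri[5:].split("/", 1)
    temp_file = Path(temp_dir) / PurePosixPath(s3_uri).name  # keep the object's basename locally
    if temp_file.exists() and not overwrite:
        return temp_file
    temp_file.parent.mkdir(parents=True, exist_ok=True)
    s3_client.download_file(bucket, key, str(temp_file))
    return temp_file

# usage (endpoint/credentials assumed to be configured elsewhere):
# client = boto3.client("s3")
# local = fetch_s3_to_temp(client, "s3://public/amazone/copywriting_production/output/test.mhtml", "/tmp/pages")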

+ 1 - 1
src/tasks/crawl_asin_save_task.py

@@ -19,7 +19,7 @@ def get_asin_and_save_page(self, asin: str, asin_area: str = 'JP',
         # initialize the browser configuration
         chrome_options = ChromeOptions(ini_path=CFG.chrome_config_ini)
         crawler = Crawler(chrome_options=chrome_options)
-        save_path = f"{CFG.s3_prefix}/output/{asin}/{asin}.mhtml"  # keep the .mhtml suffix consistent with the content type
+        save_path = f"{CFG.s3_prefix}/output/asinseed/{asin}/{asin}.mhtml"  # keep the .mhtml suffix consistent with the content type
         # perform the save operation
         final_path = crawler.get_asin_and_save_page(
             asin=asin,

+ 12 - 9
utils/file.py

@@ -4,10 +4,9 @@ import mimetypes
 import urllib.parse
 import smart_open
 from smart_open import open
-from botocore.exceptions import NoCredentialsError
+from botocore.exceptions import NoCredentialsError,ClientError
 import boto3
 import logging
-from botocore.exceptions import ClientError
 
 from botocore.config import Config
 from config.settings import CFG
@@ -27,7 +26,8 @@ def s3_uri_to_http_url(s3_uri):
     Applies to publicly readable buckets.
     """
     if not s3_uri.startswith('s3://'):
-        raise ValueError("Invalid S3 URI. Must start with 's3://'")
+        # raise ValueError("Invalid S3 URI. Must start with 's3://'")
+        return s3_uri
     
     # extract the bucket and key
     path = s3_uri[5:]  # strip the 's3://' prefix
@@ -131,13 +131,16 @@ def read_file(file_uri:str):
         return f.read()
 
 def check_exists(file_uri:str):
+    if not file_uri.startswith('s3://'):
+        return Path(file_uri).exists()
+    bucket_name, object_name, upload_args = get_s3_uri_info(file_uri)
     try:
-        with open(file_uri, 'r', transport_params={'client': s3}) as f:
-            # the file exists; carry on
-            return file_uri
-    except (FileNotFoundError,OSError):
-        # the file does not exist; handle it accordingly
-        return False
+        s3.head_object(Bucket=bucket_name, Key=object_name)
+        return file_uri
+    except ClientError as e:
+        if e.response['Error']['Code'] == '404':
+            return False
+        raise
 
 def main():
     response = s3.list_buckets()
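
check_exists now short-circuits plain local paths through Path.exists() and probes S3 keys with head_object, a metadata-only request, instead of opening the object body via smart_open. A minimal sketch of that probe in isolation, with a plain boto3 client and placeholder bucket/key names:

import boto3
from botocore.exceptions import ClientError

def s3_object_exists(client, bucket: str, key: str) -> bool:
    # HeadObject returns only metadata; a missing key surfaces as a ClientError with code '404'
    try:
        client.head_object(Bucket=bucket, Key=key)
        return True
    except ClientError as e:
        if e.response["Error"]["Code"] == "404":
            return False
        raise  # permission errors, throttling, etc. should surface, not read as "missing"

# client = boto3.client("s3")
# print(s3_object_exists(client, "public", "amazone/copywriting_production/output/test.mhtml"))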