Ver Fonte

完成文件本地存储或上传S3

mrh há 9 meses
pai
commit
0dc47ed40e
7 ficheiros alterados com 142 adições e 10 exclusões
  1. 0 1
      .env
  2. 2 1
      .gitignore
  3. 1 0
      config/.gitignore
  4. 20 7
      src/browser/crawl_asin.py
  5. 42 0
      utils/config.py
  6. 30 1
      utils/file.py
  7. 47 0
      utils/pydantic_auto_field.py

+ 0 - 1
.env

@@ -1 +0,0 @@
-DB_URL = "postgresql+psycopg2://user:password@sv-v2.lan:5435/copywriting_production"

+ 2 - 1
.gitignore

@@ -8,4 +8,5 @@ wheels/
 
 # Virtual environments
 .venv
-output/
+output/
+.env

+ 1 - 0
config/.gitignore

@@ -0,0 +1 @@
+config.yaml

+ 20 - 7
src/browser/crawl_asin.py

@@ -18,13 +18,15 @@ from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
 from utils.logu import get_logger
 from config.settings import OUTPUT_DIR
 from utils.drission_page import load_chrome_from_ini,ChromeOptions
-from utils.file import save_to_file
+from utils.file import save_to_file,check_exists
+from utils.config import CFG
+
 logger = get_logger('browser')
 ASIN_HTML_DIR = OUTPUT_DIR / 'page' / 'asin'
 ASIN_HTML_DIR.mkdir(parents=True, exist_ok=True)
 
 class Crawler():
-    def __init__(self, chrome_options:ChromeOptions):
+    def __init__(self, chrome_options:ChromeOptions, storage_config:dict=None):
         self.chrome_options = chrome_options
     
     async def run(self, url:str):
@@ -74,11 +76,15 @@ class Crawler():
             return page.save(str(ASIN_HTML_DIR), name=f'{asin}')
         else:
             return page.html
-    def get_asin_and_save_page(self, asin:str, asin_area:str='JP', mthml_type:bool=True, save_path:str=None):
-        data = self.get_asin_page_data(asin, asin_area, mthml_type)
+    def get_asin_and_save_page(self, asin:str, asin_area:str='JP', mthml_type:bool=True, save_path:str=None, overwrite:bool=False):
+        if not overwrite and check_exists(save_path):
+            logger.info(f"{save_path} exists")
+            return save_path
+        # data = self.get_asin_page_data(asin, asin_area, mthml_type)
+        data = self.get_asin_url(asin, asin_area)
         save_path = save_path or str(ASIN_HTML_DIR / f'{asin}.html')
-        save_to_file(data, save_path)
-        return save_path
+        return save_to_file(data, save_path)
+        
 async def task():
     asin = ['B0CQ1SHD8V', 'B0B658JC22', 'B0DQ84H883', 'B0D44RT8R8']
     c = Crawler(ChromeOptions())
@@ -86,7 +92,14 @@ async def task():
     logger.info(f"{str(ASIN_HTML_DIR / 'B0CQ1SHD8V.html')}")
     tab = page.latest_tab
     tab.get(r'G:\code\amazone\copywriting_production\output\page\debug\B0CQ1SHD8V.html.mhtml')
-    save_to_file(page.html, str(OUTPUT_DIR / 'page/debug' / f'{asin[0]}-from-mthml.html'))
+    # c.get_asin_and_save_page(asin[0], 'JP', save_path=str(ASIN_HTML_DIR / 'B0CQ1SHD8V.html'))
+    # logger.info(f"{CFG.s3_secret_key}")
+    c.get_asin_and_save_page(
+        asin[0], 
+        'JP',
+        save_path='s3://public/amazone/copywriting_production/output/B0CQ1SHD8V.html',
+        overwrite=True
+    )
     # page.save(str(ASIN_HTML_DIR), name=f'{asin[0]}.html')
     # save_to_file(page.html, str(ASIN_HTML_DIR / 'B0CQ1SHD8V.html'))
     # await c.run('https://fr.florame.com/en/essential-oils')

+ 42 - 0
utils/config.py

@@ -0,0 +1,42 @@
+import os
+import yaml
+from pathlib import Path
+from pydantic import BaseModel, Field
+from typing import List, Dict, Union,Optional,Any
+from utils.pydantic_auto_field import AutoLoadModel
+from config.settings import CONFIG_DIR
+from dotenv import load_dotenv
+load_dotenv()
+
+CONFIG_PATH = CONFIG_DIR / "config.yaml"
class Config(BaseModel):
    """Application configuration, persisted as YAML at CONFIG_PATH.

    Defaults are read from the environment once, at class-creation time
    (dotenv is loaded above, so .env values are visible here).
    """
    # Storage backend selector; "local" is the out-of-the-box default.
    storage: str = "local"
    # SECURITY: an earlier revision hard-coded live S3 credentials as
    # in-source fallbacks. Secrets must come only from the environment or
    # the (git-ignored) config file — and the leaked keys should be rotated.
    s3_access_key: Optional[str] = os.environ.get("S3_ACCESS_KEY")
    s3_secret_key: Optional[str] = os.environ.get("S3_SECRET_KEY")
    # Endpoint is not secret; keep the LAN default for compatibility.
    s3_endpoint: Optional[str] = os.environ.get("S3_ENDPOINT", 'http://vs1.lan:9002')

    def save(self):
        """Write the current settings to the YAML config file.

        Returns:
            self, to allow chaining (e.g. ``Config().save()``).
        """
        config_path = get_config_path()
        with open(config_path, "w", encoding="utf-8") as file:
            yaml.dump(self.model_dump(), file)
        return self
+            
def get_config_path() -> Path:
    """Return the location of the YAML configuration file."""
    return CONFIG_PATH
+
def read_config(config_path: Path) -> "Config":
    """Load a Config from YAML, creating a default config file if absent.

    Args:
        config_path: Path to the YAML configuration file.

    Returns:
        The parsed Config; a freshly-saved default when the file is missing.
    """
    if not config_path.exists():
        # First run: materialize the defaults on disk so they can be edited.
        return Config().save()
    with open(config_path, "r", encoding="utf-8") as file:
        # safe_load returns None for an empty file; fall back to defaults
        # instead of crashing on Config(**None).
        config_dict = yaml.safe_load(file) or {}
    return Config(**config_dict)
+
# Module-level configuration singleton; loading it here means the config
# file is created on first import if it does not yet exist.
CFG = read_config(get_config_path())


def main() -> None:
    """Echo the active configuration and write it back to disk."""
    print(CFG)
    CFG.save()


if __name__ == "__main__":
    main()

+ 30 - 1
utils/file.py

@@ -1,5 +1,25 @@
 import json
 from pathlib import Path
+from smart_open import open
+from botocore.exceptions import NoCredentialsError
+import boto3
+from botocore.config import Config
+from utils.config import CFG
+
# Shared S3 client used by save_to_file/check_exists via smart_open.
# NOTE(review): credentials come from CFG (env/config.yaml) — never inline.
s3 = boto3.client(
    's3',
    aws_access_key_id=CFG.s3_access_key,
    aws_secret_access_key=CFG.s3_secret_key,
    endpoint_url=CFG.s3_endpoint,
    config=Config(signature_version='s3v4'),
)


def _debug_list_buckets():
    """Print the names of all reachable buckets (connectivity smoke test).

    This used to run unconditionally at import time, issuing a network
    request and printing on every import of utils.file; call it explicitly
    when debugging instead.
    """
    response = s3.list_buckets()
    print('Existing buckets:')
    for bucket in response['Buckets']:
        print(f'  {bucket["Name"]}')
 def save_to_file(content, filename:Path):
     if not isinstance(content, str):
     # 如果可以用 json 格式化,则格式化
@@ -10,6 +30,15 @@ def save_to_file(content, filename:Path):
             if not isinstance(content, str):
                 content = str(content)
     
-    with open(filename, "w", encoding="utf-8") as file:
+    with open(filename, "w", encoding="utf-8", transport_params={'client': s3}) as file:
         file.write(content)
     return filename
+
def check_exists(file_uri:str):
    """Probe whether *file_uri* (local path or s3:// URI) can be opened.

    Opening via smart_open is used as the existence test because it covers
    both local files and the S3 transport uniformly.

    Args:
        file_uri: Local path or s3:// URI to probe.

    Returns:
        The URI itself (truthy) when readable, False otherwise.
    """
    try:
        # FileNotFoundError is a subclass of OSError, so one clause covers
        # both local missing files and smart_open's S3 failures.
        with open(file_uri, 'r', transport_params={'client': s3}):
            return file_uri
    except OSError:
        return False

+ 47 - 0
utils/pydantic_auto_field.py

@@ -0,0 +1,47 @@
+from typing import Type, TypeVar, Dict, Any, Union, Optional
+from pydantic import BaseModel, model_validator
+
+# 定义一个通用类型变量,用于表示任意 Pydantic 模型
+ModelType = TypeVar("ModelType", bound=BaseModel)
+
class ModelField:
    """Callable adapter that coerces plain dicts into a fixed Pydantic model.

    An instance wraps one model class; calling it with a dict constructs the
    model from the dict's items, an existing model instance passes through
    unchanged, and anything else is rejected.
    """

    def __init__(self, model_class: "Type[ModelType]"):
        self.model_class = model_class

    def __call__(self, value: "Any") -> "ModelType":
        if isinstance(value, dict):
            return self.model_class(**value)
        if isinstance(value, self.model_class):
            return value
        raise ValueError(f"Expected dict or {self.model_class}, got {type(value)}")
+
class AutoLoadModel(BaseModel):
    """Base class that auto-converts nested dicts into declared Pydantic models.

    A ``mode='before'`` validator walks the declared fields and, for any field
    annotated as a BaseModel subclass (directly, Optional, inside a list, or
    as Dict values), replaces raw dict payloads with constructed model
    instances before normal validation runs.
    """
    @model_validator(mode='before')
    def auto_load_nested_models(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        # Local import: get_origin/get_args are not in this module's imports.
        from typing import get_origin, get_args

        # A before-validator may receive non-dict input; let pydantic's own
        # validation report that case instead of crashing on .get below.
        if not isinstance(values, dict):
            return values
        for field_name, field in cls.model_fields.items():
            annotation = field.annotation
            origin = get_origin(annotation)
            if origin is Union:
                # Unwrap Optional[X] / Union[X, None] to the first real type.
                non_none = [t for t in get_args(annotation) if t is not type(None)]
                if non_none:
                    annotation = non_none[0]
                    origin = get_origin(annotation)
            field_value = values.get(field_name)
            if isinstance(annotation, type) and issubclass(annotation, BaseModel):
                # Direct model field: convert a dict, or each dict in a list.
                if isinstance(field_value, dict):
                    values[field_name] = annotation(**field_value)
                elif isinstance(field_value, list):
                    values[field_name] = [
                        annotation(**item) if isinstance(item, dict) else item
                        for item in field_value
                    ]
            elif origin is dict:
                # Dict[..., Model] field. The previous implementation tested
                # ``isinstance(field_type, dict)`` — a type object is never a
                # dict instance, so that branch was dead code.
                args = get_args(annotation)
                if (
                    len(args) == 2
                    and isinstance(args[1], type)
                    and issubclass(args[1], BaseModel)
                    and isinstance(field_value, dict)
                ):
                    value_model = args[1]
                    values[field_name] = {
                        key: value_model(**val) if isinstance(val, dict) else val
                        for key, val in field_value.items()
                    }
        return values