浏览代码

完成批量任务调取并存入数据库;新增表格写入

mrh 8 月之前
父节点
当前提交
a051c0ed6e

+ 20 - 0
docs/gpt/excel_writer_usage.py

@@ -0,0 +1,20 @@
+from src.excel_tools.excel_writer import ExcelWriter
+
+# Extraction JSON produced by the extract task for one ASIN.
+input_path = r'G:\code\amazone\copywriting_production\output\page\debug\B0B658JC22_extract.json'
+# The workbook is created from scratch by ExcelWriter; no template file is needed.
+output_path = r'G:\code\amazone\copywriting_production\output\multi_data.xlsx'
+
+# Build the writer, append one JSON dataset, then style and save.
+# Call add_json_data() once per JSON file to place datasets side by side,
+# separated by an empty spacer column.
+writer = ExcelWriter(output_path)
+writer.add_json_data(input_path, asin="B0B658JC22")
+writer.apply_styles()
+writer.save()
+
+print("Excel文件生成成功!")

+ 75 - 0
docs/gpt/to_excel.md

@@ -0,0 +1,75 @@
+```json
+{
+    "result_table": [
+        {
+            "traffic_keyword": "コードカバー",
+            "keyword_link": "https://www.asinseed.com/en/JP?q=%E3%82%B3%E3%83%BC%E3%83%89%E3%82%AB%E3%83%90%E3%83%BC",
+            "monthly_searches": "9,332",
+            "amazon_search_link": "https://www.amazon.co.jp/s?k=%E3%82%B3%E3%83%BC%E3%83%89%E3%82%AB%E3%83%90%E3%83%BC"
+        },
+        {
+            "traffic_keyword": "コードカバー ペット",
+            "keyword_link": "https://www.asinseed.com/en/JP?q=%E3%82%B3%E3%83%BC%E3%83%89%E3%82%AB%E3%83%90%E3%83%BC%20%E3%83%9A%E3%83%83%E3%83%88",
+            "monthly_searches": "3,195",
+            "amazon_search_link": "https://www.amazon.co.jp/s?k=%E3%82%B3%E3%83%BC%E3%83%89%E3%82%AB%E3%83%90%E3%83%BC%20%E3%83%9A%E3%83%83%E3%83%88"
+        },
+        // ...
+        {
+            "monthly_searches": ""
+        }
+    ],
+    "product_info": {
+        "image_url": "https://m.media-amazon.com/images/I/41Q7bZ1H94L._AC_US200_.jpg",
+        "goto_amazon": "https://www.amazon.co.jp/dp/B0B658JC22",
+        "main_text": "MEL Chemistry大径 肉厚 ペットコード ペット 犬 猫 キャット ドッグ 噛みつき 防止 感電 保護 家電 チャージ コード 配線 プロテクター カバー 螺旋 スパイラル チューブ ラップ 被覆 破れ 防止 破損防止 補強 収納 収束 結束 まとめる TPU 約93cm (ブラック 黒)B0B658JC22",
+        "imgbase64": "/9j/4AAQSkZJRgABAQAAAQABAAD/..."
+    },
+    "unique_words": [
+        {
+            "word": "コードカバー"
+        },
+        {
+            "word": "猫"
+        },
+        {
+            "word": "ケーブルカバー"
+        },
+        // ...
+    ]
+}
+```
+
+我想根据上述 json 文件内容写入到一个表格,用 pandas 模块写入。
+一个文件对应 3 列表格。
+第一列:标题是来自传参 asin ,例如 "B0CQ1SHD8V" 。每行的内容是来自 result_table 的 "traffic_keyword" 字段。如果可以的话,将每行的内容用超链接链接到 result_table 的 "amazon_search_link" 字段。
+第二列:标题是搜索量 ,每行的内容是来自 result_table 的 "monthly_searches" 字段。
+
+并且在第一列标题附近把 Base64 图片插入到表格中
+
+因为 result_table 是一个表格总和,它们都有相同的行数,它们可能要筛选或者排列,我希望能够按搜索量进行排序从大到小,由于各个列的情况都不同,你来决定是用 python 原生代码排列再写入,还是写入到 excel 中再用 excel 接口进行排序。而且超过 1万的数值需要标红。
+
+但是 product_info 和 unique_words 是一个单独的字段,如果嵌入到同一个表格中,会影响 table 的排序,你觉得如何构造 excel 表格,放在最后一行可以吗?比如在 result_table 生成的 excel 结果下方新增 product_info 和 unique_words 的行。
+
+请移除掉模板文件,直接生成 excel 即可。
+文件中只获取了单个 json 文件,未来需要读取多个 json 文件,写入到表格中,用空列隔开。例如上述案例中,存在2列,然后隔开一列,从第三列新增新的 json 文件,新增各自的 result_table 、 product_info 、 unique_words,因此需要保持代码的兼容性,可扩展性。
+
+请重构代码。
+必须符合最佳编码规范,高内聚,低耦合,用类来管理模块。
+
+
+必须读取 "G:\code\amazone\copywriting_production\output\3月新品-文案制作.xlsx" 这个模板文件,将生成的内容另存为同路径下的另一个文件。模板文件是只读的。
+
+
+
+
+
+我想根据上述文件内容写入到数据库,现在 submit_extract_task_and_wait 函数已经将 json 文件上传到 S3 并且保存到数据库路径,你只需从数据库中读取,获取 S3 的文件路径并读取 json 内容。然后在一个新表中写入。
+新表的字段如下:
+asin: str
+traffic_keyword: str
+monthly_searches: str
+keyword_link: str   
+amazon_search_link: str
+tail_keyword: dict    
+
+

+ 174 - 0
src/excel_tools/excel_writer.py

@@ -0,0 +1,174 @@
+from dataclasses import dataclass
+import json
+from openpyxl import Workbook
+from openpyxl.drawing.image import Image
+from openpyxl.formatting.rule import CellIsRule
+from openpyxl.styles import PatternFill, Font, Alignment
+from openpyxl.utils import get_column_letter
+from io import BytesIO
+import base64
+from typing import Dict, List, Any
+from pathlib import Path
+from utils.file import read_file
+from utils.logu import logger
+
+# Red fill flags monthly search volumes above 10,000 (see _write_table_data).
+RED_FILL = PatternFill(start_color='FFFF0000', end_color='FFFF0000', fill_type='solid')
+# Shared styling for the header row: bold white text on a blue background.
+HEADER_FONT = Font(bold=True, color='FFFFFF')
+HEADER_FILL = PatternFill(start_color='4F81BD', patternType='solid')
+
+@dataclass
+class DataProcessor:
+    """Normalizes one extraction JSON payload before it is written to Excel."""
+    json_data: Dict[str, Any]
+    asin: str
+    
+    def process_table_data(self) -> List[Dict[str, Any]]:
+        """Return result_table rows sorted by search volume, descending.
+
+        Placeholder rows without a traffic_keyword are dropped and thousands
+        separators are stripped from monthly_searches.
+        """
+        rows = [
+            {
+                'traffic_keyword': item.get('traffic_keyword', ''),
+                'amazon_search_link': item.get('amazon_search_link', ''),
+                'monthly_searches': item.get('monthly_searches', '0').replace(',', '')
+            }
+            for item in self.json_data.get('result_table', [])
+            if item.get('traffic_keyword')
+        ]
+        # Sort in Python so Excel needs no post-processing; docs/gpt/to_excel.md
+        # requires largest search volume first. Non-numeric volumes sort as 0.
+        rows.sort(
+            key=lambda r: int(r['monthly_searches']) if r['monthly_searches'].isdigit() else 0,
+            reverse=True,
+        )
+        return rows
+    
+    @property
+    def product_info(self) -> Dict[str, str]:
+        """Raw product_info mapping; empty dict when the key is missing."""
+        return self.json_data.get('product_info', {})
+    
+    @property
+    def unique_words(self) -> List[str]:
+        """Flat list of words taken from the unique_words entries."""
+        return [word.get('word', '') for word in self.json_data.get('unique_words', [])]
+
+class ExcelWriter:
+    """Writes one or more JSON datasets side by side into a single workbook."""
+    
+    def __init__(self, output_path: str):
+        self.wb = Workbook()
+        self.ws = self.wb.active
+        self.output_path = Path(output_path)
+        self.current_col = 1  # column where the next dataset starts
+        
+    def add_json_data(self, json_path: str, asin: str):
+        """Load one JSON file (read_file handles local/S3 paths) and append its columns."""
+        str_data = read_file(json_path)
+        data = json.loads(str_data)
+        processor = DataProcessor(data, asin)
+        
+        self._write_header(processor)
+        self._write_table_data(processor)
+        self._write_additional_info(processor)
+        
+        # Advance by two data columns plus one empty spacer column.
+        self.current_col += 3
+    
+    def _write_header(self, processor: DataProcessor):
+        """Write the ASIN and search-volume headers plus the product image."""
+        # ASIN title cell
+        header_cell = self.ws.cell(
+            row=1,
+            column=self.current_col,
+            value=processor.asin
+        )
+        header_cell.font = HEADER_FONT
+        header_cell.fill = HEADER_FILL
+        
+        # Embed the Base64 thumbnail. '/9j/' is the JPEG magic prefix —
+        # presumably the only format the extractor emits (TODO confirm).
+        img_data = processor.product_info.get('imgbase64', '')
+        if img_data and img_data.startswith('/9j/'):
+            try:
+                img = Image(BytesIO(base64.b64decode(img_data)))
+                img.anchor = f'{get_column_letter(self.current_col)}2'
+                self.ws.add_image(img)
+            except Exception as e:
+                logger.error(f'图片插入失败: {e}')
+                
+        # Search-volume column header
+        self.ws.cell(
+            row=1,
+            column=self.current_col + 1,
+            value='搜索量'
+        ).font = HEADER_FONT
+    
+    def _write_table_data(self, processor: DataProcessor):
+        """Write keyword rows; volumes above 10,000 are highlighted in red."""
+        for row_idx, item in enumerate(processor.process_table_data(), start=3):
+            keyword_cell = self.ws.cell(
+                row=row_idx,
+                column=self.current_col,
+                value=item['traffic_keyword']
+            )
+            # Only attach a hyperlink when the source row provides one; an
+            # empty-string hyperlink would produce a dead link in Excel.
+            if item['amazon_search_link']:
+                keyword_cell.hyperlink = item['amazon_search_link']
+            
+            # Non-numeric search volumes (e.g. empty strings) fall back to 0.
+            search_volume = item['monthly_searches']
+            cell = self.ws.cell(
+                row=row_idx,
+                column=self.current_col + 1,
+                value=int(search_volume) if search_volume.isdigit() else 0
+            )
+            
+            if cell.value > 10000:
+                cell.fill = RED_FILL
+    
+    def _write_additional_info(self, processor: DataProcessor):
+        """Append product info and unique words below the keyword table."""
+        base_row = self.ws.max_row + 2
+        
+        self.ws.cell(base_row, self.current_col, '产品信息:').font = Font(bold=True)
+        self.ws.cell(base_row + 1, self.current_col, processor.product_info.get('main_text', ''))
+        
+        self.ws.cell(base_row + 3, self.current_col, '唯一词:').font = Font(bold=True)
+        for idx, word in enumerate(processor.unique_words, start=1):
+            self.ws.cell(base_row + 3 + idx, self.current_col, word)
+        
+    def apply_styles(self):
+        """Auto-fit column widths and center-align the header row."""
+        for col in self.ws.columns:
+            # default=0 keeps spacer columns (all-None cells) from raising
+            # ValueError when the generator is empty.
+            max_length = max(
+                (len(str(cell.value)) for cell in col if cell.value is not None),
+                default=0
+            )
+            self.ws.column_dimensions[get_column_letter(col[0].column)].width = max_length + 2
+            
+        for row in self.ws.iter_rows(min_row=1, max_row=1):
+            for cell in row:
+                cell.alignment = Alignment(horizontal='center')
+    
+    def save(self):
+        """Create parent directories as needed and save the workbook."""
+        self.output_path.parent.mkdir(parents=True, exist_ok=True)
+        self.wb.save(self.output_path)
+        logger.info(f'Excel文件已保存至: {self.output_path}')
+
+def main():
+    """Demo driver: write one extraction JSON into a fresh workbook."""
+    output = r"G:\code\amazone\copywriting_production\output\multi_data.xlsx"
+    json_files = [
+        (r"G:\code\amazone\copywriting_production\output\page\debug\B0B658JC22_extract.json", "B0B658JC22"),
+        # add more (json_path, asin) pairs here, e.g.:
+        # (r"path\to\other.json", "ASIN123")
+    ]
+    logger.info(f"{json_files}")
+    writer = ExcelWriter(output)
+    for json_path, asin in json_files:
+        writer.add_json_data(json_path, asin)
+    writer.apply_styles()
+    writer.save()
+
+if __name__ == "__main__":
+    main()

+ 3 - 26
src/manager/core/db.py

@@ -3,15 +3,7 @@ from typing import Optional
 from sqlmodel import SQLModel, create_engine, Session, select, Field
 from config.settings import DB_URL
 from utils.sql_engine import create_db_and_tables,drop_table,engine
-
-class AsinSeed(SQLModel, table=True):
-    id: Optional[int] = Field(default=None, primary_key=True)
-    asin: str
-    asin_area: str = 'JP'
-    extra_result_path: Optional[str] = None
-    mhtml_path: Optional[str] = None
-    error: Optional[str] = None
-    created_at: Optional[datetime] = Field(default_factory=datetime.now)
+from src.models.asin_model import AsinSeed
 
 class DbManager:
     def __init__(self, engine: str=None):
@@ -25,7 +17,7 @@ class DbManager:
             session.refresh(asin_model)
             return asin_model
 
-    def get_asin_seed(self, asin: str):
+    def get_asin_seed(self, asin: str)->AsinSeed:
         with Session(self.engine) as session:
             statement = select(AsinSeed).where(AsinSeed.asin == asin)
             results = session.exec(statement)
@@ -37,22 +29,7 @@ class DbManager:
             return exist
         else:
             return self.save_asin_seed(asin_model)
-    
-    def update_asin_seed(self, asin_model: AsinSeed):
-        with Session(self.engine) as session:
-            statement = select(AsinSeed).where(AsinSeed.asin == asin_model.asin)
-            results = session.exec(statement)
-            exist = results.first()
-            if exist:
-                for key, value in asin_model.model_dump().items():
-                    setattr(exist, key, value)
-                session.add(exist)
-                session.commit()
-                session.refresh(exist)
-                return exist
-            else:
-                return None
-            
+               
 def main():
     asinseed_list = ['B0CQ1SHD8V', 'B0B658JC22', 'B0DQ84H883', 'B0D44RT8R8']
     db_manager = DbManager()

+ 24 - 27
src/manager/manager_task.py

@@ -5,6 +5,8 @@ from utils.file import save_to_file, read_file
 from src.tasks.crawl_asin_save_task import get_asin_and_save_page
 from src.tasks.crawl_asin_exract_task import extra_result
 from celery.result import AsyncResult
+from utils.logu import get_logger
+logger = get_logger('main')
 
 class ManagerTask:
     s3_prefix = CFG.s3_prefix + '/output/page'
@@ -12,6 +14,10 @@ class ManagerTask:
         self.db = DbManager()
 
     def submit_task_and_wait(self, asin: str, asin_area: str = 'JP',overwrite:bool=False, timeout: int = 300):
+        model = self.db.get_asin_seed(asin)
+        if model and model.mhtml_path:
+            logger.info(f"{asin}已经爬取过,跳过")
+            return model
         """提交任务并等待完成,保存结果路径到数据库"""
         # 提交celery任务
         task = get_asin_and_save_page.delay(asin, asin_area, overwrite)
@@ -23,17 +29,21 @@ class ManagerTask:
         # 处理任务结果
         if result.successful():
             task_result = result.result
-            self.save_task_asin_crawl_result(asin, asin_area, task_result)
+            model.mhtml_path = task_result['path']
+            self.db.save_asin_seed(model)
         return None
     
     def submit_extract_task_and_wait(self, asin: str, asin_area: str = 'JP', timeout: int = 300):
         """提交页面解析任务并等待完成,保存结果到数据库"""
         # 从数据库获取mhtml路径
         asin_seed = self.db.get_asin_seed(asin)
+        if asin_seed and asin_seed.extra_result_path:
+            logger.info(f"{asin}已经解析过,跳过")
+            return asin_seed
         if not asin_seed or not asin_seed.mhtml_path:
             print(f"未找到{asin}的mhtml路径")
             return None
-        
+        logger.info(f"{asin}页面解析开始: {asin_seed.mhtml_path}")
         # 提交celery任务
         task = extra_result.delay(asin_seed.mhtml_path)
         
@@ -45,17 +55,17 @@ class ManagerTask:
         if result.successful():
             task_result = result.result
             if task_result['status'] == 'success':
+                task_result_data = task_result['data']
                 # 保存提取结果到文件并上传S3
-                filename = f"{asin}_extract.json"
-                save_path = self.upload_file(
-                    file_path=task_result['data'],
-                    filename=filename
-                )
+                s3_dir = asin_seed.mhtml_path.rsplit('/', 1)[0]
+                save_json_uri = f"{s3_dir}/{asin}_extract.json"
+                save_to_file(task_result_data, save_json_uri)
+                task_result['path'] = save_json_uri
                 # 保存数据库记录
-                self.save_task_asin_page_extract_result(asin, asin_area, {
-                    'status': 'success',
-                    'path': save_path
-                })
+                asin_model = self.db.get_asin_seed(asin=asin)
+                asin_model.extra_result_path = save_json_uri
+                self.db.save_asin_seed(asin_model)
+                logger.info(f"{asin}页面解析成功: {task_result}")
         return task_result
     
     def save_task_asin_crawl_result(self, asin: str, asin_area:str=None, task_result: dict={}):
@@ -68,20 +78,6 @@ class ManagerTask:
             else:
                 self.db.add_or_ignore_asin_seed(AsinSeed(asin=asin, asin_area=asin_area, mhtml_path=task_result['path']))
             return asin_seed
-    def save_task_asin_page_extract_result(self, asin: str, asin_area:str=None, task_result: dict={}):
-        if task_result.get('status') == 'success':
-            asin_seed = self.db.get_asin_seed(asin)
-            if asin_seed:
-                asin_seed.extract_path = task_result['path']
-                self.db.update_asin_seed(asin_seed)
-            else:
-                new_seed = AsinSeed(
-                    asin=asin,
-                    asin_area=asin_area,
-                    extract_path=task_result['path']
-                )
-                self.db.add_or_ignore_asin_seed(new_seed)
-            return asin_seed
     def upload_file(self, file_path: str, filename: str):
         res = save_to_file(Path(file_path).read_text(), self.s3_prefix + '/' + filename)
         return res
@@ -93,8 +89,9 @@ class ManagerTask:
 def main():
     asinseed_list = ['B0CQ1SHD8V', 'B0B658JC22', 'B0DQ84H883', 'B0D44RT8R8']
     manager = ManagerTask()    
-    # manager.submit_task_and_wait('B0B658JC22', overwrite=False)
-    manager.submit_extract_task_and_wait('B0B658JC22')
+    for asin in asinseed_list:
+        manager.submit_task_and_wait(asin)
+        manager.submit_extract_task_and_wait(asin)
     # result = {'status': 'success', 'path': 's3://public/amazone/copywriting_production/output/B0B658JC22/B0B658JC22.mhtml'}
     # manager.save_task_asin_crawl_result('B0B658JC22', 'JP', result)
 if __name__ == "__main__":

+ 13 - 0
src/models/asin_model.py

@@ -0,0 +1,13 @@
+# SQLModel table definition for ASIN crawl bookkeeping.
+from datetime import datetime
+from typing import Optional
+from sqlmodel import SQLModel, create_engine, Session, select, Field
+from config.settings import DB_URL
+
+class AsinSeed(SQLModel, table=True):
+    """One ASIN crawl record: where the page snapshot and extraction result live."""
+    id: Optional[int] = Field(default=None, primary_key=True)
+    asin: str
+    # Amazon marketplace code; defaults to Japan.
+    asin_area: str = 'JP'
+    # S3 URI of the extracted JSON (set by submit_extract_task_and_wait).
+    extra_result_path: Optional[str] = None
+    # S3 URI of the saved .mhtml page snapshot (set by the crawl task).
+    mhtml_path: Optional[str] = None
+    # Last failure message, if any.
+    error: Optional[str] = None
+    created_at: Optional[datetime] = Field(default_factory=datetime.now)

+ 0 - 0
src/models/excel_product_model.py


+ 6 - 2
tests/mytest/t_boto3.py

@@ -1,8 +1,12 @@
 from utils.file import s3_uri_to_http_url
-
+from pathlib import Path
 def main():
     s3_uri = 's3://public/amazone/copywriting_production/output/B0B658JC22/B0B658JC22.mhtml'
-    print(s3_uri_to_http_url(s3_uri))
+    path = Path(r's3://public/amazone/copywriting_production/output/B0B658JC22/B0B658JC22.mhtml')
+    asin = 'B0B658JC22'
+    save_json_path = Path(s3_uri).parent / f"{asin}_extract.json"
+    print(save_json_path)
+    # print(s3_uri_to_http_url(s3_uri))
 
 if __name__ == "__main__":
     main()

+ 0 - 44
utils/config.py

@@ -1,44 +0,0 @@
-import os
-import yaml
-from pathlib import Path
-from pydantic import BaseModel, Field
-from typing import List, Dict, Union,Optional,Any
-from utils.pydantic_auto_field import AutoLoadModel
-from dotenv import load_dotenv
-load_dotenv()
-
-class Config(BaseModel):
-    storage: str = "local"
-    s3_access_key: Optional[str] = os.environ.get("S3_ACCESS_KEY", 'bh9LbfsPHRJgQ44wXIlv')
-    s3_secret_key: Optional[str] = os.environ.get("S3_SECRET_KEY", 'N744RZ60T1b4zlcWG2MROCzjEE2mPTdNQCc7Pk3M')
-    s3_endpoint: Optional[str] = os.environ.get("S3_ENDPOINT", 'http://vs1.lan:9002')
-    chrome_config_ini: Optional[str] = r'G:\code\amazone\copywriting_production\config\dp_conf\9321.ini'
-    redis_url: Optional[str] = os.environ.get("REDIS_URL", 'redis://localhost:6379/0')
-    def save(self, config_path: Path = None):
-        config_path = config_path or get_config_path()
-        with open(config_path, "w", encoding="utf-8") as file:
-            yaml.dump(self.model_dump(), file)
-        return self
-            
-def get_config_path():
-    return os.environ.get('CONFIG_PATH',CONFIG_DIR / "config.yaml") 
-
-def read_config(config_path: Path):
-    if isinstance(config_path, str):
-        config_path = Path(config_path)
-    if not config_path.exists():
-        config = Config()
-        config.save(config_path)
-        return config
-    with open(config_path, "r", encoding="utf-8") as file:
-        config_dict = yaml.safe_load(file)
-    return Config(**config_dict)
-
-CFG = read_config(get_config_path())
-
-def main():
-    print(CFG)
-    CFG.save()
-
-if __name__ == "__main__":
-    main()