4 次代碼提交 aa2ff97f48 ... 410d033091

作者 SHA1 備註 提交日期
  mrh 410d033091 成功完成搜索竞品表格的生成 1 年之前
  mrh c4e7b7bc34 将图片改到第一行,新增超链接样式 1 年之前
  mrh ffefa3bf7b 完成批量写入 excel 1 年之前
  mrh a051c0ed6e 完成批量任务调取并存入数据库;新增表格写入 1 年之前

+ 2 - 1
.gitignore

@@ -9,4 +9,5 @@ wheels/
 # Virtual environments
 .venv
 output/
-.env
+.env
+.aider*

+ 5 - 0
.vscode/settings.json

@@ -0,0 +1,5 @@
+{
+    "terminal.integrated.env.windows": {
+    "pythonpath": "${workspaceFolder}/"
+}
+}

二進制
conditional_formatting_example.xlsx


+ 21 - 0
docs/gpt/excel_writer_usage.py

@@ -0,0 +1,21 @@
+from src.excel_tools.excel_writer import ExcelWriter
+from src.models.asin_model import TrafficKeywordModel, ProductInfoModel
+import json
+input_path = r'G:\code\amazone\copywriting_production\output\page\debug\B0B658JC22_extract.json'
+
+# 示例数据加载
+with open(input_path, encoding='utf-8') as f:
+    data = json.load(f)
+
+# 调用示例
+success = ExcelWriter.process_json_to_excel(
+    data=data,            # 输入数据
+    asin="B0B658JC22",    # 产品ASIN编号
+    template_path=r"G:\code\amazone\copywriting_production\output\3月新品-文案制作-temp.xlsx",  # Excel模板路径
+    output_path=r"G:\code\amazone\copywriting_production\output\3月新品-文案制作-temp.xlsx"     # 输出文件路径
+)
+
+if success:
+    print("Excel文件生成成功!")
+else:
+    print("文件生成失败,请检查日志")

+ 136 - 0
docs/gpt/to_excel.md

@@ -0,0 +1,136 @@
+# 生成 excel 
+```json
+{
+    "result_table": [
+        {
+            "traffic_keyword": "コードカバー",
+            "keyword_link": "https://www.asinseed.com/en/JP?q=%E3%82%B3%E3%83%BC%E3%83%89%E3%82%AB%E3%83%90%E3%83%BC",
+            "monthly_searches": "9,332",
+            "amazon_search_link": "https://www.amazon.co.jp/s?k=%E3%82%B3%E3%83%BC%E3%83%89%E3%82%AB%E3%83%90%E3%83%BC"
+        },
+        {
+            "traffic_keyword": "コードカバー ペット",
+            "keyword_link": "https://www.asinseed.com/en/JP?q=%E3%82%B3%E3%83%BC%E3%83%89%E3%82%AB%E3%83%90%E3%83%BC%20%E3%83%9A%E3%83%83%E3%83%88",
+            "monthly_searches": "3,195",
+            "amazon_search_link": "https://www.amazon.co.jp/s?k=%E3%82%B3%E3%83%BC%E3%83%89%E3%82%AB%E3%83%90%E3%83%BC%20%E3%83%9A%E3%83%83%E3%83%88"
+        },
+        // ...
+        {
+            "monthly_searches": ""
+        }
+    ],
+    "product_info": {
+        "image_url": "https://m.media-amazon.com/images/I/41Q7bZ1H94L._AC_US200_.jpg",
+        "goto_amazon": "https://www.amazon.co.jp/dp/B0B658JC22",
+        "main_text": "MEL Chemistry大径 肉厚 ペットコード ペット 犬 猫 キャット ドッグ 噛みつき 防止 感電 保護 家電 チャージ コード 配線 プロテクター カバー 螺旋 スパイラル チューブ ラップ 被覆 破れ 防止 破損防止 補強 収納 収束 結束 まとめる TPU 約93cm (ブラック 黒)B0B658JC22",
+        "imgbase64": "/9j/4AAQSkZJRgABAQAAAQABAAD/..."
+    },
+    "unique_words": [
+        {
+            "word": "コードカバー"
+        },
+        {
+            "word": "猫"
+        },
+        {
+            "word": "ケーブルカバー"
+        },
+        // ...
+    ]
+}
+```
+
+我想根据上述 json 文件内容写入到一个表格,用 pandas 模块写入。
+一个 json 文件对应 3 列表格。
+- 每列的第一行是图片,不是标题。
+- 第二行才是标题,标题是来自传参 asin ,例如 "B0CQ1SHD8V" 。每行的内容是来自 result_table 的 "traffic_keyword" 字段。如果可以的话,将每行的内容用超链接链接到 result_table 的 "amazon_search_link" 字段。
+- 第二行标题的第二列:标题是搜索量 ,每行的内容是来自 result_table 的 "monthly_searches" 字段。
+- monthly_searches 在写入前需要转换为数字,排序从大到小,,并且超过 1万的数值需要标红。
+- result_table 插入完后,末尾隔出一行,开始新增 product_info 和 unique_words 。
+- 第三列空出来,方便下一个文件隔开
+
+目前生成的excel存在问题:
+- 每列的 “搜索量” 超过1w没有标红
+
+
+# 生成 excel 
+```json
+{
+    "result_table": [
+        {
+            "traffic_keyword": "コードカバー",
+            "keyword_link": "https://www.asinseed.com/en/JP?q=%E3%82%B3%E3%83%BC%E3%83%89%E3%82%AB%E3%83%90%E3%83%BC",
+            "monthly_searches": "9,332",
+            "amazon_search_link": "https://www.amazon.co.jp/s?k=%E3%82%B3%E3%83%BC%E3%83%89%E3%82%AB%E3%83%90%E3%83%BC"
+        },
+        {
+            "traffic_keyword": "コードカバー ペット",
+            "keyword_link": "https://www.asinseed.com/en/JP?q=%E3%82%B3%E3%83%BC%E3%83%89%E3%82%AB%E3%83%90%E3%83%BC%20%E3%83%9A%E3%83%83%E3%83%88",
+            "monthly_searches": "3,195",
+            "amazon_search_link": "https://www.amazon.co.jp/s?k=%E3%82%B3%E3%83%BC%E3%83%89%E3%82%AB%E3%83%90%E3%83%BC%20%E3%83%9A%E3%83%83%E3%83%88"
+        },
+        // ...
+        {
+            "monthly_searches": ""
+        }
+    ],
+    "product_info": {
+        "image_url": "https://m.media-amazon.com/images/I/41Q7bZ1H94L._AC_US200_.jpg",
+        "goto_amazon": "https://www.amazon.co.jp/dp/B0B658JC22",
+        "main_text": "MEL Chemistry大径 肉厚 ペットコード ペット 犬 猫 キャット ドッグ 噛みつき 防止 感電 保護 家電 チャージ コード 配線 プロテクター カバー 螺旋 スパイラル チューブ ラップ 被覆 破れ 防止 破損防止 補強 収納 収束 結束 まとめる TPU 約93cm (ブラック 黒)B0B658JC22",
+        "imgbase64": "/9j/4AAQSkZJRgABAQAAAQABAAD/..."
+    },
+    "unique_words": [
+        {
+            "word": "コードカバー"
+        },
+        {
+            "word": "猫"
+        },
+        {
+            "word": "ケーブルカバー"
+        },
+        // ...
+    ]
+}
+```
+
+我想根据上述 json 文件内容写入到一个表格,用 pandas 模块写入。
+一个文件对应 3 列表格。
+第一列:标题是来自传参 asin ,例如 "B0CQ1SHD8V" 。每行的内容是来自 result_table 的 "traffic_keyword" 字段。如果可以的话,将每行的内容用超链接链接到 result_table 的 "amazon_search_link" 字段。
+第二列:标题是搜索量 ,每行的内容是来自 result_table 的 "monthly_searches" 字段。
+
+因为 result_table 是一个表格总和,他们都有相同的行数,他们可能要筛选或者排列,我希望能够按搜索量进行排序从大到小,由于各个列的情况都不同,你来决定是用 python 原生代码排列再写入,还是写入到 excel 中在用excel接口进行排序。而且超过 1万的数值需要标红。
+
+但是 product_info 和 unique_words 是一个单独的字段,如果嵌入到同一个表格中,会影响 table 的排序,你觉得如何构造 excel 表格,放在最后一行可以吗?比如在 result_table 生成的 excel 结果下方新增 product_info 和 unique_words 的行。
+
+并且 Base64 图片插入到产品信息行所在的位置附近。
+
+文件中只获取了单个 json 文件,未来需要读取多个 json 文件,写入到表格中,用空列隔开。例如上述案例中,存在2列,然后隔开一列,从第三列新增新的 json 文件,新增各自的 result_table 、 product_info 、 unique_words,因此需要保持代码的兼容性,可扩展性。
+
+目前生成的excel存在问题:
+- 重构不完整
+- 我将 json 文件改成数据模型 DataProcessor DataUtils BaseExcelComponent 还有存在的必要吗?删去,并且不需要校验功能。
+- 不要硬编码。例如 img_data.startswith('/9j/') 这种写法大错特错。
+
+
+
+请重构代码。
+必须符合最佳编码规范,高内聚,低耦合,用类来管理模块。
+
+
+
+
+
+
+
+我想根据上述文件内容写入到数据库,现在 submit_extract_task_and_wait 函数已经将 json 文件上传到 S3 并且保存到数据库路径,你只需从数据库中读取,获取 s3 的文件路径获取 json 内容。然后在一个新表中中写入。
+新表的字段如下:
+asin: str
+traffic_keyword: str
+monthly_searches: str
+keyword_link: str   
+amazon_search_link: str
+tail_keyword: dict    
+
+

+ 270 - 0
src/excel_tools/excel_writer.py

@@ -0,0 +1,270 @@
+import json
+import pandas as pd
+from openpyxl import Workbook
+from openpyxl.drawing.image import Image
+from openpyxl.formatting.rule import CellIsRule
+from openpyxl.styles import PatternFill, Font, Alignment
+from openpyxl.utils import get_column_letter
+from io import BytesIO
+import base64
+from pathlib import Path
+from typing import Dict, List, Tuple
+from utils.file import read_file
+from utils.logu import logger
+
+# 样式常量
+RED_FILL = PatternFill(start_color='FF0000',end_color="FF0000", fill_type='solid')  # 修正为RGB格式
+HEADER_FONT = Font(bold=True, color='FFFFFF')
+HEADER_FILL = PatternFill(start_color='4F81BD', patternType='solid')
+COLUMN_SPACING = 3  # 每个产品占3列(关键词、搜索量、空列)
+# 确保 HEADER_FILL 使用正确的参数
+HEADER_FILL = PatternFill(start_color='4F81BD', fill_type='solid')
+
+class ProductDataProcessor:
+    """JSON数据处理中心"""
+    def __init__(self, json_data: Dict, asin: str):
+        self.json_data = json_data
+        self.asin = asin
+        self._validate_data()
+        
+    def _validate_data(self):
+        """数据校验"""
+        if 'result_table' not in self.json_data:
+            raise ValueError("Missing required 'result_table' in JSON data")
+
+    def get_sorted_dataframe(self) -> pd.DataFrame:
+        """获取排序后的DataFrame"""
+        df = pd.DataFrame(self.json_data['result_table'])
+        
+        # 数据清洗和类型转换
+        df['monthly_searches'] = df['monthly_searches'].apply(
+            lambda x: int(str(x).replace(',', '')) if x else 0
+        )
+        
+        # 过滤无效数据并排序
+        df = df[df['traffic_keyword'].notna()].sort_values(
+            by='monthly_searches', 
+            ascending=False
+        )
+        return df.reset_index(drop=True)
+
+    @property
+    def product_info(self) -> Dict:
+        """获取产品信息"""
+        return self.json_data.get('product_info', {})
+
+    @property
+    def unique_words(self) -> List[str]:
+        """获取唯一词列表"""
+        return [
+            str(word['word']).strip() 
+            for word in self.json_data.get('unique_words', [])
+            if 'word' in word
+        ]
+
+class ExcelGenerator:
+    """Excel文件生成器"""
+    def __init__(self, output_path: str):
+        self.wb = Workbook()
+        self.ws = self.wb.active
+        self.output_path = Path(output_path)
+        self.current_col = 1
+        self.max_data_rows = 0  # 记录最大数据行数
+        self.product_cols = []  # 记录所有产品起始列
+        
+    def add_product(self, json_path: str, asin: str):
+        """添加产品数据"""
+        try:
+            # 加载并处理数据
+            data = json.loads(read_file(json_path))
+            processor = ProductDataProcessor(data, asin)
+            
+            # 记录产品起始列
+            self.product_cols.append(self.current_col)
+            
+            # 写入主数据表
+            self._write_main_table(processor, asin)
+            
+            # 写入附加信息
+            self._write_additional_info(processor)
+            
+            # 插入产品图片
+            self._insert_product_image(processor.product_info)
+            
+            # 移动到下一组列
+            self.current_col += COLUMN_SPACING
+            
+        except (json.JSONDecodeError, ValueError) as e:
+            logger.error(f'Error processing {json_path}: {e}')
+
+    
+    def _write_main_table(self, processor: ProductDataProcessor, asin: str):
+        """写入主表格数据"""
+        df = processor.get_sorted_dataframe()
+        
+        # 写入表头
+        # 标题行下移到第3行(图片占1-2行)
+        # 标题行调整到第2行
+        asin_cell = self.ws.cell(2, self.current_col, asin)
+        asin_cell.font = Font(bold=True, color='0000FF', underline='single')  # 添加蓝色下划线
+        asin_cell.fill = HEADER_FILL
+        asin_cell.alignment = Alignment(horizontal='center', vertical='center')
+        
+        search_volume_cell = self.ws.cell(2, self.current_col + 1, "搜索量")
+        search_volume_cell.font = HEADER_FONT
+        search_volume_cell.fill = HEADER_FILL
+        search_volume_cell.alignment = Alignment(horizontal='center', vertical='center')
+        
+        # 使用pandas写入数据
+        # 数据从第3行开始(标题行下方直接开始数据)
+        for idx, row in df.iterrows():
+            data_row = idx + 3
+            
+            # 关键词(带超链接)
+            kw_cell = self.ws.cell(data_row, self.current_col, row['traffic_keyword'])
+            if pd.notna(row.get('amazon_search_link')):
+                kw_cell.hyperlink = row['amazon_search_link']
+                kw_cell.font = Font(color='0000FF', underline='single')  # 添加蓝色下划线样式
+            
+            # 搜索量
+            search_cell = self.ws.cell(data_row, self.current_col + 1, int(row['monthly_searches']))
+            search_cell.number_format = 'General'
+            search_cell.value = int(search_cell.value)  # 确保存储为整数类型
+        
+        # 更新最大行数
+        self.max_data_rows = max(self.max_data_rows, len(df) + 2)  # 修正最大行号计算
+        
+        # 设置初始列宽
+        self.ws.column_dimensions[get_column_letter(self.current_col)].width = 35
+        self.ws.column_dimensions[get_column_letter(self.current_col + 1)].width = 15
+
+    def _write_additional_info(self, processor: ProductDataProcessor):
+        """写入附加信息"""
+        start_row = self.max_data_rows + 3  # 间隔3行
+        
+        # 产品信息
+        self.ws.cell(start_row, self.current_col, "产品信息").font = Font(bold=True)
+        # 从product_info提取实际存在的字段
+        info_text = processor.product_info.get('main_text', '')
+        if processor.product_info.get('goto_amazon'):
+            info_text += f"\n产品链接: {processor.product_info['goto_amazon']}"
+        info_cell = self.ws.cell(start_row+1, self.current_col, info_text)
+        info_cell.alignment = Alignment(wrap_text=True, vertical='top')
+        self.ws.column_dimensions[get_column_letter(self.current_col)].width = 35
+        
+        # 唯一词
+        self.ws.cell(start_row+4, self.current_col, "唯一词").font = Font(bold=True)
+        for idx, word in enumerate(processor.unique_words, start=1):
+            self.ws.cell(start_row+4+idx, self.current_col, word)
+
+    def _insert_product_image(self, product_info: Dict):
+        """插入产品图片"""
+        img_base64 = product_info.get('imgbase64')
+        if not img_base64:
+            return
+            
+        try:
+            img_data = base64.b64decode(img_base64)
+            img = Image(BytesIO(img_data))
+            
+            # 图片位置:附加信息上方
+            # 图片插入到第1行(标题之前)
+            img_row = 1
+            img.anchor = f'{get_column_letter(self.current_col)}{img_row}'
+            self.ws.add_image(img)
+            
+            # 调整行高并预留空间
+            self.ws.row_dimensions[img_row].height = 150
+            # 更新最大数据行数(数据从第5行开始)
+            self.max_data_rows = max(self.max_data_rows, 5)
+        except Exception as e:
+            logger.error(f'图片插入失败: {e}')
+
+    def apply_formatting(self):
+        """应用最终格式"""
+        self._apply_conditional_formatting()
+        # self._adjust_column_widths()
+        self._set_global_alignment()
+
+    def _apply_conditional_formatting(self):
+        """应用条件格式"""
+        # 修正颜色定义(使用RGB格式)
+        
+        # 创建条件格式规则(移除字体设置)
+        red_rule = CellIsRule(
+            operator='greaterThan',
+            formula=['10000'],
+            stopIfTrue=True,
+            fill=RED_FILL
+        )
+        
+        # 计算目标列字母(B=2, E=5, H=8...)
+        target_columns = []
+        # 使用记录的product_cols计算目标列
+        for start_col in self.product_cols:
+            search_col = start_col + 1  # 搜索量列是起始列+1
+            target_columns.append(get_column_letter(search_col))
+        
+        # 应用条件格式到所有目标列
+        for col_letter in target_columns:
+            cell_range = f"{col_letter}3:{col_letter}{self.max_data_rows}"
+            self.ws.conditional_formatting.add(cell_range, red_rule)
+            logger.info(f"应用条件格式到 {cell_range} (值 > 10000)")
+
+    def _adjust_column_widths(self):
+        """自动调整列宽"""
+        for col in range(1, self.current_col):
+            max_length = 0
+            col_letter = get_column_letter(col)
+            
+            for cell in self.ws[col_letter]:
+                try:
+                    value_length = len(str(cell.value))
+                    if value_length > max_length:
+                        max_length = value_length
+                except:
+                    pass
+                
+            adjusted_width = (max_length + 2) * 1.2
+            self.ws.column_dimensions[col_letter].width = adjusted_width
+
+    def _set_global_alignment(self):
+        """设置全局对齐"""
+        for row in self.ws.iter_rows():
+            for cell in row:
+                cell.alignment = Alignment(
+                    horizontal='left' if cell.column % COLUMN_SPACING == 1 else 'center',
+                    vertical='center',
+                    wrap_text=True
+                )
+
+    def save(self):
+        """保存文件"""
+        try:
+            self.output_path.parent.mkdir(parents=True, exist_ok=True)
+            self.wb.save(self.output_path)
+            logger.success(f'文件保存成功: {self.output_path}')
+            return True
+        except Exception as e:
+            logger.error(f'文件保存失败: {e}')
+            return False
+        finally:
+            self.wb.close()
+
+# 使用示例
+if __name__ == "__main__":
+    json_files = [
+        (r"s3://public/amazone/copywriting_production/output/B0B658JC22/B0B658JC22_extract.json", "B0B658JC22"),
+        (r"s3://public/amazone/copywriting_production/output/B0CQ1SHD8V/B0CQ1SHD8V_extract.json", "B0CQ1SHD8V"),
+        (r"s3://public/amazone/copywriting_production/output/B0DQ84H883/B0DQ84H883_extract.json", "B0DQ84H883"),
+        (r"s3://public/amazone/copywriting_production/output/B0D44RT8R8/B0D44RT8R8_extract.json", "B0D44RT8R8"),
+    ]
+    output_path = r"G:\code\amazone\copywriting_production\output\multi_data.xlsx"
+    
+    generator = ExcelGenerator(output_path)
+    
+    for json_path, asin in json_files:
+        generator.add_product(json_path, asin)
+    
+    generator.apply_formatting()
+    generator.save()

+ 3 - 26
src/manager/core/db.py

@@ -3,15 +3,7 @@ from typing import Optional
 from sqlmodel import SQLModel, create_engine, Session, select, Field
 from config.settings import DB_URL
 from utils.sql_engine import create_db_and_tables,drop_table,engine
-
-class AsinSeed(SQLModel, table=True):
-    id: Optional[int] = Field(default=None, primary_key=True)
-    asin: str
-    asin_area: str = 'JP'
-    extra_result_path: Optional[str] = None
-    mhtml_path: Optional[str] = None
-    error: Optional[str] = None
-    created_at: Optional[datetime] = Field(default_factory=datetime.now)
+from src.models.asin_model import AsinSeed
 
 class DbManager:
     def __init__(self, engine: str=None):
@@ -25,7 +17,7 @@ class DbManager:
             session.refresh(asin_model)
             return asin_model
 
-    def get_asin_seed(self, asin: str):
+    def get_asin_seed(self, asin: str)->AsinSeed:
         with Session(self.engine) as session:
             statement = select(AsinSeed).where(AsinSeed.asin == asin)
             results = session.exec(statement)
@@ -37,22 +29,7 @@ class DbManager:
             return exist
         else:
             return self.save_asin_seed(asin_model)
-    
-    def update_asin_seed(self, asin_model: AsinSeed):
-        with Session(self.engine) as session:
-            statement = select(AsinSeed).where(AsinSeed.asin == asin_model.asin)
-            results = session.exec(statement)
-            exist = results.first()
-            if exist:
-                for key, value in asin_model.model_dump().items():
-                    setattr(exist, key, value)
-                session.add(exist)
-                session.commit()
-                session.refresh(exist)
-                return exist
-            else:
-                return None
-            
+               
 def main():
     asinseed_list = ['B0CQ1SHD8V', 'B0B658JC22', 'B0DQ84H883', 'B0D44RT8R8']
     db_manager = DbManager()

+ 24 - 27
src/manager/manager_task.py

@@ -5,6 +5,8 @@ from utils.file import save_to_file, read_file
 from src.tasks.crawl_asin_save_task import get_asin_and_save_page
 from src.tasks.crawl_asin_exract_task import extra_result
 from celery.result import AsyncResult
+from utils.logu import get_logger
+logger = get_logger('main')
 
 class ManagerTask:
     s3_prefix = CFG.s3_prefix + '/output/page'
@@ -12,6 +14,10 @@ class ManagerTask:
         self.db = DbManager()
 
     def submit_task_and_wait(self, asin: str, asin_area: str = 'JP',overwrite:bool=False, timeout: int = 300):
+        model = self.db.get_asin_seed(asin)
+        if model and model.mhtml_path:
+            logger.info(f"{asin}已经爬取过,跳过")
+            return model
         """提交任务并等待完成,保存结果路径到数据库"""
         # 提交celery任务
         task = get_asin_and_save_page.delay(asin, asin_area, overwrite)
@@ -23,17 +29,21 @@ class ManagerTask:
         # 处理任务结果
         if result.successful():
             task_result = result.result
-            self.save_task_asin_crawl_result(asin, asin_area, task_result)
+            model.mhtml_path = task_result['path']
+            self.db.save_asin_seed(model)
         return None
     
     def submit_extract_task_and_wait(self, asin: str, asin_area: str = 'JP', timeout: int = 300):
         """提交页面解析任务并等待完成,保存结果到数据库"""
         # 从数据库获取mhtml路径
         asin_seed = self.db.get_asin_seed(asin)
+        if asin_seed and asin_seed.extra_result_path:
+            logger.info(f"{asin}已经解析过,跳过")
+            return asin_seed
         if not asin_seed or not asin_seed.mhtml_path:
             print(f"未找到{asin}的mhtml路径")
             return None
-        
+        logger.info(f"{asin}页面解析开始: {asin_seed.mhtml_path}")
         # 提交celery任务
         task = extra_result.delay(asin_seed.mhtml_path)
         
@@ -45,17 +55,17 @@ class ManagerTask:
         if result.successful():
             task_result = result.result
             if task_result['status'] == 'success':
+                task_result_data = task_result['data']
                 # 保存提取结果到文件并上传S3
-                filename = f"{asin}_extract.json"
-                save_path = self.upload_file(
-                    file_path=task_result['data'],
-                    filename=filename
-                )
+                s3_dir = asin_seed.mhtml_path.rsplit('/', 1)[0]
+                save_json_uri = f"{s3_dir}/{asin}_extract.json"
+                save_to_file(task_result_data, save_json_uri)
+                task_result['path'] = save_json_uri
                 # 保存数据库记录
-                self.save_task_asin_page_extract_result(asin, asin_area, {
-                    'status': 'success',
-                    'path': save_path
-                })
+                asin_model = self.db.get_asin_seed(asin=asin)
+                asin_model.extra_result_path = save_json_uri
+                self.db.save_asin_seed(asin_model)
+                logger.info(f"{asin}页面解析成功: {task_result}")
         return task_result
     
     def save_task_asin_crawl_result(self, asin: str, asin_area:str=None, task_result: dict={}):
@@ -68,20 +78,6 @@ class ManagerTask:
             else:
                 self.db.add_or_ignore_asin_seed(AsinSeed(asin=asin, asin_area=asin_area, mhtml_path=task_result['path']))
             return asin_seed
-    def save_task_asin_page_extract_result(self, asin: str, asin_area:str=None, task_result: dict={}):
-        if task_result.get('status') == 'success':
-            asin_seed = self.db.get_asin_seed(asin)
-            if asin_seed:
-                asin_seed.extract_path = task_result['path']
-                self.db.update_asin_seed(asin_seed)
-            else:
-                new_seed = AsinSeed(
-                    asin=asin,
-                    asin_area=asin_area,
-                    extract_path=task_result['path']
-                )
-                self.db.add_or_ignore_asin_seed(new_seed)
-            return asin_seed
     def upload_file(self, file_path: str, filename: str):
         res = save_to_file(Path(file_path).read_text(), self.s3_prefix + '/' + filename)
         return res
@@ -93,8 +89,9 @@ class ManagerTask:
 def main():
     asinseed_list = ['B0CQ1SHD8V', 'B0B658JC22', 'B0DQ84H883', 'B0D44RT8R8']
     manager = ManagerTask()    
-    # manager.submit_task_and_wait('B0B658JC22', overwrite=False)
-    manager.submit_extract_task_and_wait('B0B658JC22')
+    for asin in asinseed_list:
+        manager.submit_task_and_wait(asin)
+        manager.submit_extract_task_and_wait(asin)
     # result = {'status': 'success', 'path': 's3://public/amazone/copywriting_production/output/B0B658JC22/B0B658JC22.mhtml'}
     # manager.save_task_asin_crawl_result('B0B658JC22', 'JP', result)
 if __name__ == "__main__":

+ 46 - 0
src/models/asin_model.py

@@ -0,0 +1,46 @@
+from datetime import datetime
+from typing import Optional
+from sqlmodel import SQLModel, create_engine, Session, select, Field
+from config.settings import DB_URL
+from typing import List, Optional
+from pydantic import BaseModel
+
+class AsinSeed(SQLModel, table=True):
+    id: Optional[int] = Field(default=None, primary_key=True)
+    asin: str
+    asin_area: str = 'JP'
+    extra_result_path: Optional[str] = None
+    mhtml_path: Optional[str] = None
+    error: Optional[str] = None
+    created_at: Optional[datetime] = Field(default_factory=datetime.now)
+
+
+
+class TrafficKeywordModel(BaseModel):
+    traffic_keyword: str
+    monthly_searches: str
+    keyword_link: Optional[str] = None
+    amazon_search_link: Optional[str] = None
+
+    @property
+    def monthly_searches_int(self) -> int:
+        """Convert monthly searches to integer"""
+        try:
+            return int(str(self.monthly_searches).strip().replace(',', '')) if self.monthly_searches else 0
+        except ValueError:
+            return 0
+
+
+
+class ProductInfoModel(BaseModel):
+    image_url: Optional[str] = None
+    goto_amazon: Optional[str] = None
+    main_text: Optional[str] = None
+    imgbase64: Optional[str] = None
+    unique_words: Optional[List[str]] = None
+
+
+    @property
+    def main_text_short(self) -> str:
+        """Get first 100 characters of main text"""
+        return (self.main_text or '')[:100] + '...' if len(self.main_text or '') > 100 else self.main_text

+ 0 - 0
src/models/excel_product_model.py


+ 6 - 2
tests/mytest/t_boto3.py

@@ -1,8 +1,12 @@
 from utils.file import s3_uri_to_http_url
-
+from pathlib import Path
 def main():
     s3_uri = 's3://public/amazone/copywriting_production/output/B0B658JC22/B0B658JC22.mhtml'
-    print(s3_uri_to_http_url(s3_uri))
+    path = Path(r's3://public/amazone/copywriting_production/output/B0B658JC22/B0B658JC22.mhtml')
+    asin = 'B0B658JC22'
+    save_json_path = Path(s3_uri).parent / f"{asin}_extract.json"
+    print(save_json_path)
+    # print(s3_uri_to_http_url(s3_uri))
 
 if __name__ == "__main__":
     main()

+ 52 - 0
tests/mytest/t_openxyl.py

@@ -0,0 +1,52 @@
+from openpyxl import Workbook
+from openpyxl.styles import PatternFill
+from openpyxl.formatting.rule import CellIsRule
+from utils.logu import get_logger
+logger = get_logger('test')
+# 创建新工作簿和工作表
+wb = Workbook()
+ws = wb.active
+
+# 先填充示例数据(实际使用时替换为你的真实数据)
+data = [
+    # B   C D E   F G H   I J K   L
+    [5000, 0, 0, 8000, 0, 0, 15000, 0, 0, 20000],  # 第1行
+    [12000, 0, 0, 9500, 0, 0, 8000, 0, 0, 5000],   # 第2行
+    [9000, 0, 0, 11000, 0, 0, 13000, 0, 0, 9999]   # 第3行
+]
+
+for row in data:
+    ws.append(row)
+
+# 创建红色填充样式
+red_fill = PatternFill(
+    start_color="FF0000",  # 红色
+    end_color="FF0000",
+    fill_type="solid"
+)
+
+# 配置参数
+threshold = 10000          # 阈值
+target_columns = ['A', 'D', 'H', 'K']  # 要设置条件格式的列
+
+# 获取实际数据范围的最大行数
+max_row = ws.max_row
+
+# 为每个目标列添加条件格式
+for col in target_columns:
+    # 构造列范围(例如:B1:B3)
+    cell_range = f"{col}1:{col}{max_row}"
+    
+    # 创建条件格式规则
+    rule = CellIsRule(
+        operator='greaterThan', 
+        formula=[threshold],  # 注意参数是列表类型
+        stopIfTrue=True,      # 遇到符合条件的单元格后停止其他规则检查
+        fill=red_fill
+    )
+    
+    # 将规则应用到工作表
+    ws.conditional_formatting.add(cell_range, rule)
+    logger.info(f"应用条件格式到 {cell_range} (值 > 10000) {rule}")
+# 保存工作簿
+wb.save("conditional_formatting_example.xlsx")

+ 0 - 44
utils/config.py

@@ -1,44 +0,0 @@
-import os
-import yaml
-from pathlib import Path
-from pydantic import BaseModel, Field
-from typing import List, Dict, Union,Optional,Any
-from utils.pydantic_auto_field import AutoLoadModel
-from dotenv import load_dotenv
-load_dotenv()
-
-class Config(BaseModel):
-    storage: str = "local"
-    s3_access_key: Optional[str] = os.environ.get("S3_ACCESS_KEY", 'bh9LbfsPHRJgQ44wXIlv')
-    s3_secret_key: Optional[str] = os.environ.get("S3_SECRET_KEY", 'N744RZ60T1b4zlcWG2MROCzjEE2mPTdNQCc7Pk3M')
-    s3_endpoint: Optional[str] = os.environ.get("S3_ENDPOINT", 'http://vs1.lan:9002')
-    chrome_config_ini: Optional[str] = r'G:\code\amazone\copywriting_production\config\dp_conf\9321.ini'
-    redis_url: Optional[str] = os.environ.get("REDIS_URL", 'redis://localhost:6379/0')
-    def save(self, config_path: Path = None):
-        config_path = config_path or get_config_path()
-        with open(config_path, "w", encoding="utf-8") as file:
-            yaml.dump(self.model_dump(), file)
-        return self
-            
-def get_config_path():
-    return os.environ.get('CONFIG_PATH',CONFIG_DIR / "config.yaml") 
-
-def read_config(config_path: Path):
-    if isinstance(config_path, str):
-        config_path = Path(config_path)
-    if not config_path.exists():
-        config = Config()
-        config.save(config_path)
-        return config
-    with open(config_path, "r", encoding="utf-8") as file:
-        config_dict = yaml.safe_load(file)
-    return Config(**config_dict)
-
-CFG = read_config(get_config_path())
-
-def main():
-    print(CFG)
-    CFG.save()
-
-if __name__ == "__main__":
-    main()