Explorar el Código

新增数据库备份。修复竞品爬取图片下载到S3路径错误的问题。

mrh hace 8 meses
padre
commit
04c29fb750

+ 1 - 0
.gitignore

@@ -11,3 +11,4 @@ wheels/
 output/
 .env
 .aider*
+.history

+ 26 - 1
README.md

@@ -1 +1,26 @@
-uv pip install universal_pathlib 
+uv pip install universal_pathlib 
+
+# 备份数据库
+下载: https://www.mongodb.com/try/download/database-tools
+文档: https://www.mongodb.com/docs/database-tools/
+```shell
+cd G:\program\MongoDB\mongodb-database-tools-windows-x86_64-100.11.0\bin
+
+$timestr = Get-Date -Format "yyyyMMdd_HH_mm_ss";G:\program\MongoDB\mongodb-database-tools-windows-x86_64-100.11.0\bin\mongodump.exe --uri="mongodb://sv-v2:27017" -o I:\eng\backup\mongo\$timestr
+
+# 首先要加载 env 变量, J:\eng-sftp\nextcloud\eng\my-document\restic.md
+restic backup G:\code\amazone\copywriting_production\output\mongodb_backup
+restic snapshots -H pc
+restic ls -l 04c9313f
+```
+
+## 备份集合到另一个数据库
+```shell
+G:\program\MongoDB\mongodb-database-tools-windows-x86_64-100.11.0\bin\mongodump.exe --uri="mongodb://sv-v2:27017/amazone" --collection=Product -o G:\code\amazone\copywriting_production\output\temp
+$timestr = Get-Date -Format "yyyyMMdd_HH_mm_ss"
+echo $timestr
+G:\program\MongoDB\mongodb-database-tools-windows-x86_64-100.11.0\bin\mongorestore.exe --uri="mongodb://sv-v2:27017/" --nsInclude="amazone.Product" --nsFrom="amazone.Product" --nsTo="backup_amazone.Product123" G:\code\amazone\copywriting_production\output\temp\amazone\Product.bson --drop
+
+G:\program\MongoDB\mongodb-database-tools-windows-x86_64-100.11.0\bin\mongorestore.exe --uri="mongodb://sv-v2:27017/backup_amazone" --collection=Product G:\code\amazone\copywriting_production\output\temp
+mongoimport --uri="mongodb://sv-v2:27017" --collection=users --file=G:\code\amazone\copywriting_production\output\mongodb_backup\users.json
+```

+ 2 - 2
src/browser/crawl_asin.py

@@ -177,12 +177,12 @@ class Crawler(CrawlerBase):
         data = await self.excra_product_info(html_content)
         if data['product_info'].get('image_url'):
             img_name = UPath(data['product_info']['image_url']).name
-            upload_s3_dir = str(UPath(upload_s3_dir) / img_name)
+            img_path = str(UPath(upload_s3_dir) / img_name)
             logger.info(f"upload_s3_dir {upload_s3_dir}")
             status,save_img_path = await asyncio.to_thread(self.download_img, 
                 data['product_info']['image_url'],
                 as_img_base64=False,
-                upload_s3_dir=upload_s3_dir)
+                img_path=img_path)
         data['product_info']['img_path'] = save_img_path
         logger.info(f"{json.dumps(data, indent=4,ensure_ascii=False)}")
         return data

+ 4 - 7
src/browser/crawl_base.py

@@ -90,7 +90,7 @@ class CrawlerBase():
                 )
             )
             return result
-    def download_img(self,url:str,save_dir:str=TEMP_PAGE_DIR, page:str=None,as_img_base64:bool=True, upload_s3_dir:str=''):
+    def download_img(self,url:str,save_dir:str=TEMP_PAGE_DIR, page:str=None,as_img_base64:bool=True, img_path:str=''):
         # ('success', '{abs_current_path}\\notice.svg')
         p = page or self.page
         status,path = p.download(url, save_path=save_dir)
@@ -103,14 +103,11 @@ class CrawlerBase():
                 Path(path).unlink()
                 # dataUrl = f"data:image/svg+xml;base64,{encoded_string}"
                 return status,encoded_string
-            if upload_s3_dir:
-                if not upload_s3_dir.endswith('/'):
-                    upload_s3_dir += '/'
-                save_img_path = f"{upload_s3_dir}{path_name}"
+            if img_path:
                 with open(path, 'rb') as f:
-                    save_to_file(f.read(), save_img_path)
+                    save_to_file(f.read(), img_path)
                 Path(path).unlink()
-                return status,save_img_path
+                return status,img_path
         return status,path
     async def run(self, url:str):
         page = load_chrome_from_ini(

+ 12 - 13
src/excel_tools/file_manager.py

@@ -1,3 +1,4 @@
+import asyncio
 import json
 from pathlib import Path
 import shutil
@@ -13,7 +14,8 @@ from src.manager import DbManager,StorageManager
 from utils.logu import get_logger
 from config.settings import OUTPUT_DIR
 from src.models.asin_model import AsinExtraResultModel
-
+from src.models.product_model import Product,CompetitorAnalysis
+from src.manager.core.db_mongo import BaseMongoManager
 
 logger = get_logger('excel')
 
@@ -40,25 +42,22 @@ class ExcelFileManager:
         self.wb.save(self.output_path)
         self.wb.close()
     
-    def write_competitive_sheet(self, extract_data:dict, sheet_name: str = "竞品关键词调研", sheet_index: int = 0, overwrite: bool = False):
+    def write_competitive_sheet(self, competitor_analysis:Dict[str, CompetitorAnalysis], sheet_name: str = "竞品关键词调研", sheet_index: int = 0, overwrite: bool = False):
         if overwrite and sheet_name in self.wb.sheetnames:
             self.wb.remove(self.wb[sheet_name])
         if sheet_name not in self.wb.sheetnames:
             competitive_sheet_writer = CompetitiveAnalysisWriter(self.wb, sheet_name=sheet_name, sheet_index=sheet_index)
-            competitive_sheet_writer.add_data(extract_data)
+            competitive_sheet_writer.add_data(competitor_analysis)
     def load_s3_extract_data(self) -> list[AsinExtraResultModel]:
         return self.s3_storage_manager.load_s3_complete_extract_data()
 
-def main():
+async def main():
     self = ExcelFileManager(r"G:\code\amazone\copywriting_production\output\resource\multi_data.xlsx")
-    extract_data_lsit = self.load_s3_extract_data()
-    logger.info(f"{extract_data_lsit}")
-    dict_list = [model.model_dump() for model in extract_data_lsit]
-    # 使用 json.dumps() 将字典列表转换为 JSON 字符串
-    json_str = json.dumps(dict_list, indent=4, ensure_ascii=False)
-    save_to_file(json_str, OUTPUT_DIR / "multi_data.json")
-    logger.info(f"{len(extract_data_lsit)}")
-    # self.write_competie_sheet(extract_data)
+    db_mongo = BaseMongoManager()
+    asyncio.run(db_mongo.initialize())
+    product = await Product.find_one(Product.basic_info.name == "电线保护套")
+    extract_data_lsit = product.competitor_analysis
+    self.write_competitive_sheet(product.competitor_analysis)
     # self.save_all()
     return
     competi_sheet = CompetitiveAnalysisWriter(excel_file.output_path)
@@ -86,4 +85,4 @@ def main():
 
 
 if __name__ == "__main__":
-    main()
+    asyncio.run(main())

+ 27 - 12
src/excel_tools/writers/competitive_analysis.py

@@ -13,6 +13,8 @@ from utils.file import read_file
 from utils.logu import get_logger
 from openpyxl import load_workbook,Workbook
 from .base_writer import ExcelWriterBase
+from src.models.product_model import Product,CompetitorAnalysis
+
 
 logger = get_logger('excel')
 
@@ -82,24 +84,37 @@ class CompetitiveAnalysisWriter(ExcelWriterBase):
             self.ws = self.wb.create_sheet(self.sheet_name, index=self.sheet_index)
             logger.info(f"新建工作表: {self.sheet_name}")
 
-    def add_data(self, data: List[Dict[str, Any]]):
-        for product_data in data:
-            logger.info(f"{product_data['asin']}, 处理中...")
-            self.add_product(product_data['extra_result_data'], product_data['asin'])
+    def add_data(self, competitor_analysis: Dict[str, CompetitorAnalysis]):
+            
+        for asin, analysis in competitor_analysis.items():
+            logger.info(f"正在处理竞品 {asin}...")
+            self.add_product(analysis)
         
         self.apply_formatting()
 
-    def add_product(self, data: dict, asin: str):
-        """添加产品数据"""
+    def add_product(self, analysis: CompetitorAnalysis) -> None:
+        """添加竞品分析数据
+        Args:
+            analysis (CompetitorAnalysis): 竞品分析数据对象,必须包含extra_result和asin字段
+        """
         try:
-            # 加载并处理数据
-            processor = ProductDataProcessor(data, asin)
+            # 参数校验
+            if not analysis.extra_result:
+                raise ValueError(f"竞品分析数据缺失: {analysis.asin}")
+            if not analysis.asin:
+                raise ValueError("竞品ASIN不能为空")
+                
+            # 初始化数据处理
+            processor = ProductDataProcessor(
+                json_data=analysis.extra_result,
+                asin=analysis.asin
+            )
             
             # 记录产品起始列
             self.product_cols.append(self.current_col)
             
-            # 写入主数据表
-            self._write_main_table(processor, asin)
+            # 写入主数据表(使用竞品分析中的ASIN)
+            self._write_main_table(processor, analysis.asin)
             
             # 写入附加信息
             self._write_additional_info(processor)
@@ -110,8 +125,8 @@ class CompetitiveAnalysisWriter(ExcelWriterBase):
             # 移动到下一组列
             self.current_col += self.COLUMN_SPACING
             
-        except (json.JSONDecodeError, ValueError) as e:
-            logger.error(f'Error processing {data}: {e}')
+        except (ValueError, KeyError) as e:
+            logger.error(f'处理竞品 {analysis.asin} 数据失败: {e}')
 
     
     def _write_main_table(self, processor: ProductDataProcessor, asin: str):

+ 8 - 2
src/manager/core/db_mongo.py

@@ -1,3 +1,4 @@
+from datetime import datetime
 from typing import Optional, List
 from beanie import Document, Indexed, init_beanie
 from beanie.odm.operators.update.general import Set
@@ -14,6 +15,11 @@ class BaseMongoManager:
     async def initialize(self):
         await init_beanie(database=self.db, document_models=[Product])
 
+    async def backup(self, source_collection_name: Document, backup_collection_name:str, backup_db_name: str = f"{MONGO_DB_NAME}_backup"):
+        backup_db = self.client[backup_db_name]
+        backup_collection = backup_db[backup_collection_name]
+        await backup_collection.insert_many(await source_collection_name.find().to_list())
+
 class DbManagerMongo(BaseMongoManager):
     pass
 
@@ -31,8 +37,8 @@ async def main():
     # 初始化示例
     db_manager = DbManagerMongo()
     await db_manager.initialize()
-    product = await Product.find_one(Product.basic_info.name == "电线保护套")
-    print(product)
+    time_str = datetime.now().strftime("%Y%m%d%H%M%S")
+    await db_manager.backup(Product, f"product_{time_str}")
     # 使用示例
     # 这里可以添加新的Product操作示例
 

+ 1 - 1
src/manager/manager_task.py

@@ -105,7 +105,7 @@ class ManagerTask:
         # 收集需要处理的任务
         for asin, competitor in product.competitor_analysis.items():
             competitor: CompetitorAnalysis
-            if competitor.extra_result_path:
+            if competitor.extra_result:
                 logger.info(f"{asin}已缓存结果,跳过处理")
                 continue
             if not competitor.mhtml_path:

+ 23 - 0
src/tools/backup_mongo.py

@@ -0,0 +1,23 @@
+import asyncio
+from datetime import datetime
+from typing import Optional, List
+from beanie import Document, Indexed, init_beanie
+from beanie.odm.operators.update.general import Set
+from pydantic import BaseModel
+from motor.motor_asyncio import AsyncIOMotorClient
+from config.settings import MONGO_URL, MONGO_DB_NAME
+from src.models.product_model import Product
+from src.manager.core.db_mongo import BaseMongoManager
+from utils.logu import get_logger
+from upath import UPath
+logger = get_logger('tools')
+
async def test_product_mongo():
    """Smoke-test MongoDB connectivity: initialize Beanie and fetch one known product."""
    db_mongo = BaseMongoManager()
    await db_mongo.initialize()
    product = await Product.find_one(Product.basic_info.name == "电线保护套")
    # Report the lookup result — previously the fetched document was silently
    # discarded, so the smoke test gave no signal of success or failure.
    logger.info(f"test_product_mongo result: {product}")


async def main():
    # PEP 8: 4-space indentation (original body used 3 spaces).
    await test_product_mongo()


if __name__ == "__main__":
    asyncio.run(main())