Browse Source

Finish browser-side batch collection for asinseed

mrh 3 months ago
parent
commit
bf236e096c

+ 19 - 0
docs/数据库.md

@@ -0,0 +1,19 @@
+## Monthly view of Excel-imported products
+
+```sql
+CREATE OR REPLACE VIEW monthly_product_imports AS
+SELECT 
+    pi.*,
+--    pi.product_name,
+    competitor,
+    EXISTS (
+        SELECT 1
+        FROM public.asinseed asinseed
+        WHERE asinseed.asin = competitor
+    ) AS asin_exists
+FROM 
+    public.productimport pi,
+    LATERAL jsonb_array_elements_text(pi.product_data::jsonb->'competitor_list') AS competitor
+WHERE 
+    date_trunc('month', pi.created_at) = date_trunc('month', CURRENT_DATE);
+```

+ 30 - 2
src/browser/browser_config.py

@@ -3,6 +3,7 @@ from typing import Optional, Union
 from pydantic import BaseModel, Field
 from enum import StrEnum
 from config.settings import OUTPUT_DIR
+from src.manager.core.db import DbManager
 
 
 class BaseCommon(BaseModel):
@@ -12,10 +13,16 @@ class BaseCommon(BaseModel):
     chrome_config_ini: Optional[str] = Field(default=None, description="Path to the Chrome config INI file")
     proxy: Optional[str] = Field(default=None, description="Proxy for the browser to use")
     active: Optional[bool] = Field(default=True, description="Whether the browser is active")
+    host_name: Optional[str] = Field(default='pc', description="Browser host name")
+
 
 class AccountInBrowser(BaseCommon):
     account: Optional[str] = Field(default=None, description="Google account for the browser to sign in with")
     password: Optional[str] = Field(default=None, description="Password for that Google account")
+    max_access_limit: int = Field(default=10, description="Maximum number of accesses allowed per account")
+    # Remaining access count
+    remaining_access_limit: int = Field(default=10, description="Remaining accesses for this account")
+
 def create_browser_config(port: int, account: Optional[str] = None, password: Optional[str] = None) -> AccountInBrowser:
     """创建浏览器配置实例"""
     browser_userdata_dir = OUTPUT_DIR / "browser_data" / f"user_data_dir_{port}"
@@ -31,7 +38,7 @@ def create_direct_browser_config() -> dict[int, AccountInBrowser]:
     # Port-to-account mapping
     port_configs = {
         9321: {"account": "mahui4228@gmail.com", "password": "password123"},
-        # 9322: {"account": "mahui6188@gmail.com", "password": "password456"},
+        9322: {"account": "g742624689@gmail.com", "password": "password456"},
         9323: {"account": "youka570023@gmail.com", "password": "password789"},
         9324: {"account": "j4732030@gmail.com", "password": "password012"},
         9325: {"account": "mahui8875@gmail.com", "password": "password345"},
@@ -50,4 +57,25 @@ def create_direct_browser_config() -> dict[int, AccountInBrowser]:
     
     return configs
 
-
+def create_direct_browser_config_with_stats() -> dict[int, AccountInBrowser]:
+    """Create the browser configs and update each account's remaining access count from DB stats"""
+    # Get the base configs
+    configs = create_direct_browser_config()
+    
+    # Get usage stats from the database
+    db_manager = DbManager()
+    account_stats = db_manager.get_monthly_account_stats()
+    
+    # Turn the stats into a dict for easy lookup
+    stats_dict = {stat[0]: stat[1] for stat in account_stats}
+    
+    # Walk the configs and update the remaining access count
+    for port, config in configs.items():
+        if config.account and config.account in stats_dict:
+            # remaining = max - times seen this month
+            config.remaining_access_limit = config.max_access_limit - stats_dict[config.account]
+        else:
+            # Account not in the stats yet: keep the default
+            config.remaining_access_limit = config.max_access_limit
+    
+    return configs
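
`create_direct_browser_config_with_stats` turns the monthly `(browser_account, count)` rows into a per-port quota. A minimal sketch of consuming it, assuming the database is reachable:

```python
from src.browser.browser_config import create_direct_browser_config_with_stats

configs = create_direct_browser_config_with_stats()
# Keep only browsers whose account still has quota this month
available = {port: cfg for port, cfg in configs.items()
             if cfg.remaining_access_limit > 0}
for port, cfg in available.items():
    print(port, cfg.account, cfg.remaining_access_limit)
```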

+ 3 - 3
src/browser/crawl_base.py

@@ -112,7 +112,7 @@ class AbstractCrawlerBase(ABC):
         browser_config = BrowserConfig(
             headless=False,
             use_managed_browser=True,
-            cdp_url=self.page.browser._driver._websocket_url
+            cdp_url=self.page.browser._driver.address
         )
         logger.info(f"{self.browser_config}")
         logger.info(f"len {len(raw_html)} {type(raw_html)} {raw_html[:150]}")
@@ -151,7 +151,7 @@ class AbstractCrawlerBase(ABC):
             self._initialize_page()
         self.page.get(url)
         self.browser_config.update({
-            "cdp_url": self.page.browser._driver._websocket_url 
+            "cdp_url": self.page.browser._driver.address
         })
         # logger.info(f"get {url}, browser_config: {self.browser_config}")
 
@@ -160,7 +160,7 @@ class AbstractCrawlerBase(ABC):
             self._initialize_page()
         await asyncio.to_thread(self.page.get, url)
         self.browser_config.update({
-            "cdp_url": self.page.browser._driver._websocket_url 
+            "cdp_url": self.page.browser._driver.address
         })
     
     @abstractmethod
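
The switch from `_websocket_url` to `address` hands crawl4ai the browser-level CDP endpoint (`host:port`) rather than a page-scoped WebSocket URL, which appears to be the scope its managed-browser mode expects. A hedged sketch of the wiring, assuming a DrissionPage browser is already listening on the debug port:

```python
from crawl4ai import BrowserConfig
from DrissionPage import ChromiumPage

page = ChromiumPage("127.0.0.1:9323")      # attach to a running browser
cfg = BrowserConfig(
    headless=False,
    use_managed_browser=True,
    cdp_url=page.browser._driver.address,  # browser endpoint, not a single tab
)
```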

+ 49 - 0
src/flow_task/asin_mhtml_parser.py

@@ -0,0 +1,49 @@
+import asyncio
+import json
+import os
+from typing import Optional
+from upath import UPath
+from src.browser.crawl_asin import AsinCrawler
+from utils.logu import get_logger
+from src.browser.browser_config import AccountInBrowser
+
+logger = get_logger('browser')
+
+async def async_run_extractions(crawler: AsinCrawler, html_content: str, upload_s3_dir: Optional[str] = None):
+    """Run the extraction steps concurrently"""
+    product_info, result_table, limitation = await asyncio.gather(
+        crawler.extract_product_and_save_resource(html_content, upload_s3_dir),
+        crawler.extra_result_table(html_content),
+        crawler.extra_limitation(html_content),
+    )
+    res = {
+        "result_table": result_table,
+        "limitation": limitation,
+    }
+    res.update(product_info)
+    return res
+
+async def async_process_mhtml(mhtml_path: str, browser_config: Optional[AccountInBrowser] = None):
+    """Process an MHTML file asynchronously"""
+    # Initialize AsinCrawler from the browser config; without one, fall back
+    # to the crawler's defaults (assumes create_browser() can run with no args)
+    if browser_config is not None:
+        crawler = AsinCrawler.create_browser(
+            address=browser_config.browser_address,
+            user_data_dir=browser_config.browser_userdata_dir,
+        )
+    else:
+        crawler = AsinCrawler.create_browser()
+    try:
+        logger.info(f"异步任务启动: {mhtml_path}")
+        logger.info(f"crawler.page.browser._driver.address: {crawler.page.browser._driver.address}")
+        upload_s3_dir = UPath(mhtml_path).parent
+        temp_mhtml_path, html_content = await asyncio.to_thread(crawler.get_mpath_html_content, mhtml_path)
+        results = await async_run_extractions(crawler, html_content, upload_s3_dir)
+        json_result = json.dumps(results, indent=4, ensure_ascii=False)
+        logger.info(f"{json_result}")
+        return {'status': 'success', 'data': results}
+    
+    except Exception as e:
+        logger.exception(f"Async task failed: {str(e)}")
+        raise
+    finally:
+        if 'temp_mhtml_path' in locals() and os.path.exists(temp_mhtml_path):
+            await asyncio.to_thread(os.unlink, temp_mhtml_path)
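
A hedged usage sketch for `async_process_mhtml`; the port and the MHTML path are placeholders, not values from this repo:

```python
import asyncio
from src.browser.browser_config import create_direct_browser_config
from src.flow_task.asin_mhtml_parser import async_process_mhtml

config = create_direct_browser_config()[9321]
result = asyncio.run(async_process_mhtml("output/temp/B09MQMTBJW.mhtml", config))
print(result["status"])  # 'success' on a clean parse
```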

+ 34 - 27
src/flow_task/crawl_asin.py

@@ -46,7 +46,7 @@ class CrawlAsinInput(BaseModel):
     asin_area: Optional[AsinAreaEnum] = Field(default=AsinAreaEnum.JP, description="ASIN region")
     mthml_type: Optional[bool] = Field(default=True, description="Whether to save as MHTML")
     save_path: Optional[str] = Field(default=None, description="Save path")
-    overwrite: Optional[bool] = Field(default=False, description="Whether to overwrite an existing file")
+    refresh_cache: Optional[bool] = Field(default=False, description="Whether to refresh the cache and re-crawl an existing file")
     browser: AccountInBrowser = Field(description="Browser account info")
 
 
@@ -98,28 +98,24 @@ class CrawlAsinFlow(BaseCrawlFlow):
     )
     def task_save_page(self, asin: str, asin_area: AsinAreaEnum, mthml_type: bool):
         """获取页面数据并保存到本地temp目录的task方法"""
-        self.run_log.info(f"开始获取页面数据: {asin} {asin_area} {mthml_type}")
+        self.run_log.info(f"开始获取页面数据: {asin} {asin_area} mthml_type: {mthml_type}")
         
-        # Fetch the page data
-        # data = self.crawler.get_asin_page_data(
+        # # Fetch the page data
+        # self.crawler.get_asin_and_save_page(
         #     asin=asin,
         #     asin_area=asin_area,
         #     mthml_type=mthml_type
         # )
-        self.crawler.page.get('https://docs.llamaindex.ai/en/stable/examples/output_parsing/llm_program/#define-a-custom-output-parser')
+        url = self.crawler.get_asin_url(asin=asin, asin_area=asin_area)
+        self.crawler.page.get(url)
+        # self.crawler.page.get('https://docs.llamaindex.ai/en/stable/examples/output_parsing/llm_program/#define-a-custom-output-parser')
         # Build the local temp save path
         local_dir = TEMP_PAGE_DIR / "asinseed" / asin
 
         local_dir.mkdir(parents=True, exist_ok=True)
-        
         extension = ".mhtml" if mthml_type else ".html"
         local_path = local_dir / f"{asin}{extension}"
-        data = self.crawler.save_mhtml(local_path)
-        # logger.info(f"data {data}")
-        # save_to_file(data, local_path)
-        # with open(local_path, "w", encoding='utf-8') as f:
-        #     self.run_log.info(f"Writing data to local file: {local_path}, len: {len(data)}")
-        #     f.write(data)
+        self.crawler.save_mhtml(local_path)
         self.run_log.info(f"成功保存到本地temp目录: {local_path}")
         return str(local_path)
 
@@ -160,39 +156,53 @@ class CrawlAsinFlow(BaseCrawlFlow):
             # Save to the database
             if existing_record:
                 # Update the existing record
-                existing_record.mhtml_path = final_path
+                existing_record.mhtml_path = http_url
                 existing_record.asin_area = asin_area
-                db_manager.save_asin_seed(existing_record)
+                existing_record.browser_account = self.flow_input.browser.account
+                asin_model = db_manager.save_asin_seed(existing_record)
                 self.run_log.info(f"更新数据库记录: ASIN {asin}")
             else:
                 # Create a new record
-                new_record = AsinSeed(asin=asin, asin_area=asin_area, mhtml_path=final_path)
-                db_manager.save_asin_seed(new_record)
+                new_record = AsinSeed(
+                    asin=asin, 
+                    asin_area=asin_area, 
+                    mhtml_path=http_url,
+                    browser_account=self.flow_input.browser.account
+                )
+                asin_model = db_manager.save_asin_seed(new_record)
                 self.run_log.info(f"创建数据库记录: ASIN {asin}")
             
-            return http_url
+            return asin_model
         except Exception as s3_error:
             self.run_log.error(f"S3 upload failed: {s3_error}")
             raise Exception(f"S3 upload failed: {s3_error}")
 
     def _get_and_save_page_data(self):
         """获取页面数据并保存的方法"""
+        # 在调用函数前判断 asin 是否存在,如果存在 mhtml_path 则返回 asin_model,跳过该记录
+        db_manager = DbManager()
+        existing_record = db_manager.get_asin_seed(self.flow_input.asin)
+        
+        if existing_record and existing_record.mhtml_path:
+            self.run_log.info(f"ASIN {self.flow_input.asin} 已存在 mhtml_path,跳过处理")
+            return existing_record
+        
         # Step 1: fetch the data and save it under the local temp dir
-        local_file_path = self.task_save_page.with_options(refresh_cache=True)(
+        local_file_path = self.task_save_page.with_options(refresh_cache=self.flow_input.refresh_cache)(
             asin=self.flow_input.asin,
             asin_area=self.flow_input.asin_area,
             mthml_type=self.flow_input.mthml_type,
         )
         
         # Step 2: upload the temp file to S3 and record it in the database
-        s3_path = self.task_save_to_db.with_options(refresh_cache=True)(
+        asin_model = self.task_save_to_db.with_options(refresh_cache=self.flow_input.refresh_cache)(
             local_file_path=local_file_path,
             asin=self.flow_input.asin,
             mthml_type=self.flow_input.mthml_type,
             asin_area=self.flow_input.asin_area
         )
         
-        return s3_path
+        return asin_model
     
     def run(self):
         """执行流程"""
@@ -200,18 +210,16 @@ class CrawlAsinFlow(BaseCrawlFlow):
         self.run_log.info(f"ASIN: {self.flow_input.asin}")
         self.run_log.info(f"地区: {self.flow_input.asin_area}")
         self.run_log.info(f"MHTML格式: {self.flow_input.mthml_type}")
-        self.run_log.info(f"覆盖模式: {self.flow_input.overwrite}")
+        self.run_log.info(f"刷新缓存: {self.flow_input.refresh_cache}")
         
         try:
             # Fetch and save the page data via the task methods
-            final_path = self._get_and_save_page_data()
+            asin_model = self._get_and_save_page_data()
             
-            self.run_log.info(f"流程执行成功,保存路径: {final_path}")
+            self.run_log.info(f"流程执行成功,保存路径: {asin_model}")
             return {
                 'status': 'success',
-                'path': final_path,
-                'asin': self.flow_input.asin,
-                'asin_area': self.flow_input.asin_area
+                'asin_model': asin_model,
             }
             
         except Exception as e:
@@ -226,7 +234,6 @@ class CrawlAsinFlow(BaseCrawlFlow):
 def crawl_asin_flow(flow_input: CrawlAsinInput):
     """ASIN页面爬取Prefect流程"""
     self = CrawlAsinFlow(flow_input)
-    self.run_log.info(f"启动ASIN爬取流程: {flow_input.asin}")
     result = self.run()
     
     return result
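
On the success path the flow now returns the saved `AsinSeed` model instead of a bare path, and skips any ASIN that already has an `mhtml_path` unless `refresh_cache` is set. A minimal calling sketch, with the result shape taken from `run()` above:

```python
from src.flow_task.crawl_asin import crawl_asin_flow, CrawlAsinInput, AsinAreaEnum
from src.browser.browser_config import create_direct_browser_config_with_stats

browser = create_direct_browser_config_with_stats()[9321]
result = crawl_asin_flow(CrawlAsinInput(
    asin="B09MQMTBJW",
    asin_area=AsinAreaEnum.JP,
    browser=browser,
))
if result["status"] == "success":
    print(result["asin_model"].mhtml_path)  # the uploaded S3 http URL
```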

+ 32 - 1
src/flow_task/db/product_import_db.py

@@ -1,6 +1,6 @@
 from datetime import datetime
 from typing import Optional, List
-from sqlmodel import SQLModel, create_engine, Session, select, Field
+from sqlmodel import SQLModel, create_engine, Session, select, Field, text
 from config.settings import DB_URL
 from utils.sql_engine import create_db_and_tables
 
@@ -135,6 +135,37 @@ class ProductImportManager:
                 session.commit()
                 return True
             return False
+
+    def execute_raw_sql(self, sql_query: str, params: Optional[dict] = None):
+        """Execute an arbitrary SQL statement"""
+        with Session(self.engine) as session:
+            if params:
+                result = session.exec(text(sql_query).params(**params))
+            else:
+                result = session.exec(text(sql_query))
+            session.commit()
+            return result.all()
+    
+    def get_monthly_product_imports_by_asin_exists(self, asin_exists: bool = False, to_dict: bool = False):
+        """从public.monthly_product_imports表按asin_exists字段筛选记录
+        
+        Args:
+            asin_exists: 是否存在asin,默认为False
+            to_dict: 是否返回字典格式,默认为False
+            
+        Returns:
+            List[ProductImport]: 按asin_exists筛选的记录列表
+        """
+        sql_query = "SELECT id, product_name, created_at, competitor, asin_exists FROM public.monthly_product_imports WHERE asin_exists = :asin_exists"
+        params = {"asin_exists": asin_exists}
+        
+        with Session(self.engine) as session:
+            result = session.exec(text(sql_query).params(**params))
+            list_model = result.all()
+            
+            if to_dict:
+                return [dict(row._mapping) for row in list_model]
+            else:
+                return list_model
 
 
 # Create the global instance
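
A minimal sketch of the new query helper, assuming the `monthly_product_imports` view from `docs/数据库.md` exists:

```python
from src.flow_task.db.product_import_db import product_import_manager

# Competitor ASINs from this month's imports with no asinseed record yet
rows = product_import_manager.get_monthly_product_imports_by_asin_exists(
    asin_exists=False, to_dict=True,
)
pending = [row["competitor"] for row in rows]
```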

+ 136 - 0
src/flow_task/depoly.py

@@ -0,0 +1,136 @@
+"""
+ASIN爬取流程部署脚本
+====================
+
+该脚本用于部署crawl_asin_flow到Prefect服务器,支持并发控制。
+"""
+from prefect import deploy
+from src.flow_task.crawl_asin import crawl_asin_flow, CrawlAsinInput, AsinAreaEnum
+from src.browser.browser_config import create_direct_browser_config_with_stats
+from src.flow_task.db.product_import_db import product_import_manager
+from typing import List, Optional
+import logging
+
+# Configure logging
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+def deploy_crawl_asin_flow(
+    name: str = "ASIN crawl flow deployment",
+    concurrency_limit: int = 4,
+    tags: List[str] = ["asin", "crawl", "amazon"],
+    description: str = "Amazon ASIN page crawl flow with multi-region concurrent crawling"
+):
+    """
+    Deploy crawl_asin_flow to the Prefect server
+    
+    Args:
+        name: deployment name
+        concurrency_limit: concurrency limit
+        tags: deployment tags
+        description: deployment description
+    """
+    try:
+        logger.info(f"开始部署 {name}...")
+        
+        # Serve the flow (note: serve() blocks while the runner is alive)
+        deployment = crawl_asin_flow.serve(
+            name=name,
+            tags=tags,
+            description=description,
+            global_limit=concurrency_limit
+        )
+        
+        logger.info(f"部署成功!部署ID: {deployment.id}")
+        logger.info(f"并发限制: {concurrency_limit}")
+        logger.info(f"标签: {tags}")
+        
+        return deployment
+        
+    except Exception as e:
+        logger.error(f"部署失败: {e}")
+        raise
+
+def create_sample_flow_inputs(count: int = 10) -> List[CrawlAsinInput]:
+    """
+    Create sample flow inputs for testing the deployment
+    
+    Args:
+        count: number of inputs to create
+        
+    Returns:
+        List[CrawlAsinInput]: the sample inputs
+    """
+    try:
+        # Get the browser configs
+        browser_configs = create_direct_browser_config_with_stats()
+        if not browser_configs:
+            raise ValueError("没有可用的浏览器配置")
+        
+        # Get this month's product data
+        monthly_products = product_import_manager.get_monthly_product_imports_by_asin_exists(to_dict=True)
+        if not monthly_products:
+            raise ValueError("本月没有找到任何产品数据")
+        
+        # Build the input list
+        flow_inputs = []
+        browser_list = list(browser_configs.values())
+        
+        for i, product in enumerate(monthly_products[:count]):
+            competitor = product.get('competitor')
+            if not competitor:
+                continue
+                
+            # Round-robin over the browser configs
+            browser_config = browser_list[i % len(browser_list)]
+            
+            flow_input = CrawlAsinInput(
+                asin=competitor,
+                asin_area=AsinAreaEnum.JP,
+                mthml_type=True,
+                refresh_cache=False,
+                browser=browser_config
+            )
+            
+            flow_inputs.append(flow_input)
+            
+        logger.info(f"创建了 {len(flow_inputs)} 个示例输入")
+        return flow_inputs
+        
+    except Exception as e:
+        logger.error(f"创建示例输入失败: {e}")
+        return []
+
+def main():
+    """
+    主函数:部署crawl_asin_flow
+    """
+    try:
+        # 部署flow
+        deployment = deploy_crawl_asin_flow(
+            name="ASIN爬取流程部署",
+            concurrency_limit=4,
+            tags=["asin", "crawl", "amazon", "production"],
+            description="Amazon ASIN页面爬取流程,支持多地区并发爬取,限制并发数为4"
+        )
+        
+        print("=" * 60)
+        print("部署成功!")
+        print(f"部署名称: {deployment.name}")
+        print(f"部署ID: {deployment.id}")
+        print(f"并发限制: 4")
+        print(f"Flow函数: {crawl_asin_flow.__name__}")
+        print("=" * 60)
+        
+        # # 可选:创建示例输入
+        # sample_inputs = create_sample_flow_inputs(5)
+        # if sample_inputs:
+        #     print(f"\n创建了 {len(sample_inputs)} 个示例输入用于测试:")
+        #     for i, input_data in enumerate(sample_inputs, 1):
+        #         print(f"  {i}. ASIN: {input_data.asin}, 浏览器: {input_data.browser.browser_address}")
+        
+    except Exception as e:
+        print(f"部署失败: {e}")
+        raise
+
+if __name__ == "__main__":
+    main()
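
`serve()` blocks while the runner is alive, so runs are submitted from another process. A hedged sketch using Prefect's `run_deployment`; the deployment name follows the usual `<flow-name>/<serve-name>` convention and is an assumption here:

```python
from prefect.deployments import run_deployment
from src.browser.browser_config import create_direct_browser_config_with_stats
from src.flow_task.crawl_asin import CrawlAsinInput, AsinAreaEnum

flow_input = CrawlAsinInput(
    asin="B09MQMTBJW",
    asin_area=AsinAreaEnum.JP,
    browser=create_direct_browser_config_with_stats()[9321],
)
run_deployment(
    name="crawl-asin-flow/ASIN crawl flow deployment",  # assumed name
    parameters={"flow_input": flow_input.model_dump()},
)
```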

+ 38 - 1
src/manager/core/db.py

@@ -1,6 +1,6 @@
 from datetime import datetime
 from typing import Optional
-from sqlmodel import SQLModel, create_engine, Session, select, Field
+from sqlmodel import SQLModel, create_engine, Session, select, Field, text
 from config.settings import DB_URL
 from utils.sql_engine import create_db_and_tables,drop_table,engine
 from src.models.asin_model import AsinSeed
@@ -45,6 +45,17 @@ class DbManager:
             else:
                 return list_model
 
+    def get_all_asin(self, to_dict: bool = False) -> list[AsinSeed]:
+        """Get every ASIN record, without filtering on extra_result_path or mhtml_path"""
+        with Session(self.engine) as session:
+            statement = select(AsinSeed)
+            results = session.exec(statement)
+            list_model = results.all()
+            if to_dict:
+                return [model.model_dump() for model in list_model]
+            else:
+                return list_model
+
     def delete_asin_seed_by_id(self, asin_id: int) -> bool:
         """根据id删除asin_seed记录,如果s3路径存在,连同一起删除"""
         with Session(self.engine) as session:
@@ -72,6 +83,32 @@ class DbManager:
             
             return True
 
+    def execute_raw_sql(self, sql_query: str, params: Optional[dict] = None):
+        """执行通用SQL语句"""
+        with Session(self.engine) as session:
+            if params:
+                result = session.exec(text(sql_query).params(**params))
+            else:
+                result = session.exec(text(sql_query))
+            session.commit()
+            return result.all()
+    
+    def get_monthly_account_stats(self):
+        """统计本月账户出现次数"""
+        sql_query = """
+        SELECT 
+            browser_account, 
+            COUNT(*) AS count 
+        FROM 
+            public.asinseed 
+        WHERE 
+            EXTRACT(YEAR FROM created_at) = EXTRACT(YEAR FROM CURRENT_DATE) 
+            AND EXTRACT(MONTH FROM created_at) = EXTRACT(MONTH FROM CURRENT_DATE) 
+        GROUP BY 
+            browser_account;
+        """
+        return self.execute_raw_sql(sql_query)
+
 class ProductManager:
     def __init__(self, engine: str=None):
         self.engine = engine or create_engine(DB_URL)
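
`get_monthly_account_stats` returns `(browser_account, count)` rows, which is exactly the shape `create_direct_browser_config_with_stats` unpacks. A small sketch:

```python
from src.manager.core.db import DbManager

stats = DbManager().get_monthly_account_stats()
used_this_month = {account: count for account, count in stats}
```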

+ 2 - 0
src/models/asin_model.py

@@ -14,6 +14,8 @@ class AsinSeed(SQLModel, table=True):
     extra_result_path: Optional[str] = None
     mhtml_path: Optional[str] = None
     error: Optional[str] = None
+    # Browser account info
+    browser_account: Optional[str] = Field(default=None, description="Google account the browser signed in with")
     created_at: Optional[datetime] = Field(default_factory=datetime.now)
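
Note that SQLModel's `create_all()` only creates missing tables and will not add the new column to an existing `asinseed` table; a hedged one-off Postgres migration would be:

```sql
ALTER TABLE public.asinseed
    ADD COLUMN IF NOT EXISTS browser_account VARCHAR;
```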
 
 

+ 253 - 0
tests/flow_run/t_flow_run_asin_mhtml_parser.py

@@ -0,0 +1,253 @@
+"""
+ASIN MHTML parsing flow test script
+===================================
+
+Tests async_process_mhtml, initializing AsinCrawler from a browser config.
+"""
+import asyncio
+from pprint import pprint
+from src.flow_task.asin_mhtml_parser import async_process_mhtml
+from src.browser.browser_config import create_direct_browser_config_with_stats
+from src.flow_task.db.product_import_db import product_import_manager
+from src.manager.core.db import DbManager
+from utils.logu import get_logger
+
+logger = get_logger('test')
+
+def test_async_process_mhtml_with_browser_config():
+    """Test async_process_mhtml using a browser config"""
+    print("Testing async_process_mhtml (with a browser config)...")
+    print("=" * 60)
+    
+    try:
+        # Get all browser configs
+        browser_configs = create_direct_browser_config_with_stats()
+        if not browser_configs:
+            print("No browser configs available")
+            return
+        
+        print(f"Available browser configs: {list(browser_configs.keys())}")
+        
+        # Pick the first available browser config
+        browser_id = list(browser_configs.keys())[0]
+        browser_config = browser_configs[browser_id]
+        
+        print(f"Using browser config {browser_id}: {browser_config.browser_address}")
+        
+        # Find an existing MHTML file to test against
+        db_manager = DbManager()
+        
+        # Get an ASIN record that has an mhtml_path
+        asin_records = db_manager.get_asin_completed(to_dict=False)
+        
+        if not asin_records:
+            print("No ASIN records with MHTML files found")
+            print("Run the ASIN crawl flow first to generate MHTML files")
+            return
+        
+        asin_record = asin_records[0]
+        mhtml_path = asin_record.mhtml_path
+        
+        print(f"Found ASIN record: {asin_record.asin}")
+        print(f"MHTML path: {mhtml_path}")
+        
+        # Exercise async_process_mhtml
+        print("Starting the async_process_mhtml test...")
+        
+        # Run the coroutine
+        result = asyncio.run(async_process_mhtml(mhtml_path, browser_config))
+        
+        print("async_process_mhtml finished")
+        print(f"Result status: {result.get('status')}")
+        
+        if result.get('status') == 'success':
+            data = result.get('data', {})
+            print("Parse result:")
+            pprint(data)
+        else:
+            print("Parse failed")
+            
+    except Exception as e:
+        print(f"Error testing async_process_mhtml: {e}")
+        logger.exception(f"Error testing async_process_mhtml: {e}")
+    
+    print("=" * 60)
+
+def test_async_process_mhtml_with_default_config():
+    """Test async_process_mhtml with the default config"""
+    print("Testing async_process_mhtml (default config)...")
+    print("=" * 60)
+    
+    try:
+        # Find an existing MHTML file to test against
+        db_manager = DbManager()
+        
+        # Get an ASIN record that has an mhtml_path
+        asin_records = db_manager.get_asin_completed(to_dict=False)
+        
+        if not asin_records:
+            print("No ASIN records with MHTML files found")
+            print("Run the ASIN crawl flow first to generate MHTML files")
+            return
+        
+        asin_record = asin_records[0]
+        mhtml_path = asin_record.mhtml_path
+        
+        print(f"Found ASIN record: {asin_record.asin}")
+        print(f"MHTML path: {mhtml_path}")
+        
+        # Exercise async_process_mhtml without passing a browser config
+        print("Starting the async_process_mhtml test (default config)...")
+        
+        # Run the coroutine
+        result = asyncio.run(async_process_mhtml(mhtml_path))
+        
+        print("async_process_mhtml finished")
+        print(f"Result status: {result.get('status')}")
+        
+        if result.get('status') == 'success':
+            data = result.get('data', {})
+            print("Parse result:")
+            pprint(data)
+        else:
+            print("Parse failed")
+            
+    except Exception as e:
+        print(f"Error testing async_process_mhtml (default config): {e}")
+        logger.exception(f"Error testing async_process_mhtml (default config): {e}")
+    
+    print("=" * 60)
+
+async def process_single_asin(asin_record, browser_config):
+    """Process a single ASIN record asynchronously"""
+    try:
+        print(f"Processing ASIN: {asin_record.asin}")
+        result = await async_process_mhtml(asin_record.mhtml_path, browser_config)
+        return {
+            'asin': asin_record.asin,
+            'status': result.get('status'),
+            'data': result.get('data') if result.get('status') == 'success' else None
+        }
+    except Exception as e:
+        print(f"Error parsing ASIN {asin_record.asin}: {e}")
+        return {
+            'asin': asin_record.asin,
+            'status': 'error',
+            'error': str(e)
+        }
+
+async def concurrent_mhtml_parsing():
+    """Parse multiple MHTML files concurrently, using multiple browsers"""
+    print("Starting concurrent parsing of MHTML files...")
+    print("=" * 60)
+    
+    try:
+        # Get all browser configs
+        browser_configs = create_direct_browser_config_with_stats()
+        if not browser_configs:
+            print("No browser configs available")
+            return
+        
+        print(f"Available browser configs: {list(browser_configs.keys())}")
+        
+        # Collect existing MHTML files to test against
+        db_manager = DbManager()
+        
+        # Get every ASIN record, without filtering on extra_result_path
+        asin_records = db_manager.get_all_asin(to_dict=False)
+        
+        if not asin_records:
+            print("No ASIN records with MHTML files found")
+            print("Run the ASIN crawl flow first to generate MHTML files")
+            return
+        
+        print(f"First 3 ASIN records: {asin_records[:3]}")
+        print(f"Found {len(asin_records)} ASIN records")
+        y = input("Continue? (y/n) ")
+        if y != 'y':
+            return
+        # Limit to a single record for this test run
+        asin_records = asin_records[:1]
+        # Build the browser config list for concurrent processing
+        browser_config_list = list(browser_configs.values())
+        num_browsers = len(browser_config_list)
+        
+        print(f"Running concurrently on {num_browsers} browsers")
+        
+        # Create the concurrent tasks
+        tasks = []
+        for i, asin_record in enumerate(asin_records):
+            # Round-robin over the browser configs
+            browser_config = browser_config_list[i % num_browsers]
+            task = process_single_asin(asin_record, browser_config)
+            tasks.append(task)
+        
+        # Run all tasks concurrently
+        results = await asyncio.gather(*tasks, return_exceptions=True)
+        
+        # Post-process the results
+        processed_results = []
+        for result in results:
+            if isinstance(result, Exception):
+                print(f"Task raised an exception: {result}")
+                processed_results.append({
+                    'asin': 'unknown',
+                    'status': 'error',
+                    'error': str(result)
+                })
+            else:
+                processed_results.append(result)
+                print(f"ASIN {result['asin']} parsed, status: {result['status']}")
+        
+        # Print a summary
+        print("\n" + "=" * 60)
+        print("Concurrent parsing summary:")
+        success_count = sum(1 for r in processed_results if r['status'] == 'success')
+        error_count = sum(1 for r in processed_results if r['status'] == 'error')
+        
+        print(f"Succeeded: {success_count}, failed: {error_count}")
+        print(f"Total processed: {len(processed_results)}")
+        
+        for result in processed_results:
+            print(f"ASIN: {result['asin']}, status: {result['status']}")
+            
+        return processed_results
+            
+    except Exception as e:
+        print(f"Error during concurrent MHTML parsing: {e}")
+        logger.exception(f"Error during concurrent MHTML parsing: {e}")
+        return []
+    
+    finally:
+        print("=" * 60)
+
+def run_concurrent_parsing():
+    """Run the concurrent parsing directly, without test scaffolding"""
+    print("Running ASIN MHTML parsing concurrently")
+    print("=" * 60)
+    
+    # Run the concurrent parsing
+    results = asyncio.run(concurrent_mhtml_parsing())
+    
+    print("Concurrent parsing done")
+    return results
+
+def main():
+    """Main entry - run concurrently"""
+    print("ASIN MHTML parsing flow - concurrent run")
+    print("=" * 60)
+    
+    # Run the concurrent parsing
+    results = run_concurrent_parsing()
+    
+    # Print final stats
+    if results:
+        success_count = sum(1 for r in results if r['status'] == 'success')
+        error_count = sum(1 for r in results if r['status'] == 'error')
+        print(f"\nFinal stats: {success_count} succeeded, {error_count} failed")
+    
+    print("Concurrent parsing done")
+
+if __name__ == "__main__":
+    main()

+ 228 - 44
tests/flow_run/t_flow_run_crawl_asin.py

@@ -10,45 +10,18 @@ import random
 from pprint import pprint
 from datetime import datetime
 from src.flow_task.crawl_asin import crawl_asin_flow, CrawlAsinInput, AsinAreaEnum
-from src.browser.browser_config import create_direct_browser_config
+from src.browser.browser_config import create_direct_browser_config,create_direct_browser_config_with_stats
 from src.browser.crawl_base import AsinCrawlerBase
-
-def t_init_browser():
-    """Main"""
-    # Get all browser configs
-    browser_configs = create_direct_browser_config()
-    
-    print(f"Got {len(browser_configs)} browser configs")
-    
-    # For each config, create an AsinCrawlerBase instance and call get_home_page
-    for port, browser_config in browser_configs.items():
-        print(f"Using browser config: port {port}, account {browser_config.account}")
-        
-        try:
-            # Create an instance via AsinCrawlerBase.create_browser
-            crawler = AsinCrawlerBase.create_browser(
-                address=browser_config.browser_address,
-                user_data_dir=str(browser_config.browser_userdata_dir)
-            )
-            
-            # Call get_home_page
-            print(f"Fetching the home page for account {browser_config.account}...")
-            crawler.get_home_page()
-            print(f"Fetched the home page for account {browser_config.account}")
-            
-        except Exception as e:
-            print(f"Error fetching home page for account {browser_config.account}: {e}")
-            
-        print("-" * 50)
-
+from src.flow_task.db.product_import_db import product_import_manager
 
 competitor_list = ["B09MQMTBJW","B000THQ4ZO","B0D6RVGL2M","B004OCLMTI","B0D7TKHSP4","B000THROUS","B08HK93VBD","B0C6LXPSVX","B0C8MRSD6P","B08LD1MZX4","B0CLCJXXWF"]
 
-def t_random_crawl_asin():
+def t_crawl_asin_flow():
     """随机选择一个ASIN和浏览器配置,调用crawl_asin_flow"""
     # 获取所有浏览器配置
-    browser_configs = create_direct_browser_config()
-    
+    # browser_configs = create_direct_browser_config()
+    # {9321: AccountInBrowser(browser_address='127.0.0.1:9321', browser_userdata_dir=WindowsPath('G:/code/amazone/copywriting_production/output/browser_data/user_data_dir_9321'), chrome_config_ini=None, proxy=None, active=True, host_name='pc', account='mahui4228@gmail.com', password='password123', max_access_limit=10, remaining_access_limit=10)
+    browser_configs = create_direct_browser_config_with_stats()
     if not browser_configs:
         print("没有可用的浏览器配置")
         return
@@ -57,40 +30,251 @@ def t_random_crawl_asin():
         print("没有可用的ASIN列表")
         return
     
+    pprint(browser_configs)
+    
     # Pick an ASIN and a browser config
-    random_asin = competitor_list[0]
-    random_port = random.choice(list(browser_configs.keys()))
-    random_browser_config = browser_configs[9323]
+    asin = competitor_list[2]
+    browser_config = browser_configs[9323]
 
-    print(f"Randomly selected ASIN: {random_asin}")
-    print(f"Randomly selected browser config: port {random_port}, account {random_browser_config.account}")
+    print(f"Selected ASIN: {asin}")
+    print(f"Selected browser config: {browser_config}")
     
     try:
         # Build the CrawlAsinInput
         flow_input = CrawlAsinInput(
-            asin=random_asin,
+            asin=asin,
             asin_area=AsinAreaEnum.JP,  # default to the JP region
             mthml_type=True,  # save as MHTML
-            overwrite=False,  # don't overwrite existing files
-            browser=random_browser_config
+            # refresh_cache=True,  # force a re-crawl
+            refresh_cache=False,  # reuse cached results
+            browser=browser_config
         )
         
         print(f"开始执行ASIN爬取流程...")
         
         # Call crawl_asin_flow
-        result = crawl_asin_flow(flow_input)
+        state = crawl_asin_flow(flow_input, return_state=True)
+        pprint(state)
         
+        pprint(state.result())
         print(f"爬取流程执行成功")
-        pprint(result)
         
     except Exception as e:
         print(f"执行ASIN爬取流程时出错: {e}")
         
     print("-" * 50)
 
+def t_print_monthly_competitor_lists():
+    """Fetch this month's data from product_import_manager and print each product's competitor_list"""
+    print("Fetching this month's competitor_list for every product...")
+    print("=" * 60)
+    
+    try:
+        # Get all of this month's product data
+        monthly_products = product_import_manager.get_monthly_product_imports_by_asin_exists(to_dict=True)
+        '''monthly_products = 
+[{'asin_exists': False,
+  'competitor': 'B0020FO356',
+  'created_at': datetime.datetime(2025, 8, 8, 15, 57, 6, 997403),
+  'id': 1,
+  'product_name': '1P双头压刀镊子'},
+ {'asin_exists': False,
+  'competitor': 'B00F8BH8XS',
+  'created_at': datetime.datetime(2025, 8, 8, 15, 57, 6, 997403),
+  'id': 1,
+  'product_name': '1P双头压刀镊子'},...]
+  '''
+        pprint(monthly_products)
+        return  # debug: inspect the raw rows and stop here
+        if not monthly_products:
+            print("No product data found for this month")
+            return
+        
+        print(f"Found {len(monthly_products)} products this month:")
+        print("-" * 60)
+        
+        for i, product in enumerate(monthly_products, 1):
+            product_name = product.get('product_name', 'unknown product')
+            filename = product.get('filename', 'unknown file')
+            id = product.get('id', 'unknown id')
+            
+            print(f"Product {i}: {product_name}")
+            print(f"Filename: {filename}")
+            print(f"ID: {id}")
+            
+            # Parse competitor_list out of product_data
+            try:
+                import json
+                product_data = json.loads(product.get('product_data', '{}'))
+                competitor_list = product_data.get('competitor_list', [])
+                
+                if competitor_list:
+                    print(f"Competitor ASIN list: {competitor_list}")
+                else:
+                    print("Competitor ASIN list: none")
+                    
+            except (json.JSONDecodeError, KeyError) as e:
+                print(f"Error parsing competitor_list: {e}")
+                print("Competitor ASIN list: parse failed")
+            
+            print("-" * 60)
+            
+    except Exception as e:
+        print(f"Error fetching this month's product data: {e}")
+
+def t_crawl_multiple_competitors():
+    """Take each competitor from monthly_products and run crawl_asin_flow across all browser configs"""
+    print("Fetching this month's competitors and starting the crawl...")
+    print("=" * 60)
+    
+    try:
+        # Get all of this month's product data
+        monthly_products = product_import_manager.get_monthly_product_imports_by_asin_exists(to_dict=True)
+        
+        if not monthly_products:
+            print("No product data found for this month")
+            return
+        
+        print(f"Found {len(monthly_products)} product rows this month")
+        pprint(monthly_products)
+        y = input("Continue? (y/n) ")
+        print(f"input: {y}")
+        if y != 'y':
+            print("Cancelled")
+            return
+        # Get all browser configs
+        browser_configs = create_direct_browser_config_with_stats()
+        if not browser_configs:
+            print("No browser configs available")
+            return
+        
+        print(f"Initially available browser configs: {list(browser_configs.keys())}")
+        
+        # Walk monthly_products by popping items off a copy
+        monthly_products_copy = monthly_products.copy()  # copy so the original list stays intact
+        processed_count = 0
+        
+        while monthly_products_copy and processed_count < 20:  # process at most 20 products
+            # Re-fetch the browser configs each round so remaining_access_limit stays fresh
+            browser_configs = create_direct_browser_config_with_stats()
+            
+            # Find browsers that still have quota left
+            available_browsers = []
+            for browser_id, browser_config in browser_configs.items():
+                if hasattr(browser_config, 'remaining_access_limit') and browser_config.remaining_access_limit > 0:
+                    available_browsers.append((browser_id, browser_config))
+            
+            if not available_browsers:
+                print("\nNo browser configs available (every browser is out of quota)")
+                break
+            
+            print(f"\nCurrently available browsers: {[browser_id for browser_id, _ in available_browsers]}")
+            
+            # State and info for this round's tasks
+            current_states = []
+            current_task_info = []
+            
+            # Loop over the available browsers, handling one product per browser
+            for browser_id, browser_config in available_browsers:
+                if not monthly_products_copy:
+                    break
+                    
+                # Pop one product off monthly_products
+                product = monthly_products_copy.pop(0)
+                competitor = product.get('competitor')
+                
+                if not competitor:
+                    print(f"\nSkipping product without competitor data: {product.get('product_name', 'unknown product')}")
+                    continue
+                
+                # Print the current product info
+                print(f"\nProcessing product {processed_count + 1}:")
+                print(f"Product name: {product.get('product_name', 'unknown product')}")
+                print(f"Competitor ASIN: {competitor}")
+                print(f"Filename: {product.get('filename', 'unknown file')}")
+                print(f"ID: {product.get('id', 'unknown id')}")
+                print("-" * 40)
+                
+                print(f"Crawling ASIN {competitor} with browser {browser_id} (remaining accesses: {browser_config.remaining_access_limit})")
+                
+                try:
+                    # Build the CrawlAsinInput
+                    flow_input = CrawlAsinInput(
+                        asin=competitor,
+                        asin_area=AsinAreaEnum.JP,  # default to the JP region
+                        mthml_type=True,  # save as MHTML
+                        refresh_cache=True,  # force a re-crawl even if cached
+                        browser=browser_config
+                    )
+                    
+                    print(f"Starting the ASIN crawl flow...")
+                    
+                    # Call crawl_asin_flow and keep the state object
+                    state = crawl_asin_flow(flow_input, return_state=True)
+                    current_states.append(state)
+                    current_task_info.append({
+                        'browser_id': browser_id,
+                        'asin': competitor,
+                        'state': state,
+                        'product_info': product
+                    })
+                    processed_count += 1
+                    print(f"Browser {browser_id}: crawl task for ASIN {competitor} submitted")
+                    
+                except Exception as e:
+                    print(f"Browser {browser_id}: error crawling ASIN {competitor}: {e}")
+                    
+                print("=" * 60)
+            
+            # After one round over the browsers, wait for the tasks and collect results
+            if current_states:
+                print("\n" + "=" * 60)
+                print("Waiting for this round's tasks to finish...")
+                print("=" * 60)
+                
+                for task in current_task_info:
+                    browser_id = task['browser_id']
+                    asin = task['asin']
+                    state = task['state']
+                    product_info = task['product_info']
+                    
+                    print(f"\nFetching result of browser {browser_id} crawling ASIN {asin}:")
+                    print(f"Product info: {product_info.get('product_name', 'unknown product')}")
+                    print(f"Filename: {product_info.get('filename', 'unknown file')}")
+                    print("-" * 40)
+                    
+                    try:
+                        # Get the task's final result
+                        result = state.result()
+                        print(f"Browser {browser_id}: ASIN {asin} crawled successfully")
+                        print(f"Result type: {type(result)}")
+                        if hasattr(result, '__dict__'):
+                            print(f"Result attributes: {vars(result)}")
+                        else:
+                            print(f"Result content: {result}")
+                            
+                    except Exception as e:
+                        print(f"Browser {browser_id}: error getting result for ASIN {asin}: {e}")
+            
+            # Refresh the per-browser remaining counts at the while-loop level
+            print(f"\n=== Browser status update for this round ===")
+            updated_browser_configs = create_direct_browser_config_with_stats()
+            for browser_id, browser_config in updated_browser_configs.items():
+                if hasattr(browser_config, 'remaining_access_limit'):
+                    print(f"Browser {browser_id} remaining accesses: {browser_config.remaining_access_limit}")
+            print("=" * 60)
+        
+        print("\n" + "=" * 60)
+        print(f"All crawl tasks done, processed {processed_count} products")
+        
+    except Exception as e:
+        print(f"Error during batch crawl: {e}")
+
 def main():
     # t_init_browser()
-    t_random_crawl_asin()
+    # t_crawl_asin_flow()
+    # t_print_monthly_competitor_lists()
+    t_crawl_multiple_competitors()
 
 if __name__ == "__main__":
     main()

+ 1 - 1
tests/flow_run/t_flow_run_extra_product.py

@@ -3,7 +3,7 @@ from typing import List
 import asyncio
 from prefect import flow, task
 from prefect.states import Completed, Failed
-from src.flow_task.crawl_asin_flow import get_or_create_product_import_by_url, product_import_flow, ProductImportInput
+from src.flow_task.extra_excel_product_flow import get_or_create_product_import_by_url, product_import_flow, ProductImportInput
 from utils.url_utils import extract_filename_from_url, extract_urls_from_text
 from src.flow_task.db.product_import_db import product_import_manager
 from utils.logu import get_logger

+ 5 - 1
tests/mytest/t_delete_asin_seed.py

@@ -6,7 +6,11 @@ logger = get_logger('delete_asin_seed')
 def main():
     # asin_seed ids to delete
     asin_ids_to_delete = [81, 80, 79, 78, 77, 76, 75, 74, 73]
-    
+    str_ids = '''72
+71
+70
+69'''
+    asin_ids_to_delete = [int(s) for s in str_ids.split('\n')]
     # Initialize the database manager
     db_manager = DbManager()
     

+ 10 - 0
tests/mytest/t_monthly_account_stats.py

@@ -0,0 +1,10 @@
+from src.manager.core.db import DbManager
+from utils.logu import get_logger
+
+logger = get_logger('test')
+
+def main():
+    # Print this month's (browser_account, count) rows
+    db_manager = DbManager()
+    for account, count in db_manager.get_monthly_account_stats():
+        logger.info(f"{account}: {count}")
+
+if __name__ == "__main__":
+    main()