drissionpage: save pages as MHTML with Windows-compatible line endings; upload to the s3.vs1.lan endpoint

mrh 4 months ago
parent
commit
dc6e6b4cce

+ 1 - 1
pyproject.toml

@@ -15,7 +15,7 @@ dependencies = [
     "celery>=5.4.0",
     "crawl4ai>=0.6.3",
     "docling>=2.43.0",
-    "drissionpage>=4.1.0.17",
+    "drissionpage>=4.1.1.2",
     "flower>=2.0.1",
     "httpx>=0.28.1",
     "llama-index>=0.12.42",

+ 53 - 0
src/browser/browser_config.py

@@ -0,0 +1,53 @@
+from pathlib import Path
+from typing import Optional, Union
+from pydantic import BaseModel, Field
+from enum import StrEnum
+from config.settings import OUTPUT_DIR
+
+
+class BaseCommon(BaseModel):
+    """基础通用输入模型"""
+    browser_address: Optional[str] = Field(default='127.0.0.1:16800', description="浏览器调试地址")
+    browser_userdata_dir: Optional[Union[str, Path]] = Field(default=None, description="浏览器用户数据目录")
+    chrome_config_ini: Optional[str] = Field(default=None, description="Chrome配置INI文件路径")
+    proxy: Optional[str] = Field(default=None, description="浏览器要使用的代理")
+    active: Optional[bool] = Field(default=True, description="是否激活浏览器")
+
+class AccountInBrowser(BaseCommon):
+    account: Optional[str] = Field(default=None, description="浏览器要登录的Google账号")
+    password: Optional[str] = Field(default=None, description="浏览器要登录的Google密码")
+def create_browser_config(port: int, account: Optional[str] = None, password: Optional[str] = None) -> AccountInBrowser:
+    """创建浏览器配置实例"""
+    browser_userdata_dir = OUTPUT_DIR / "browser_data" / f"user_data_dir_{port}"
+    return AccountInBrowser(
+        browser_address=f"127.0.0.1:{port}",
+        browser_userdata_dir=browser_userdata_dir,
+        account=account,
+        password=password
+    )
+
+def create_direct_browser_config() -> dict[int, AccountInBrowser]:
+    """直接实例化浏览器配置,根据预定义的端口配置信息"""
+    # 端口与账户映射
+    port_configs = {
+        9321: {"account": "mahui4228@gmail.com", "password": "password123"},
+        # 9322: {"account": "mahui6188@gmail.com", "password": "password456"},
+        9323: {"account": "youka570023@gmail.com", "password": "password789"},
+        9324: {"account": "j4732030@gmail.com", "password": "password012"},
+        9325: {"account": "mahui8875@gmail.com", "password": "password345"},
+    }
+    
+    # 为每个端口创建配置实例
+    configs = {}
+    for port, credentials in port_configs.items():
+        browser_userdata_dir = OUTPUT_DIR / "browser_data" / f"user_data_dir_{port}"
+        configs[port] = AccountInBrowser(
+            browser_address=f"127.0.0.1:{port}",
+            browser_userdata_dir=browser_userdata_dir,
+            account=credentials["account"],
+            password=credentials["password"]
+        )
+    
+    return configs
+
+
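
A minimal usage sketch for the new helpers, assuming OUTPUT_DIR resolves and treating the port/account values below as placeholders:

    from src.browser.browser_config import create_browser_config, create_direct_browser_config

    # Build one config for a single debug port/account pair.
    cfg = create_browser_config(port=9321, account="user@example.com", password="***")
    print(cfg.browser_address)        # 127.0.0.1:9321
    print(cfg.browser_userdata_dir)   # <OUTPUT_DIR>/browser_data/user_data_dir_9321

    # Or materialize the whole predefined port -> account map.
    for port, browser in create_direct_browser_config().items():
        print(port, browser.account, browser.browser_address)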

+ 0 - 155
src/browser/crawl_amz_search_key.py

@@ -1,155 +0,0 @@
-import asyncio
-import base64
-from datetime import datetime
-import json
-import os
-import re
-import sys
-import time
-import asyncio
-import asyncio
-from pathlib import Path,PurePosixPath
-from typing import List, Optional
-from pydantic import BaseModel, Field
-from sqlmodel import select, Session
-from DrissionPage._pages.chromium_tab import ChromiumTab
-from DrissionPage._units.listener import DataPacket
-from utils.logu import get_logger
-from config.settings import OUTPUT_DIR,TEMP_PAGE_DIR
-from utils.drission_page import load_chrome_from_ini,ChromeOptions
-from utils.file import save_to_file,check_exists,s3,read_file,s3_uri_to_http_url
-from config.settings import CFG
-from src.browser.crawl_base import CrawlerBase
-from src.models.product_model import Product,CompetitorCrawlData,SearchAmazoneKeyResult
-logger = get_logger('browser')
-AMZ_HTML_DIR = OUTPUT_DIR / 'page' / 'amz'
-AMZ_HTML_DIR.mkdir(parents=True, exist_ok=True)
-
-# class SearchAmazoneKeyResult(BaseModel):
-#     suggestions:List[str] = []
-#     search_key:Optional[str] = None
-#     mhtml_path:Optional[str] = None
-#     screenshot:Optional[str] = None
-#     error:Optional[int] = None
-#     msg:Optional[str] = None
-#     created_at:Optional[datetime] = Field(default_factory=datetime.now)
-
-class CrawlerSearchKeyInput(BaseModel):
-    search_key:str
-    mhtml_path:Optional[str] = None
-    screenshot_path:Optional[str] = None
-    overwrite:Optional[bool] = False
-
-
-class CrawlerAmzSearchKey(CrawlerBase):
-    s3_prefix = f"{CFG.s3_prefix}/output/amz/"
-    def __init__(self, chrome_options:ChromeOptions):
-        super().__init__(chrome_options)
-        tabs = self.page.get_tabs(url='amazon')
-        if tabs:
-            tab = tabs[0]
-        else:
-            tab = self.get_or_new_tab()
-        logger.info(f"init tab {tab.url}")
-        self.tab:ChromiumTab = tab
-        
-    def search_key_and_save_page(self, search_key:str, url=''):
-        if 'www.amazon' not in self.tab.url:
-            url = url or 'https://www.amazon.co.jp/'
-            self.tab.get(url)
-            logger.info(f"request get {url}")
-        input_box = self.tab.ele('xpath://input[@id="twotabsearchtextbox"]')
-        input_box.clear()
-        # self.tab.listen.start(method='GET', )
-        input_box.input(search_key)
-        suggestion_ele_list = self.tab.s_ele('xpath://input[@id="sac-autocomplete-results-container"]', timeout=3)
-
-    async def crawl_suggestion(self, search_input:CrawlerSearchKeyInput) -> SearchAmazoneKeyResult:
-        if not check_exists(search_input.mhtml_path) or search_input.overwrite:
-            await asyncio.to_thread(self.search_key_and_save_page, search_input.search_key)
-            save_mhtml_path,temp_mhtml_path = self.save_current_page(self.tab, search_input.mhtml_path, after_unlink=False)
-            if search_input.screenshot_path:
-                screenshot = self.tab.get_screenshot(as_bytes=True)
-                save_to_file(screenshot, str(search_input.screenshot_path))
-            logger.info(f"{search_input.mhtml_path}")
-        else:
-            temp_mhtml_path = self.download_s3(search_input.mhtml_path, temp_dir=TEMP_PAGE_DIR, overwrite=search_input.overwrite)
-            logger.info(f"exists {search_input.mhtml_path}, download {temp_mhtml_path}")
-            
-        self.tab.get(temp_mhtml_path)
-        html_str = self.tab.html
-        logger.info(f"{html_str[:150]}")
-        schema = {
-            "name": "Autocomplete Suggestions",
-            "baseSelector": '//div[@id="sac-autocomplete-results-container"]',
-            "fields": [
-                {
-                    "name": "suggestions",
-                    "type": "list",
-                    "selector": './/div[@role="button"]',
-                    "fields": [
-                        {
-                            "name": "text",
-                            "type": "text",
-                        }
-                    ]
-                }
-            ]
-        }
-        result = await self.excra_strategy_raw_html(html_str, schema=schema)
-        data = json.loads(result.extracted_content)
-        logger.debug(f"{result.extracted_content}")
-        search_key_result = SearchAmazoneKeyResult(search_key=search_input.search_key, mhtml_path=search_input.mhtml_path, screenshot=str(search_input.screenshot_path))
-        suggestions = []
-        if len(data) == 0:
-            msg = f"{search_input.search_key} has no suggestions, temp_mhtml_path {temp_mhtml_path}"
-            logger.error(msg)
-            search_key_result.msg = msg
-            search_key_result.error = 1
-            return search_key_result
-        data = data[0]
-        for item in data['suggestions']:
-            suggestions.append(item['text'])
-        search_key_result.suggestions = suggestions
-        logger.info(f"爬取成功: {search_key_result.model_dump_json(indent=4)}")
-        if temp_mhtml_path:
-            Path(temp_mhtml_path).unlink()
-        return search_key_result
-    def suggestion_listen_package(self):
-        # package = self.tab.listen.wait(1)
-        search_suggestion_package_list:List[DataPacket] = []
-        for package in self.tab.listen.steps(timeout=3):
-            logger.info(f"{package}")
-            re_search = re.search(r'.*amazon\..*/suggestions', package.url)
-            if re_search:
-                search_suggestion_package_list.append(package)
-                logger.info(f"{package.response.body}")
-            if len(search_suggestion_package_list) > 1:
-                break
-        logger.info(f"{len(search_suggestion_package_list)}")
-        # search_suggestion_package = search_suggestion_package_list[1]
-        # logger.info(f"{search_suggestion_package}")
-        self.tab.listen.stop()
-        # logger.info(f"{search_suggestion_package.response.body}")
-async def main():
-    crawler = CrawlerAmzSearchKey(ChromeOptions())
-    search_key = 'コードカバー'
-    # search_key = '1'
-    # save_path = f"{AMZ_HTML_DIR}/{search_key}.mhtml"
-    save_path = crawler.s3_prefix + f"{search_key}.mhtml"
-    search_input = CrawlerSearchKeyInput(
-        search_key=search_key, 
-        mhtml_path=save_path, 
-        screenshot_path=crawler.s3_prefix + f"{search_key}.png",)
-    await crawler.crawl_suggestion(search_input)
-    # crawler.tab.get('https://www.odoo.com/documentation/18.0/administration/upgrade.html')
-    # res = crawler.save_current_page(crawler.tab, f"{AMZ_HTML_DIR}/test.mhtml")
-    # res = crawler.save_current_page(crawler.tab, f"s3://public/amazone/copywriting_production/output/test.mhtml")
-    # logger.info(f"{res}")
-    # crawler.page.get('https://www.odoo.com/documentation/18.0/administration/upgrade.html')
-    # mthml = crawler.page.save(save_path)
-    # save_to_file(mthml, save_path)
-    # await crawler.crawl_suggestion(search_key, save_path=save_path)
-
-if __name__ == "__main__":
-    asyncio.run(main())

+ 21 - 11
src/browser/crawl_asin.py

@@ -25,7 +25,7 @@ from config.settings import OUTPUT_DIR,TEMP_PAGE_DIR
 from utils.drission_page import load_chrome_from_ini,ChromeOptions
 from utils.file import save_to_file,check_exists,s3,read_file
 from config.settings import CFG
-from src.browser.crawl_base import CrawlerBase
+from src.browser.crawl_base import CrawlerBase, AsinCrawlerBase
 from upath import UPath
 
 logger = get_logger('browser')
@@ -33,31 +33,41 @@ ASIN_HTML_DIR = OUTPUT_DIR / 'page' / 'asin'
 ASIN_HTML_DIR.mkdir(parents=True, exist_ok=True)
 
 class Crawler(CrawlerBase):
-    s3_prefix = f"{CFG.s3_prefix}/output/asinseed/"
+    s3_prefix = f"{CFG.s3_prefix}/output/asinseed"
     def __init__(self, chrome_options:ChromeOptions):
         super().__init__(chrome_options)
+
+
+class AsinCrawler(AsinCrawlerBase):
+    s3_prefix = f"{CFG.s3_prefix}/output/asinseed/"
+    def __init__(self, driver):
+        super().__init__(driver)
     
     def get_asin_url(self, asin:str, asin_area:str):
         # https://www.asinseed.com/en/JP?q=B0CQ1SHD8V
         return f"https://www.asinseed.com/en/{asin_area}?q={asin}"
 
     def get_asin_page_data(self, asin:str, asin_area:str, mthml_type:bool=True):
-        page = load_chrome_from_ini(
-            self.chrome_options
-        )
         url = self.get_asin_url(asin, asin_area)
-        page.get(url)
+        self.page.get(url)
         if mthml_type:
-            return page.save()
+            return self.page.save()
         else:
-            return page.html
+            return self.page.html
     def get_asin_and_save_page(self, asin:str, asin_area:str='JP', mthml_type:bool=True, save_path:str=None, overwrite:bool=False):
         if not overwrite and check_exists(save_path):
             logger.info(f"exists {save_path} ")
             return save_path
-        data = self.get_asin_page_data(asin, asin_area, mthml_type)
-        save_path = save_path or str(ASIN_HTML_DIR / f'{asin}{".mhtml" if mthml_type else ".html"}')
-        return save_to_file(data, save_path)
+        
+        if mthml_type:
+            # 使用save_mhtml方法来确保Windows格式兼容;先导航到ASIN页面,再捕获快照
+            self.page.get(self.get_asin_url(asin, asin_area))
+            save_path = save_path or str(ASIN_HTML_DIR / f'{asin}.mhtml')
+            return self.save_mhtml(save_path)
+        else:
+            # HTML格式保持原有逻辑
+            data = self.get_asin_page_data(asin, asin_area, mthml_type)
+            save_path = save_path or str(ASIN_HTML_DIR / f'{asin}.html')
+            return save_to_file(data, save_path)
     
 
     async def extra_result_table(self, html:str, input_schema:dict={}) -> CrawlResult:
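
For reference, a sketch of how the reworked MHTML path is meant to be driven; the create_browser signature mirrors the usage in tests/flow_run/t_flow_run_crawl_asin.py below, and the port, user-data directory and ASIN are placeholders:

    from src.browser.crawl_asin import AsinCrawler

    crawler = AsinCrawler.create_browser(address="127.0.0.1:9323",
                                         user_data_dir="/tmp/user_data_dir_9323")
    # Navigate to the asinseed page for an example ASIN, then snapshot it.
    crawler.page.get(crawler.get_asin_url("B0CQ1SHD8V", "JP"))
    path = crawler.save_mhtml("/tmp/B0CQ1SHD8V.mhtml")
    print(path)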

+ 40 - 1
src/browser/crawl_base.py

@@ -10,7 +10,7 @@ import asyncio
 import pickle
 from pathlib import Path,PurePosixPath,PurePath
 import random
-from typing import List
+from typing import List, Optional, Union
 import httpx
 import ssl
 from sqlmodel import select, Session
@@ -27,6 +27,8 @@ from utils.drission_page import load_chrome_from_ini,ChromeOptions
 from utils.file import save_to_file,check_exists,s3,read_file,upload_to_s3,upload_file_to_s3
 from config.settings import CFG
 from abc import ABC, abstractmethod
+import platform
+import re
 logger = get_logger('browser')
 
 
@@ -60,6 +62,43 @@ class AbstractCrawlerBase(ABC):
                 temp_mhtml_path = None
         return save_path,temp_mhtml_path
 
+    def save_mhtml(self, path: Optional[Union[str, Path]] = None, name: Optional[str] = None, tab=None) -> Union[str, Path]:
+        """保存页面为MHTML格式
+        
+        Args:
+            path: 保存路径,如果为None则返回MHTML数据
+            name: 文件名,如果path为目录时使用
+            tab: 要保存的标签页,默认为当前页面
+            
+        Returns:
+            如果path为None,返回MHTML数据字符串
+            如果指定了path,返回保存的文件路径
+        """
+        if not tab:
+            tab = self.page
+            
+        # 获取MHTML数据
+        mhtml_data = tab._run_cdp('Page.captureSnapshot')['data']
+        
+        # 如果指定了保存路径,则保存文件
+        if path is not None:
+            if isinstance(path, str) and os.path.isdir(path):
+                file_name = name or 'page.mhtml'
+                file_path = os.path.join(path, file_name)
+            else:
+                file_path = str(path)
+            
+            os.makedirs(os.path.dirname(file_path), exist_ok=True)
+            
+            # 直接以二进制模式保存,保持原始换行符格式
+            with open(file_path, 'wb') as f:
+                f.write(mhtml_data.encode('utf-8'))
+            
+            logger.info(f"MHTML文件已保存: {file_path}")
+            return file_path
+        else:
+            return mhtml_data
+
     def download_s3(self, s3_uri:str, temp_dir:str, overwrite:str=False):
         if not s3_uri.startswith('s3://'):
             raise ValueError("Invalid S3 URI. Must start with 's3://'")
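
The save_mhtml helper above is the core of this commit: Page.captureSnapshot returns the page as an MHTML string that already uses the CRLF line endings MIME requires, and writing it back as raw bytes avoids any text-mode newline translation on Windows. A standalone sketch of the same technique, using DrissionPage's public run_cdp wrapper instead of the private _run_cdp call and assuming a locally reachable Chromium:

    from DrissionPage import ChromiumPage

    page = ChromiumPage()
    page.get("https://example.com")

    # The snapshot keeps its original CRLF separators because the file is
    # opened in binary mode rather than text mode.
    mhtml = page.run_cdp("Page.captureSnapshot", format="mhtml")["data"]
    with open("example.mhtml", "wb") as f:
        f.write(mhtml.encode("utf-8"))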

+ 1 - 6
src/flow_task/__init__.py

@@ -3,13 +3,11 @@ Flow Task Package
 使用Prefect框架的工作流程任务包
 """
 
-from .crawl_asin_flow import (
+from .crawl_asin import (
     CrawlAsinInput,
     AsinAreaEnum,
     CrawlAsinFlow,
     crawl_asin_flow,
-    crawl_asin_task,
-    _get_and_save_page_data_task,
 )
 
 __all__ = [
@@ -17,7 +15,4 @@ __all__ = [
     "AsinAreaEnum", 
     "CrawlAsinFlow",
     "crawl_asin_flow",
-    "crawl_asin_task",
-    "_get_and_save_page_data_task",
-    "create_crawl_asin_flows",
 ]
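
After this change the package root re-exports only the surviving symbols; a quick import check, assuming the project root is on PYTHONPATH:

    from src.flow_task import CrawlAsinInput, AsinAreaEnum, CrawlAsinFlow, crawl_asin_flow

    print(crawl_asin_flow.name)  # "ASIN页面爬取流程"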

+ 232 - 0
src/flow_task/crawl_asin.py

@@ -0,0 +1,232 @@
+from datetime import datetime, timedelta
+from enum import StrEnum
+from pathlib import Path
+from typing import Optional, Any, Union, List
+from pydantic import BaseModel, Field
+import re
+from prefect import flow, task,get_run_logger
+from prefect.states import Failed, Running, Completed
+from prefect.cache_policies import INPUTS
+from prefect.futures import wait
+from src.browser.crawl_asin import Crawler,AsinCrawler
+from utils.drission_page import ChromeOptions, ChromiumOptions
+from config.settings import CFG, read_config, get_config_path, TEMP_PAGE_DIR, OPENAI_API_KEY, OPENAI_API_BASE
+from utils.logu import get_logger
+from utils.file import save_to_file, check_exists, extract_excel_text_from_url, s3_uri_to_http_url
+from utils.file import s3
+from utils.url_utils import extract_urls_from_text, extract_filename_from_url
+from llama_index.llms.litellm import LiteLLM
+from llama_index.core.program import LLMTextCompletionProgram
+from llama_index.core.output_parsers import PydanticOutputParser
+from llama_index.core.output_parsers.pydantic import extract_json_str
+from src.flow_task.db.product_import_db import product_import_manager
+from src.flow_task.db.models.product_models import ProductImport, ProductForExtraction
+from src.manager.core.db import DbManager, AsinSeed
+from markitdown import MarkItDown
+import tempfile
+import os
+
+logger = get_logger('flow_task')
+
+
+from src.browser.browser_config import AccountInBrowser
+
+class AsinAreaEnum(StrEnum):
+    """ASIN地区枚举"""
+    US = "US"
+    JP = "JP"
+    UK = "UK"
+    DE = "DE"
+    FR = "FR"
+    CA = "CA"
+    AU = "AU"
+class CrawlAsinInput(BaseModel):
+    """爬取ASIN页面输入模型"""
+    asin: str = Field(description="ASIN编码")
+    asin_area: Optional[AsinAreaEnum] = Field(default=AsinAreaEnum.JP, description="ASIN地区")
+    mthml_type: Optional[bool] = Field(default=True, description="是否保存为MHTML格式")
+    save_path: Optional[str] = Field(default=None, description="保存路径")
+    overwrite: Optional[bool] = Field(default=False, description="是否覆盖已存在文件")
+    browser: AccountInBrowser = Field(description="浏览器账号信息")
+
+
+class BaseCrawlFlow:
+    """基础爬取流程类"""
+    FLOW_NAME = "基础爬取流程"
+    
+    def __init__(self, flow_input: CrawlAsinInput):
+        self.flow_input = flow_input
+        self.chrome_options = self._init_chrome_options()
+        self.crawler = AsinCrawler.create_browser(
+            address=flow_input.browser.browser_address,
+            user_data_dir=flow_input.browser.browser_userdata_dir,
+            )
+        self.run_log = get_run_logger()
+
+    def _init_chrome_options(self) -> ChromeOptions:
+        """初始化Chrome选项"""
+        chrome_options = ChromiumOptions(read_file=False)
+        # 从flow_input中获取浏览器配置
+        address = self.flow_input.browser.browser_address or '127.0.0.1:16800'
+        user_data_dir = self.flow_input.browser.browser_userdata_dir
+        # 务必不能小于10000,否则可能由于环境问题导致错误
+        chrome_options.set_address(address)
+        if user_data_dir:
+            chrome_options.set_user_data_path(user_data_dir)
+        return chrome_options
+    
+    def _get_save_path(self) -> str:
+        """获取保存路径"""
+        if self.flow_input.save_path:
+            return self.flow_input.save_path
+        
+        # 默认保存路径
+        extension = ".mhtml" if self.flow_input.mthml_type else ".html"
+        return f"{Crawler.s3_prefix}{self.flow_input.asin}/{self.flow_input.asin}{extension}"
+    
+
+
+class CrawlAsinFlow(BaseCrawlFlow):
+    """ASIN页面爬取流程"""
+    FLOW_NAME = "ASIN页面爬取流程"
+
+    @task(
+        task_run_name="保存整个html页面",
+        persist_result=True,
+        cache_expiration=timedelta(days=31),
+        cache_policy=INPUTS-'self' - 'overwrite',
+    )
+    def task_save_page(self, asin: str, asin_area: AsinAreaEnum, mthml_type: bool):
+        """获取页面数据并保存到本地temp目录的task方法"""
+        self.run_log.info(f"开始获取页面数据: {asin} {asin_area} {mthml_type}")
+        
+        # 获取页面数据:导航到ASIN对应的asinseed页面
+        url = self.crawler.get_asin_url(asin, asin_area)
+        self.crawler.page.get(url)
+        # 生成本地temp保存路径
+        local_dir = TEMP_PAGE_DIR / "asinseed"
+        local_dir.mkdir(parents=True, exist_ok=True)
+        
+        extension = ".mhtml" if mthml_type else ".html"
+        local_path = local_dir / f"{asin}{extension}"
+        data = self.crawler.save_mhtml(local_path)
+        # logger.info(f"data {data}")
+        # save_to_file(data, local_path)
+        # with open(local_path, "w", encoding='utf-8') as f:
+        #     self.run_log.info(f"正在写入数据到本地文件: {local_path}, len: {len(data)}")
+        #     f.write(data)
+        self.run_log.info(f"成功保存到本地temp目录: {local_path}")
+        return str(local_path)
+
+
+    @task(
+        task_run_name="保存到数据库和对象存储",
+        persist_result=True,
+        cache_expiration=timedelta(hours=12),
+        cache_policy=INPUTS-'self',
+    )
+    def task_save_to_db(self, local_file_path: str, asin: str, mthml_type: bool, asin_area: str = 'JP'):
+        """将temp目录文件上传到S3的task方法,先检查数据库是否存在记录"""
+        self.run_log.info(f"开始处理文件: {local_file_path}")
+        
+        # 初始化数据库管理器
+        db_manager = DbManager()
+        
+        # 检查数据库中是否已存在该ASIN的记录
+        existing_record = db_manager.get_asin_seed(asin)
+        
+        
+        # 生成S3保存路径
+        s3_path = f"{Crawler.s3_prefix}/{asin}{'.mhtml' if mthml_type else '.html'}"
+            
+        try:
+            # 读取本地文件
+            with open(local_file_path, 'rb') as f:
+                data = f.read()
+            
+            self.run_log.info(f"上传到S3: {s3_path}")
+            final_path = save_to_file(data, s3_path)
+            self.run_log.info(f"成功上传到S3: {final_path}")
+            
+            # 将S3 URI转换为HTTP URL
+            http_url = s3_uri_to_http_url(final_path)
+            self.run_log.info(f"转换为HTTP URL: {http_url}")
+            
+            # 保存到数据库
+            if existing_record:
+                # 更新现有记录
+                existing_record.mhtml_path = final_path
+                existing_record.asin_area = asin_area
+                db_manager.save_asin_seed(existing_record)
+                self.run_log.info(f"更新数据库记录: ASIN {asin}")
+            else:
+                # 创建新记录
+                new_record = AsinSeed(asin=asin, asin_area=asin_area, mhtml_path=final_path)
+                db_manager.save_asin_seed(new_record)
+                self.run_log.info(f"创建数据库记录: ASIN {asin}")
+            
+            return http_url
+        except Exception as s3_error:
+            self.run_log.error(f"S3上传失败: {s3_error}")
+            raise Exception(f"S3上传失败: {s3_error}")
+
+    def _get_and_save_page_data(self):
+        """获取页面数据并保存的方法"""
+        # 第一步:获取数据并保存到本地temp目录
+        local_file_path = self.task_save_page.with_options(refresh_cache=True)(
+            asin=self.flow_input.asin,
+            asin_area=self.flow_input.asin_area,
+            mthml_type=self.flow_input.mthml_type,
+        )
+        
+        # 第二步:将temp目录文件上传到S3并保存到数据库
+        s3_path = self.task_save_to_db.with_options(refresh_cache=True)(
+            local_file_path=local_file_path,
+            asin=self.flow_input.asin,
+            mthml_type=self.flow_input.mthml_type,
+            asin_area=self.flow_input.asin_area
+        )
+        
+        return s3_path
+    
+    def run(self):
+        """执行流程"""
+        self.run_log.info(f"开始执行流程: {self.FLOW_NAME}")
+        self.run_log.info(f"ASIN: {self.flow_input.asin}")
+        self.run_log.info(f"地区: {self.flow_input.asin_area}")
+        self.run_log.info(f"MHTML格式: {self.flow_input.mthml_type}")
+        self.run_log.info(f"覆盖模式: {self.flow_input.overwrite}")
+        
+        try:
+            # 使用task方法获取和保存页面数据
+            final_path = self._get_and_save_page_data()
+            
+            self.run_log.info(f"流程执行成功,保存路径: {final_path}")
+            return {
+                'status': 'success',
+                'path': final_path,
+                'asin': self.flow_input.asin,
+                'asin_area': self.flow_input.asin_area
+            }
+            
+        except Exception as e:
+            self.run_log.error(f"流程执行失败: {e}")
+            raise e
+
+@flow(
+    name=CrawlAsinFlow.FLOW_NAME,
+    persist_result=True,
+    result_serializer="json",
+)
+def crawl_asin_flow(flow_input: CrawlAsinInput):
+    """ASIN页面爬取Prefect流程"""
+    self = CrawlAsinFlow(flow_input)
+    self.run_log.info(f"启动ASIN爬取流程: {flow_input.asin}")
+    result = self.run()
+    
+    return result
+
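
The cache_policy=INPUTS - 'self' - 'overwrite' declarations above drop the bound instance and the overwrite flag from the cache key, so repeat runs for the same ASIN reuse the persisted result. A reduced sketch of the same Prefect pattern on a hypothetical task:

    from datetime import timedelta
    from prefect import flow, task
    from prefect.cache_policies import INPUTS

    @task(
        persist_result=True,
        cache_expiration=timedelta(days=31),
        # Exclude 'overwrite' so toggling it does not change the cache key.
        cache_policy=INPUTS - "overwrite",
    )
    def fetch_page(asin: str, overwrite: bool = False) -> str:
        return f"payload for {asin}"

    @flow
    def demo():
        fetch_page("B0CQ1SHD8V", overwrite=False)
        # Hits the cache: only the excluded 'overwrite' input changed.
        fetch_page("B0CQ1SHD8V", overwrite=True)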

+ 0 - 494
src/flow_task/crawl_asin_flow.py

@@ -1,494 +0,0 @@
-from datetime import datetime, timedelta
-from enum import StrEnum
-from pathlib import Path
-from typing import Optional, Any, Union, List
-from pydantic import BaseModel, Field
-import re
-from prefect import flow, task
-from prefect.states import Failed, Running, Completed
-from prefect.cache_policies import INPUTS
-from prefect.futures import wait
-from src.browser.crawl_asin import Crawler
-from utils.drission_page import ChromeOptions
-from config.settings import CFG, read_config, get_config_path, TEMP_PAGE_DIR, OPENAI_API_KEY, OPENAI_API_BASE
-from utils.logu import get_logger
-from utils.file import save_to_file, check_exists, extract_excel_text_from_url
-from utils.file import s3
-from utils.url_utils import extract_urls_from_text, extract_filename_from_url
-from llama_index.llms.litellm import LiteLLM
-from llama_index.core.program import LLMTextCompletionProgram
-from llama_index.core.output_parsers import PydanticOutputParser
-from llama_index.core.output_parsers.pydantic import extract_json_str
-from src.flow_task.db.product_import_db import product_import_manager
-from src.flow_task.db.models.product_models import ProductImport, ProductForExtraction
-from src.manager.core.db import DbManager, AsinSeed
-from markitdown import MarkItDown
-import tempfile
-import os
-
-logger = get_logger('flow_task')
-
-
-class AsinAreaEnum(StrEnum):
-    """ASIN地区枚举"""
-    US = "US"
-    JP = "JP"
-    UK = "UK"
-    DE = "DE"
-    FR = "FR"
-    CA = "CA"
-    AU = "AU"
-
-
-class BaseCommon(BaseModel):
-    """基础通用输入模型"""
-    browser_address: Optional[str] = Field(default='127.0.0.1:16800', description="浏览器调试地址")
-    browser_userdata_dir: Optional[str | Path] = Field(default=None, description="浏览器用户数据目录")
-    chrome_config_ini: Optional[str] = Field(default=None, description="Chrome配置INI文件路径")
-
-
-class CrawlAsinInput(BaseCommon):
-    """爬取ASIN页面输入模型"""
-    asin: str = Field(description="ASIN编码")
-    asin_area: AsinAreaEnum = Field(default=AsinAreaEnum.JP, description="ASIN地区")
-    mthml_type: bool = Field(default=True, description="是否保存为MHTML格式")
-    save_path: Optional[str] = Field(default=None, description="保存路径")
-    overwrite: bool = Field(default=False, description="是否覆盖已存在文件")
-
-
-class BaseCrawlFlow:
-    """基础爬取流程类"""
-    FLOW_NAME = "基础爬取流程"
-    
-    def __init__(self, flow_input: CrawlAsinInput):
-        self.flow_input = flow_input
-        self.chrome_options = self._init_chrome_options()
-        self.crawler = Crawler(chrome_options=self.chrome_options)
-    
-    def _init_chrome_options(self) -> ChromeOptions:
-        """初始化Chrome选项"""
-        chrome_config_ini = self.flow_input.chrome_config_ini or CFG.chrome_config_ini
-        return ChromeOptions(ini_path=chrome_config_ini)
-    
-    def _get_save_path(self) -> str:
-        """获取保存路径"""
-        if self.flow_input.save_path:
-            return self.flow_input.save_path
-        
-        # 默认保存路径
-        extension = ".mhtml" if self.flow_input.mthml_type else ".html"
-        return f"{Crawler.s3_prefix}{self.flow_input.asin}/{self.flow_input.asin}{extension}"
-    
-    def _get_and_save_page_data(self):
-        """获取页面数据并保存的方法"""
-        return _get_and_save_page_data_task(
-            crawler=self.crawler,
-            asin=self.flow_input.asin,
-            asin_area=self.flow_input.asin_area,
-            mthml_type=self.flow_input.mthml_type,
-            save_path=self._get_save_path(),
-            overwrite=self.flow_input.overwrite
-        )
-    
-    def run(self):
-        """执行流程"""
-        logger.info(f"开始执行流程: {self.FLOW_NAME}")
-        logger.info(f"ASIN: {self.flow_input.asin}")
-        logger.info(f"地区: {self.flow_input.asin_area}")
-        logger.info(f"MHTML格式: {self.flow_input.mthml_type}")
-        logger.info(f"覆盖模式: {self.flow_input.overwrite}")
-        
-        try:
-            # 使用task方法获取和保存页面数据
-            final_path = self._get_and_save_page_data()
-            
-            logger.info(f"流程执行成功,保存路径: {final_path}")
-            return {
-                'status': 'success',
-                'path': final_path,
-                'asin': self.flow_input.asin,
-                'asin_area': self.flow_input.asin_area
-            }
-            
-        except Exception as e:
-            logger.error(f"流程执行失败: {e}")
-            raise e
-
-
-class CrawlAsinFlow(BaseCrawlFlow):
-    """ASIN页面爬取流程"""
-    FLOW_NAME = "ASIN页面爬取流程"
-
-
-@flow(
-    name=CrawlAsinFlow.FLOW_NAME,
-    persist_result=True,
-    result_serializer="json",
-)
-def crawl_asin_flow(flow_input: CrawlAsinInput):
-    """ASIN页面爬取Prefect流程"""
-    logger.info(f"启动ASIN爬取流程: {flow_input.asin}")
-    
-    flow_runner = CrawlAsinFlow(flow_input)
-    result = flow_runner.run()
-    
-    return result
-
-
-@task(name="保存整个html页面",
-    persist_result=True,
-    cache_expiration=timedelta(days=31),
-    cache_policy=INPUTS- 'crawler' - 'overwrite'
-)
-def task_save_page(crawler: Crawler, asin: str, asin_area: AsinAreaEnum, 
-                                        mthml_type: bool, overwrite: bool):
-    """获取页面数据并保存到本地temp目录的task方法"""
-    logger.info(f"开始获取页面数据: {asin}")
-    
-    # 获取页面数据
-    data = crawler.get_asin_page_data(
-        asin=asin,
-        asin_area=asin_area,
-        mthml_type=mthml_type
-    )
-    
-    # 生成本地temp保存路径
-    local_dir = TEMP_PAGE_DIR / "asin" / asin
-    local_dir.mkdir(parents=True, exist_ok=True)
-    
-    extension = ".mhtml" if mthml_type else ".html"
-    local_path = local_dir / f"{asin}{extension}"
-    
-    # 检查文件是否已存在
-    if not overwrite and local_path.exists():
-        logger.info(f"文件已存在,跳过保存: {local_path}")
-        return str(local_path)
-    
-    try:
-        logger.info(f"保存到本地temp目录: {local_path}")
-        local_final_path = save_to_file(data, local_path)
-        logger.info(f"成功保存到本地temp目录: {local_final_path}")
-        return str(local_final_path)
-    except Exception as local_error:
-        logger.error(f"本地temp目录保存失败: {local_error}")
-        raise Exception(f"本地temp目录保存失败: {local_error}")
-
-
-@task(name="保存到数据库和对象存储",
-    persist_result=True,
-    cache_expiration=timedelta(days=31),
-)
-def task_save_to_db(local_file_path: str, asin: str, mthml_type: bool, asin_area: str = 'JP'):
-    """将temp目录文件上传到S3的task方法,先检查数据库是否存在记录"""
-    logger.info(f"开始处理文件: {local_file_path}")
-    
-    # 初始化数据库管理器
-    db_manager = DbManager()
-    
-    # 检查数据库中是否已存在该ASIN的记录
-    existing_record = db_manager.get_asin_seed(asin)
-    
-    # 如果存在记录且mhtml_path不为空,直接返回该路径
-    if existing_record and existing_record.mhtml_path:
-        logger.info(f"数据库中已存在ASIN {asin} 的记录,路径: {existing_record.mhtml_path}")
-        return existing_record.mhtml_path
-    
-    # 生成S3保存路径
-    s3_path = f"{Crawler.s3_prefix}{asin}/{asin}{'.mhtml' if mthml_type else '.html'}"
-        
-    try:
-        # 读取本地文件
-        with open(local_file_path, 'rb') as f:
-            data = f.read()
-        
-        logger.info(f"上传到S3: {s3_path}")
-        final_path = save_to_file(data, s3_path)
-        logger.info(f"成功上传到S3: {final_path}")
-        
-        # 保存到数据库
-        if existing_record:
-            # 更新现有记录
-            existing_record.mhtml_path = final_path
-            existing_record.asin_area = asin_area
-            db_manager.save_asin_seed(existing_record)
-            logger.info(f"更新数据库记录: ASIN {asin}")
-        else:
-            # 创建新记录
-            new_record = AsinSeed(asin=asin, asin_area=asin_area, mhtml_path=final_path)
-            db_manager.save_asin_seed(new_record)
-            logger.info(f"创建数据库记录: ASIN {asin}")
-        
-        return final_path
-    except Exception as s3_error:
-        logger.error(f"S3上传失败: {s3_error}")
-        raise Exception(f"S3上传失败: {s3_error}")
-
-
-@task
-def _get_and_save_page_data_task(crawler: Crawler, asin: str, asin_area: AsinAreaEnum, 
-                                mthml_type: bool, overwrite: bool, save_path: str = None):
-    """获取页面数据并保存的独立task方法(兼容旧版本)"""
-    logger.info(f"开始获取页面数据并保存: {asin}")
-    
-    # 第一步:获取数据并保存到本地temp目录
-    try:
-        local_file_path = task_save_page(
-            crawler=crawler,
-            asin=asin,
-            asin_area=asin_area,
-            mthml_type=mthml_type,
-            overwrite=overwrite
-        )
-        logger.info(f"成功保存到本地temp目录: {local_file_path}")
-    except Exception as local_error:
-        logger.error(f"本地temp目录保存失败: {local_error}")
-        raise Exception(f"本地temp目录保存失败: {local_error}")
-    
-    # 第二步:将temp目录文件上传到S3并保存到数据库
-    try:
-        s3_path = task_save_to_db(
-            local_file_path=local_file_path,
-            asin=asin,
-            mthml_type=mthml_type,
-            asin_area=asin_area
-        )
-        logger.info(f"成功上传到S3并保存到数据库: {s3_path}")
-        return s3_path
-    except Exception as s3_error:
-        logger.error(f"S3上传失败,但本地文件已保存: {s3_error}")
-        raise Exception(f"S3上传失败,但本地文件已保存: {s3_error}")
-
-
-
-
-
-@task
-def crawl_asin_task(flow_input: CrawlAsinInput):
-    """ASIN页面爬取任务"""
-    return crawl_asin_flow(flow_input)
-
-
-@task(name="解析URL表格为Markdown",
-    persist_result=True,
-    cache_expiration=timedelta(days=31),
-    cache_policy=INPUTS
-)
-def parse_url_to_markdown_task(url: str):
-    """将URL表格文件转换为Markdown格式的task方法
-    支持Excel文件和其他文件格式,Excel文件使用pandas读取所有工作表信息
-    
-    Args:
-        url (str): 表格文件的URL或本地路径
-    
-    Returns:
-        str: 解析后的Markdown格式内容
-    """
-    logger.info(f"开始解析URL表格文件: {url}")
-    
-    try:
-        # 检查文件类型,如果是Excel文件则使用pandas方法
-        if url.lower().endswith(('.xlsx', '.xls')):
-            logger.info(f"检测到Excel文件,使用pandas方法读取: {url}")
-            
-            # 使用pandas方法读取Excel文件
-            all_cells_text_dict = extract_excel_text_from_url(url)
-            
-            if not all_cells_text_dict:
-                logger.warning(f"Excel文件读取失败或为空: {url}")
-                return ""
-            
-            # 将Excel内容转换为Markdown格式
-            markdown_content = ""
-            for sheet_name, sheet_content in all_cells_text_dict.items():
-                markdown_content += f"## 工作表: {sheet_name}\n\n"
-                markdown_content += "```\n"
-                markdown_content += sheet_content
-                markdown_content += "\n```\n\n"
-            
-            logger.info(f"成功解析Excel文件,共读取 {len(all_cells_text_dict)} 个工作表: {url}")
-            return markdown_content
-        
-        else:
-            # 非Excel文件使用原来的markitdown方法
-            logger.info(f"检测到非Excel文件,使用markitdown方法读取: {url}")
-            
-            # 创建MarkItDown实例
-            md = MarkItDown(enable_plugins=False)
-            
-            # 转换文档
-            result = md.convert(url)
-            
-            # 获取Markdown格式内容
-            markdown_content = result.text_content
-            
-            logger.info(f"成功解析URL表格文件: {url}")
-            return markdown_content
-        
-    except Exception as e:
-        logger.error(f"解析URL表格文件时发生错误: {e}")
-        raise Exception(f"解析URL表格文件失败: {e}")
-
-
-class DebugPydanticOutputParser(PydanticOutputParser):
-    """继承自PydanticOutputParser的调试版本,打印LLM生成结果"""
-    
-    def parse(self, text: str) -> Any:
-        """Parse, validate, and correct errors programmatically."""
-        logger.info("=== LLM生成结果 ===")
-        logger.info(text)
-        logger.info("=== LLM生成结果结束 ===")
-        
-        # 清理markdown代码块格式
-        cleaned_text = text
-        if "```json" in text:
-            # 移除markdown代码块标记
-            cleaned_text = text.split("```json")[1].split("```")[0]
-        elif "```" in text:
-            # 移除通用markdown代码块标记
-            cleaned_text = text.split("```")[1].split("```")[0]
-        
-        # 清理转义字符
-        cleaned_text = cleaned_text.replace("\\n", "\n").replace("\\\"", "\"")
-        
-        json_str = extract_json_str(cleaned_text)
-        return self._output_cls.model_validate_json(json_str)
-
-
-def extract_product_from_text(text: str, uri: str = "", filename: str = "") -> ProductImport:
-    """使用LLMTextCompletionProgram从文本中提取产品信息"""
-    llm = LiteLLM(model='openai/GLM-4-Flash', api_key=OPENAI_API_KEY, api_base=OPENAI_API_BASE)
-    
-    # 使用自定义的DebugPydanticOutputParser
-    output_parser = DebugPydanticOutputParser(output_cls=ProductForExtraction)
-    
-    program = LLMTextCompletionProgram.from_defaults(
-        prompt_template_str=f"请从以下文本中提取产品信息:\n\nurl: {uri} \n\n{{text}}",
-        llm=llm,
-        verbose=True,
-        output_parser=output_parser
-    )
-    
-    extracted_product = program(text=text)
-    
-    # 使用类方法创建Product实例
-    return ProductImport.from_product_extraction(
-        extracted_product=extracted_product,
-        markdown_content=text,
-        uri=uri,
-        filename=filename
-    )
-
-
-
-
-
-@task(name="Excel处理",
-    persist_result=True,
-    cache_expiration=timedelta(days=31),
-    cache_policy=INPUTS
-)
-def get_or_create_product_import_by_url(file_url: str):
-    """根据文件URL获取数据库中的ProductImport记录,如果不存在则解析Excel并保存到数据库
-    
-    Args:
-        file_url (str): 文件的URL或本地路径
-    
-    Returns:
-        ProductImport: 数据库中的ProductImport记录
-    """
-    # 从URL中提取文件名
-    file_name = extract_filename_from_url(file_url)
-    
-    logger.info(f"开始处理文件: {file_name} (URL: {file_url})")
-    
-    # 首先检查数据库中是否已存在该文件名的记录
-    existing_record = product_import_manager.get_product_import_by_filename(file_name)
-    
-    if existing_record:
-        logger.info(f"数据库中已存在文件 {file_name} 的记录,直接返回")
-        return existing_record
-    
-    logger.info(f"数据库中不存在文件 {file_name} 的记录,开始解析Excel并保存到数据库")
-    
-    try:
-        # 解析Excel文件为Markdown格式
-        markdown_content = parse_url_to_markdown_task(file_url)
-        
-        if not markdown_content:
-            logger.warning(f"Excel文件解析失败或为空: {file_url}")
-            raise Exception(f"Excel文件解析失败或为空: {file_url}")
-        
-        # 使用LLM从Markdown内容中提取产品信息
-        product_import = extract_product_from_text(
-            text=markdown_content,
-            uri=file_url,
-            filename=file_name
-        )
-        
-        # 保存到数据库
-        saved_record = product_import_manager.save_product_import(product_import)
-        
-        logger.info(f"成功解析Excel并保存到数据库: {file_name}")
-        return saved_record
-        
-    except Exception as e:
-        logger.error(f"处理文件 {file_name} 时发生错误: {e}")
-        raise Exception(f"处理文件失败: {e}")
-
-
-class ProductImportInput(BaseModel):
-    """产品导入输入模型"""
-    file_url: Union[str, List[str]] = Field(description="文件的URL或本地路径,可以是字符串或列表")
-
-
-
-
-
-@flow(
-    name="产品导入流程",
-    persist_result=True,
-    result_serializer="json",
-)
-def product_import_flow(flow_input: ProductImportInput):
-    """产品导入Prefect流程,支持字符串或列表输入,并发执行解析"""
-    # 处理输入,统一转换为URL列表
-    if isinstance(flow_input.file_url, str):
-        logger.info(f"输入为字符串,尝试提取URL: {flow_input.file_url}")
-        # 如果是字符串,尝试提取URL
-        urls = extract_urls_from_text(flow_input.file_url)
-        if not urls:
-            # 如果没有提取到URL,假设整个字符串就是一个URL
-            urls = [flow_input.file_url]
-        logger.info(f"提取到 {len(urls)} 个URL: {urls}")
-    else:
-        # 如果是列表,直接使用
-        urls = flow_input.file_url
-        logger.info(f"输入为列表,共 {len(urls)} 个URL: {urls}")
-    
-    # 并发执行所有URL的解析
-    all_futures = []
-    for url in urls:
-        future = get_or_create_product_import_by_url.with_options(
-            task_run_name=f"处理URL: {url}",
-        ).submit(url)
-        all_futures.append(future)
-    
-    # 等待所有任务完成
-    logger.info(f"等待 {len(all_futures)} 个任务完成...")
-    results = [future.result() for future in wait(all_futures).done]
-    
-    logger.info(f"所有任务完成,成功处理 {len(results)} 个文件")
-    
-    return {
-        'status': 'success',
-        'product_imports': results,
-        'file_urls': urls,
-        'total_count': len(results)
-    }
-
-
-@task
-def product_import_task(flow_input: ProductImportInput):
-    """产品导入任务"""
-    return product_import_flow(flow_input)
-

+ 32 - 0
src/flow_task/db/product_import_db.py

@@ -73,6 +73,38 @@ class ProductImportManager:
             else:
                 return list_model
     
+    def get_product_imports_by_month(self, month: Optional[int] = None, year: Optional[int] = None, to_dict: bool = False) -> List[ProductImport]:
+        """根据月份获取ProductImport记录
+        
+        Args:
+            month: 月份 (1-12),如果未提供则使用当前月份
+            year: 年份,如果未提供则使用当前年份
+            to_dict: 是否返回字典格式,默认为False
+            
+        Returns:
+            List[ProductImport]: 指定月份的ProductImport记录列表
+        """
+        now = datetime.now()
+        target_month = month if month is not None else now.month
+        target_year = year if year is not None else now.year
+        
+        with Session(self.engine) as session:
+            # 构建查询条件:筛选指定年份和月份的记录
+            statement = select(ProductImport).where(
+                ProductImport.updated_at.isnot(None),
+                ProductImport.updated_at.between(
+                    datetime(target_year, target_month, 1, 0, 0, 0),
+                    datetime(target_year, target_month + 1, 1, 0, 0, 0) if target_month < 12 else datetime(target_year + 1, 1, 1, 0, 0, 0)
+                )
+            )
+            results = session.exec(statement)
+            list_model = results.all()
+            
+            if to_dict:
+                return [model.model_dump() for model in list_model]
+            else:
+                return list_model
+    
     def add_or_ignore_product_import(self, product_import: ProductImport) -> ProductImport:
         """添加或忽略已存在的ProductImport(根据URI)"""
         exist = self.get_product_import_by_uri(product_import.uri)
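
A usage sketch for the new month filter; the field names printed are illustrative:

    from src.flow_task.db.product_import_db import product_import_manager

    # Default: records whose updated_at falls in the current month.
    current = product_import_manager.get_product_imports_by_month()

    # Explicit month/year, returned as dicts. For December the upper bound
    # rolls over to January 1 of the next year, as in the query above.
    rows = product_import_manager.get_product_imports_by_month(month=12, year=2024, to_dict=True)
    for row in rows:
        print(row.get("filename"), row.get("updated_at"))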

+ 242 - 0
src/flow_task/extra_excel_product_flow.py

@@ -0,0 +1,242 @@
+from datetime import datetime, timedelta
+from enum import StrEnum
+from pathlib import Path
+from typing import Optional, Any, Union, List
+from pydantic import BaseModel, Field
+import re
+from prefect import flow, task
+from prefect.states import Failed, Running, Completed
+from prefect.cache_policies import INPUTS
+from prefect.futures import wait
+from utils.logu import get_logger
+from utils.file import extract_excel_text_from_url
+from utils.url_utils import extract_urls_from_text, extract_filename_from_url
+from llama_index.llms.litellm import LiteLLM
+from llama_index.core.program import LLMTextCompletionProgram
+from llama_index.core.output_parsers import PydanticOutputParser
+from llama_index.core.output_parsers.pydantic import extract_json_str
+from src.flow_task.db.product_import_db import product_import_manager
+from src.flow_task.db.models.product_models import ProductImport, ProductForExtraction
+from markitdown import MarkItDown
+from config.settings import OPENAI_API_KEY, OPENAI_API_BASE
+
+logger = get_logger('flow_task')
+
+
+class DebugPydanticOutputParser(PydanticOutputParser):
+    """继承自PydanticOutputParser的调试版本,打印LLM生成结果"""
+    
+    def parse(self, text: str) -> Any:
+        """Parse, validate, and correct errors programmatically."""
+        logger.info("=== LLM生成结果 ===")
+        logger.info(text)
+        logger.info("=== LLM生成结果结束 ===")
+        
+        # 清理markdown代码块格式
+        cleaned_text = text
+        if "```json" in text:
+            # 移除markdown代码块标记
+            cleaned_text = text.split("```json")[1].split("```")[0]
+        elif "```" in text:
+            # 移除通用markdown代码块标记
+            cleaned_text = text.split("```")[1].split("```")[0]
+        
+        # 清理转义字符
+        cleaned_text = cleaned_text.replace("\\n", "\n").replace("\\\"", "\"")
+        
+        json_str = extract_json_str(cleaned_text)
+        return self._output_cls.model_validate_json(json_str)
+
+
+def extract_product_from_text(text: str, uri: str = "", filename: str = "") -> ProductImport:
+    """使用LLMTextCompletionProgram从文本中提取产品信息"""
+    llm = LiteLLM(model='openai/GLM-4-Flash', api_key=OPENAI_API_KEY, api_base=OPENAI_API_BASE)
+    
+    # 使用自定义的DebugPydanticOutputParser
+    output_parser = DebugPydanticOutputParser(output_cls=ProductForExtraction)
+    
+    program = LLMTextCompletionProgram.from_defaults(
+        prompt_template_str=f"请从以下文本中提取产品信息:\n\nurl: {uri} \n\n{{text}}",
+        llm=llm,
+        verbose=True,
+        output_parser=output_parser
+    )
+    
+    extracted_product = program(text=text)
+    
+    # 使用类方法创建Product实例
+    return ProductImport.from_product_extraction(
+        extracted_product=extracted_product,
+        markdown_content=text,
+        uri=uri,
+        filename=filename
+    )
+
+
+@task(name="解析URL表格为Markdown",
+    persist_result=True,
+    cache_expiration=timedelta(days=31),
+    cache_policy=INPUTS
+)
+def parse_url_to_markdown_task(url: str):
+    """将URL表格文件转换为Markdown格式的task方法
+    支持Excel文件和其他文件格式,Excel文件使用pandas读取所有工作表信息
+    
+    Args:
+        url (str): 表格文件的URL或本地路径
+    
+    Returns:
+        str: 解析后的Markdown格式内容
+    """
+    logger.info(f"开始解析URL表格文件: {url}")
+    
+    try:
+        # 检查文件类型,如果是Excel文件则使用pandas方法
+        if url.lower().endswith(('.xlsx', '.xls')):
+            logger.info(f"检测到Excel文件,使用pandas方法读取: {url}")
+            
+            # 使用pandas方法读取Excel文件
+            all_cells_text_dict = extract_excel_text_from_url(url)
+            
+            if not all_cells_text_dict:
+                logger.warning(f"Excel文件读取失败或为空: {url}")
+                return ""
+            
+            # 将Excel内容转换为Markdown格式
+            markdown_content = ""
+            for sheet_name, sheet_content in all_cells_text_dict.items():
+                markdown_content += f"## 工作表: {sheet_name}\n\n"
+                markdown_content += "```\n"
+                markdown_content += sheet_content
+                markdown_content += "\n```\n\n"
+            
+            logger.info(f"成功解析Excel文件,共读取 {len(all_cells_text_dict)} 个工作表: {url}")
+            return markdown_content
+        
+        else:
+            # 非Excel文件使用原来的markitdown方法
+            logger.info(f"检测到非Excel文件,使用markitdown方法读取: {url}")
+            
+            # 创建MarkItDown实例
+            md = MarkItDown(enable_plugins=False)
+            
+            # 转换文档
+            result = md.convert(url)
+            
+            # 获取Markdown格式内容
+            markdown_content = result.text_content
+            
+            logger.info(f"成功解析URL表格文件: {url}")
+            return markdown_content
+        
+    except Exception as e:
+        logger.error(f"解析URL表格文件时发生错误: {e}")
+        raise Exception(f"解析URL表格文件失败: {e}")
+
+
+@task(name="Excel处理",
+    persist_result=True,
+    cache_expiration=timedelta(days=31),
+    cache_policy=INPUTS
+)
+def get_or_create_product_import_by_url(file_url: str):
+    """根据文件URL获取数据库中的ProductImport记录,如果不存在则解析Excel并保存到数据库
+    
+    Args:
+        file_url (str): 文件的URL或本地路径
+    
+    Returns:
+        ProductImport: 数据库中的ProductImport记录
+    """
+    # 从URL中提取文件名
+    file_name = extract_filename_from_url(file_url)
+    
+    logger.info(f"开始处理文件: {file_name} (URL: {file_url})")
+    
+    # 首先检查数据库中是否已存在该文件名的记录
+    existing_record = product_import_manager.get_product_import_by_filename(file_name)
+    
+    if existing_record:
+        logger.info(f"数据库中已存在文件 {file_name} 的记录,直接返回")
+        return existing_record
+    
+    logger.info(f"数据库中不存在文件 {file_name} 的记录,开始解析Excel并保存到数据库")
+    
+    try:
+        # 解析Excel文件为Markdown格式
+        markdown_content = parse_url_to_markdown_task(file_url)
+        
+        if not markdown_content:
+            logger.warning(f"Excel文件解析失败或为空: {file_url}")
+            raise Exception(f"Excel文件解析失败或为空: {file_url}")
+        
+        # 使用LLM从Markdown内容中提取产品信息
+        product_import = extract_product_from_text(
+            text=markdown_content,
+            uri=file_url,
+            filename=file_name
+        )
+        
+        # 保存到数据库
+        saved_record = product_import_manager.save_product_import(product_import)
+        
+        logger.info(f"成功解析Excel并保存到数据库: {file_name}")
+        return saved_record
+        
+    except Exception as e:
+        logger.error(f"处理文件 {file_name} 时发生错误: {e}")
+        raise Exception(f"处理文件失败: {e}")
+
+
+class ProductImportInput(BaseModel):
+    """产品导入输入模型"""
+    file_url: Union[str, List[str]] = Field(description="文件的URL或本地路径,可以是字符串或列表")
+
+
+@flow(
+    name="产品导入流程",
+    persist_result=True,
+    result_serializer="json",
+)
+def product_import_flow(flow_input: ProductImportInput):
+    """产品导入Prefect流程,支持字符串或列表输入,并发执行解析"""
+    # 处理输入,统一转换为URL列表
+    if isinstance(flow_input.file_url, str):
+        logger.info(f"输入为字符串,尝试提取URL: {flow_input.file_url}")
+        # 如果是字符串,尝试提取URL
+        urls = extract_urls_from_text(flow_input.file_url)
+        if not urls:
+            # 如果没有提取到URL,假设整个字符串就是一个URL
+            urls = [flow_input.file_url]
+        logger.info(f"提取到 {len(urls)} 个URL: {urls}")
+    else:
+        # 如果是列表,直接使用
+        urls = flow_input.file_url
+        logger.info(f"输入为列表,共 {len(urls)} 个URL: {urls}")
+    
+    # 并发执行所有URL的解析
+    all_futures = []
+    for url in urls:
+        future = get_or_create_product_import_by_url.with_options(
+            task_run_name=f"处理URL: {url}",
+        ).submit(url)
+        all_futures.append(future)
+    
+    # 等待所有任务完成
+    logger.info(f"等待 {len(all_futures)} 个任务完成...")
+    results = [future.result() for future in wait(all_futures).done]
+    
+    logger.info(f"所有任务完成,成功处理 {len(results)} 个文件")
+    
+    return {
+        'status': 'success',
+        'product_imports': results,
+        'file_urls': urls,
+        'total_count': len(results)
+    }
+
+
+@task
+def product_import_task(flow_input: ProductImportInput):
+    """产品导入任务"""
+    return product_import_flow(flow_input)
+
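
A minimal invocation sketch for the relocated import flow; the URL is a placeholder:

    from src.flow_task.extra_excel_product_flow import product_import_flow, ProductImportInput

    result = product_import_flow(ProductImportInput(
        file_url=["https://example.com/products/2024-07.xlsx"],
    ))
    print(result["status"], result["total_count"])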

+ 1 - 1
src/readme.md

@@ -6,7 +6,7 @@ dc up -d
 # mahui4228@gmail.com
 $env:CONFIG_PATH="G:\code\amazone\copywriting_production\config\dp_conf\9321_worker.yaml";celery -A src.tasks.crawl_asin_save_task worker --loglevel=info --hostname=9321@%h
 
-# mahui6188@gmail.com U*
+# 已失效 mahui6188@gmail.com U*
 $env:CONFIG_PATH="G:\code\amazone\copywriting_production\config\dp_conf\9322_worker.yaml";celery -A src.tasks.crawl_asin_save_task worker --loglevel=info --hostname=9322@%h -Q queue_9322
 
 youka570023@gmail.com U** gg: 7m

+ 97 - 0
tests/flow_run/t_flow_run_crawl_asin.py

@@ -0,0 +1,97 @@
+"""
+ASIN爬取流程测试脚本
+====================
+
+该脚本用于测试AsinCrawlerBase,实例化所有浏览器配置并调用get_home_page方法。
+"""
+import sys
+import os
+import random
+from pprint import pprint
+from datetime import datetime
+from src.flow_task.crawl_asin import crawl_asin_flow, CrawlAsinInput, AsinAreaEnum
+from src.browser.browser_config import create_direct_browser_config
+from src.browser.crawl_base import AsinCrawlerBase
+
+def t_init_browser():
+    """主函数"""
+    # 获取所有浏览器配置
+    browser_configs = create_direct_browser_config()
+    
+    print(f"获取到 {len(browser_configs)} 个浏览器配置")
+    
+    # 遍历所有浏览器配置,为每个配置创建AsinCrawlerBase实例并调用get_home_page
+    for port, browser_config in browser_configs.items():
+        print(f"正在使用浏览器配置: 端口 {port}, 账号 {browser_config.account}")
+        
+        try:
+            # 使用AsinCrawlerBase的create_browser类方法创建实例
+            crawler = AsinCrawlerBase.create_browser(
+                address=browser_config.browser_address,
+                user_data_dir=str(browser_config.browser_userdata_dir)
+            )
+            
+            # 调用get_home_page方法
+            print(f"正在为账号 {browser_config.account} 获取首页...")
+            crawler.get_home_page()
+            print(f"成功为账号 {browser_config.account} 获取首页")
+            
+        except Exception as e:
+            print(f"为账号 {browser_config.account} 获取首页时出错: {e}")
+            
+        print("-" * 50)
+
+
+competitor_list = ["B09MQMTBJW","B000THQ4ZO","B0D6RVGL2M","B004OCLMTI","B0D7TKHSP4","B000THROUS","B08HK93VBD","B0C6LXPSVX","B0C8MRSD6P","B08LD1MZX4","B0CLCJXXWF"]
+
+def t_random_crawl_asin():
+    """随机选择一个ASIN和浏览器配置,调用crawl_asin_flow"""
+    # 获取所有浏览器配置
+    browser_configs = create_direct_browser_config()
+    
+    if not browser_configs:
+        print("没有可用的浏览器配置")
+        return
+    
+    if not competitor_list:
+        print("没有可用的ASIN列表")
+        return
+    
+    # 随机选择一个ASIN和浏览器配置
+    random_asin = random.choice(competitor_list)
+    random_port = random.choice(list(browser_configs.keys()))
+    random_browser_config = browser_configs[random_port]
+    
+    print(f"随机选择的ASIN: {random_asin}")
+    print(f"随机选择的浏览器配置: 端口 {random_port}, 账号 {random_browser_config.account}")
+    
+    try:
+        # 创建CrawlAsinInput对象
+        flow_input = CrawlAsinInput(
+            asin=random_asin,
+            asin_area=AsinAreaEnum.JP,  # 默认使用日本地区
+            mthml_type=True,  # 保存为MHTML格式
+            overwrite=False,  # 不覆盖已存在文件
+            browser=random_browser_config
+        )
+        
+        print(f"开始执行ASIN爬取流程...")
+        
+        # 调用crawl_asin_flow
+        result = crawl_asin_flow(flow_input)
+        
+        print(f"爬取流程执行成功")
+        pprint(result)
+        
+    except Exception as e:
+        print(f"执行ASIN爬取流程时出错: {e}")
+        
+    print("-" * 50)
+
+def main():
+    # t_init_browser()
+    t_random_crawl_asin()
+
+if __name__ == "__main__":
+    main()
+

+ 0 - 0
tests/mytest/t_flow_run_extra_product.py → tests/flow_run/t_flow_run_extra_product.py


+ 5 - 5
uv.lock

@@ -1,5 +1,5 @@
 version = 1
-revision = 3
+revision = 2
 requires-python = ">=3.13"
 resolution-markers = [
     "sys_platform == 'darwin'",
@@ -529,7 +529,7 @@ requires-dist = [
     { name = "celery", specifier = ">=5.4.0" },
     { name = "crawl4ai", specifier = ">=0.6.3" },
     { name = "docling", specifier = ">=2.43.0" },
-    { name = "drissionpage", specifier = ">=4.1.0.17" },
+    { name = "drissionpage", specifier = ">=4.1.1.2" },
     { name = "flower", specifier = ">=2.0.1" },
     { name = "httpx", specifier = ">=0.28.1" },
     { name = "llama-index", specifier = ">=0.12.42" },
@@ -877,7 +877,7 @@ wheels = [
 
 [[package]]
 name = "drissionpage"
-version = "4.1.0.18"
+version = "4.1.1.2"
 source = { registry = "https://pypi.org/simple" }
 dependencies = [
     { name = "click" },
@@ -889,9 +889,9 @@ dependencies = [
     { name = "tldextract" },
     { name = "websocket-client" },
 ]
-sdist = { url = "https://files.pythonhosted.org/packages/3f/54/9e99d96c7a5909a7f2ecc7bdc2978702ca5df20b45ea7f331b306c7e9b57/drissionpage-4.1.0.18.tar.gz", hash = "sha256:ea3193c628bed6f6f11b401d1e07161355ed3060d9c9ce12163df381bd09bf32", size = 206670, upload-time = "2025-03-24T15:39:23.039Z" }
+sdist = { url = "https://files.pythonhosted.org/packages/12/b3/26bb358f3c7019bba0a2e27a636382a4226780628c30ab315537524809fb/drissionpage-4.1.1.2.tar.gz", hash = "sha256:e4375d3536519a1d24430270e2871de3015836b301cbdfb96583234e6d6ef4c1", size = 208034, upload-time = "2025-07-31T07:00:36.24Z" }
 wheels = [
-    { url = "https://files.pythonhosted.org/packages/e6/83/7ed71f4e0c8d60c9aefb24f0bb95f8a8b14e090718cf3fff8a0083d33164/DrissionPage-4.1.0.18-py3-none-any.whl", hash = "sha256:5492189161e6bde036737aab0874dec7723c38153cb22815f24dba88a2fdfa57", size = 256899, upload-time = "2025-03-24T15:39:21.261Z" },
+    { url = "https://files.pythonhosted.org/packages/78/31/05834b38c12a222012f998d5c2f104f228cf8aaa37747f989c9b29e81d5d/DrissionPage-4.1.1.2-py3-none-any.whl", hash = "sha256:c8b58a8d495550142c10499ce7128655b2baf8f38f46604ba0ce951a148d88e8", size = 257497, upload-time = "2025-07-31T07:00:34.531Z" },
 ]
 
 [[package]]