소스 검색

prefect:批量并发导入 Excel 产品表

mrh 4 달 전
부모
커밋
187afe65ed

+ 178 - 9
src/flow_task/crawl_asin_flow.py

@@ -1,17 +1,28 @@
 from datetime import datetime, timedelta
 from enum import StrEnum
 from pathlib import Path
-from typing import Optional
+from typing import Optional, Any, Union, List
 from pydantic import BaseModel, Field
+import re
 from prefect import flow, task
 from prefect.states import Failed, Running, Completed
 from prefect.cache_policies import INPUTS
+from prefect.futures import wait
 from src.browser.crawl_asin import Crawler
 from utils.drission_page import ChromeOptions
-from config.settings import CFG, read_config, get_config_path, TEMP_PAGE_DIR
+from config.settings import CFG, read_config, get_config_path, TEMP_PAGE_DIR, OPENAI_API_KEY, OPENAI_API_BASE
 from utils.logu import get_logger
-from utils.file import save_to_file, check_exists
+from utils.file import save_to_file, check_exists, extract_excel_text_from_url
 from utils.file import s3
+from utils.url_utils import extract_urls_from_text, extract_filename_from_url
+from llama_index.llms.litellm import LiteLLM
+from llama_index.core.program import LLMTextCompletionProgram
+from llama_index.core.output_parsers import PydanticOutputParser
+from llama_index.core.output_parsers.pydantic import extract_json_str
+from src.flow_task.db.product_import_db import product_import_manager
+from src.flow_task.db.models.product_models import ProductImport, ProductForExtraction
+from src.manager.core.db import DbManager, AsinSeed
+from markitdown import MarkItDown
 import tempfile
 import os
 
@@ -169,8 +180,6 @@ def task_save_page(crawler: Crawler, asin: str, asin_area: AsinAreaEnum,
 )
 def task_save_to_db(local_file_path: str, asin: str, mthml_type: bool, asin_area: str = 'JP'):
     """将temp目录文件上传到S3的task方法,先检查数据库是否存在记录"""
-    from src.manager.core.db import DbManager, AsinSeed
-    
     logger.info(f"开始处理文件: {local_file_path}")
     
     # 初始化数据库管理器
@@ -281,9 +290,6 @@ def parse_url_to_markdown_task(url: str):
         if url.lower().endswith(('.xlsx', '.xls')):
             logger.info(f"检测到Excel文件,使用pandas方法读取: {url}")
             
-            # 导入Excel处理函数
-            from utils.file import extract_excel_text_from_url
-            
             # 使用pandas方法读取Excel文件
             all_cells_text_dict = extract_excel_text_from_url(url)
             
@@ -305,7 +311,6 @@ def parse_url_to_markdown_task(url: str):
         else:
             # 非Excel文件使用原来的markitdown方法
             logger.info(f"检测到非Excel文件,使用markitdown方法读取: {url}")
-            from markitdown import MarkItDown
             
             # 创建MarkItDown实例
             md = MarkItDown(enable_plugins=False)
@@ -323,3 +328,167 @@ def parse_url_to_markdown_task(url: str):
         logger.error(f"解析URL表格文件时发生错误: {e}")
         raise Exception(f"解析URL表格文件失败: {e}")
 
+
class DebugPydanticOutputParser(PydanticOutputParser):
    """Debug subclass of PydanticOutputParser that logs the raw LLM output before parsing."""

    def parse(self, text: str) -> Any:
        """Parse and validate the LLM completion into the configured Pydantic model.

        Strips markdown code fences, then tries to validate the JSON as-is.
        Only if that first attempt fails does it fall back to un-escaping
        ``\\n`` / ``\\"`` — the old unconditional un-escape corrupted valid
        JSON whose string values legitimately contained escaped quotes.
        """
        logger.info("=== LLM生成结果 ===")
        logger.info(text)
        logger.info("=== LLM生成结果结束 ===")

        # Strip markdown code-fence wrappers, if any.
        cleaned_text = text
        if "```json" in text:
            cleaned_text = text.split("```json")[1].split("```")[0]
        elif "```" in text:
            cleaned_text = text.split("```")[1].split("```")[0]

        try:
            # First attempt: parse the fenced content verbatim.
            json_str = extract_json_str(cleaned_text)
            return self._output_cls.model_validate_json(json_str)
        except ValueError:
            # Fallback: the model may have emitted a fully-escaped JSON blob
            # (literal backslash-n / backslash-quote sequences).
            unescaped = cleaned_text.replace("\\n", "\n").replace("\\\"", "\"")
            json_str = extract_json_str(unescaped)
            return self._output_cls.model_validate_json(json_str)
+
+
def extract_product_from_text(text: str, uri: str = "", filename: str = "") -> ProductImport:
    """Run an LLM extraction over *text* and wrap the result as a ProductImport row.

    Uses LLMTextCompletionProgram with the debug output parser so the raw
    completion is logged before being validated into ProductForExtraction.
    """
    model_client = LiteLLM(model='openai/GLM-4-Flash', api_key=OPENAI_API_KEY, api_base=OPENAI_API_BASE)
    parser = DebugPydanticOutputParser(output_cls=ProductForExtraction)
    prompt = f"请从以下文本中提取产品信息:\n\nurl: {uri} \n\n{{text}}"

    extraction_program = LLMTextCompletionProgram.from_defaults(
        prompt_template_str=prompt,
        llm=model_client,
        verbose=True,
        output_parser=parser,
    )

    result = extraction_program(text=text)

    # Package the extracted fields plus the raw markdown source for storage.
    return ProductImport.from_product_extraction(
        extracted_product=result,
        markdown_content=text,
        uri=uri,
        filename=filename,
    )
+
+
+
+
+
+@task(name="Excel处理",
+    persist_result=True,
+    cache_expiration=timedelta(days=31),
+    cache_policy=INPUTS
+)
+def get_or_create_product_import_by_url(file_url: str):
+    """根据文件URL获取数据库中的ProductImport记录,如果不存在则解析Excel并保存到数据库
+    
+    Args:
+        file_url (str): 文件的URL或本地路径
+    
+    Returns:
+        ProductImport: 数据库中的ProductImport记录
+    """
+    # 从URL中提取文件名
+    file_name = extract_filename_from_url(file_url)
+    
+    logger.info(f"开始处理文件: {file_name} (URL: {file_url})")
+    
+    # 首先检查数据库中是否已存在该文件名的记录
+    existing_record = product_import_manager.get_product_import_by_filename(file_name)
+    
+    if existing_record:
+        logger.info(f"数据库中已存在文件 {file_name} 的记录,直接返回")
+        return existing_record
+    
+    logger.info(f"数据库中不存在文件 {file_name} 的记录,开始解析Excel并保存到数据库")
+    
+    try:
+        # 解析Excel文件为Markdown格式
+        markdown_content = parse_url_to_markdown_task(file_url)
+        
+        if not markdown_content:
+            logger.warning(f"Excel文件解析失败或为空: {file_url}")
+            raise Exception(f"Excel文件解析失败或为空: {file_url}")
+        
+        # 使用LLM从Markdown内容中提取产品信息
+        product_import = extract_product_from_text(
+            text=markdown_content,
+            uri=file_url,
+            filename=file_name
+        )
+        
+        # 保存到数据库
+        saved_record = product_import_manager.save_product_import(product_import)
+        
+        logger.info(f"成功解析Excel并保存到数据库: {file_name}")
+        return saved_record
+        
+    except Exception as e:
+        logger.error(f"处理文件 {file_name} 时发生错误: {e}")
+        raise Exception(f"处理文件失败: {e}")
+
+
class ProductImportInput(BaseModel):
    """Input model for the product-import flow.

    ``file_url`` may be a single string (a URL/path, or free text containing
    URLs) or an explicit list of URLs.
    """
    file_url: Union[str, List[str]] = Field(description="文件的URL或本地路径,可以是字符串或列表")
+
+
+
+
+
@flow(
    name="产品导入流程",
    persist_result=True,
    result_serializer="json",
)
def product_import_flow(flow_input: ProductImportInput):
    """Product-import Prefect flow: fan out one parsing task per URL.

    Accepts a single string (a URL, or free text containing URLs) or a list
    of URLs; submits ``get_or_create_product_import_by_url`` concurrently
    for each and gathers the results.

    Returns:
        dict: ``status``, ``product_imports`` (aligned with ``file_urls``),
        ``file_urls``, and ``total_count``.
    """
    # Normalize the input into a list of URLs.
    if isinstance(flow_input.file_url, str):
        logger.info(f"输入为字符串,尝试提取URL: {flow_input.file_url}")
        urls = extract_urls_from_text(flow_input.file_url)
        if not urls:
            # No URL pattern found: treat the whole string as one URL/path.
            urls = [flow_input.file_url]
        logger.info(f"提取到 {len(urls)} 个URL: {urls}")
    else:
        urls = flow_input.file_url
        logger.info(f"输入为列表,共 {len(urls)} 个URL: {urls}")

    # Submit one concurrent task run per URL.
    all_futures = []
    for url in urls:
        future = get_or_create_product_import_by_url.with_options(
            task_run_name=f"处理URL: {url}",
        ).submit(url)
        all_futures.append(future)

    # Wait for completion, then collect results in SUBMISSION order so that
    # product_imports[i] corresponds to file_urls[i]. The old code iterated
    # wait(...).done, whose set order is arbitrary, silently misaligning
    # results with URLs.
    logger.info(f"等待 {len(all_futures)} 个任务完成...")
    wait(all_futures)
    results = [future.result() for future in all_futures]

    logger.info(f"所有任务完成,成功处理 {len(results)} 个文件")

    return {
        'status': 'success',
        'product_imports': results,
        'file_urls': urls,
        'total_count': len(results)
    }
+
+
@task
def product_import_task(flow_input: ProductImportInput):
    """Task wrapper that invokes ``product_import_flow`` with the given input.

    NOTE(review): this calls a flow from inside a task — confirm this
    task→flow nesting is the intended Prefect composition.
    """
    return product_import_flow(flow_input)
+

+ 1 - 0
src/flow_task/db/models/__init__.py

@@ -0,0 +1 @@
+# Models package for flow_task db

+ 61 - 0
src/flow_task/db/models/product_models.py

@@ -0,0 +1,61 @@
+from typing import List, Optional
+from pydantic import BaseModel, Field
+from sqlmodel import SQLModel, Field as SQLField
+from datetime import datetime
+
+
class ProductBase(BaseModel):
    """Base product-information model; shares field definitions between the
    LLM-extraction schema and stored representations."""
    product_name: str = Field(..., description="产品名称")
    material: Optional[str] = Field(default=None, description="材质")
    color: Optional[str] = Field(default=None, description="颜色")
    main_usage: Optional[str] = Field(default=None, description="主要用途")
    main_selling_points: List[str] = Field(default_factory=list, description="主要卖点列表")
    competitor_list: List[str] = Field(default_factory=list, description="竞品ASIN列表")
+
+
class ProductForExtraction(ProductBase):
    """Product-information model used as the output schema for LLMTextCompletionProgram."""
    pass
+
+
class ProductImport(SQLModel, table=True):
    """SQLModel table row for one imported product spec file.

    ``product_data`` stores the LLM-extracted ``ProductForExtraction`` as a
    JSON string; ``markdown_content`` retains the full source text.
    """
    id: Optional[int] = SQLField(default=None, primary_key=True)
    product_name: str = SQLField(..., description="产品名称")
    product_data: str = SQLField(..., description="ProductForExtraction的JSON格式数据")
    markdown_content: str = SQLField(..., description="Markdown格式的完整数据源文本")
    uri: str = SQLField(..., description="数据源的地址或路径")
    filename: str = SQLField(..., description="文件名")
    created_at: Optional[datetime] = SQLField(default_factory=datetime.now)
    updated_at: Optional[datetime] = SQLField(default_factory=datetime.now)

    @classmethod
    def from_product_extraction(cls, extracted_product: ProductForExtraction, markdown_content: str, uri: str, filename: str) -> 'ProductImport':
        """Build a ProductImport row from an LLM extraction result.

        (Removed the dead ``import json`` — serialization goes through
        Pydantic's ``model_dump_json``.)
        """
        return cls(
            product_name=extracted_product.product_name,
            product_data=extracted_product.model_dump_json(),
            markdown_content=markdown_content,
            uri=uri,
            filename=filename
        )

    def get_product_extraction(self) -> ProductForExtraction:
        """Deserialize ``product_data`` back into a ProductForExtraction."""
        return ProductForExtraction.model_validate_json(self.product_data)

    def get_product_base(self) -> ProductBase:
        """Deserialize ``product_data`` into the shared ProductBase view."""
        return ProductBase.model_validate_json(self.product_data)
+
+    

+ 109 - 0
src/flow_task/db/product_import_db.py

@@ -0,0 +1,109 @@
+from datetime import datetime
+from typing import Optional, List
+from sqlmodel import SQLModel, create_engine, Session, select, Field
+from config.settings import DB_URL
+from utils.sql_engine import create_db_and_tables
+
+# 导入ProductImport模型
+from src.flow_task.db.models.product_models import ProductImport
+
+
class ProductImportManager:
    """CRUD helper around the ProductImport table."""

    def __init__(self, engine=None):
        """Create the manager.

        Args:
            engine: Optional SQLAlchemy engine; defaults to one built from
                ``DB_URL``. (Fixed: the old ``engine: str = None`` annotation
                was wrong — the value is passed straight to ``Session``.)
        """
        self.engine = engine or create_engine(DB_URL)
        # Ensure the schema exists before any query runs.
        create_db_and_tables()

    def _first(self, statement) -> Optional[ProductImport]:
        """Execute `statement` and return the first row, or None."""
        with Session(self.engine) as session:
            return session.exec(statement).first()

    def _list(self, statement, to_dict: bool) -> List[ProductImport]:
        """Execute `statement` and return all rows, optionally as dicts."""
        with Session(self.engine) as session:
            rows = session.exec(statement).all()
            if to_dict:
                return [model.model_dump() for model in rows]
            return rows

    def save_product_import(self, product_import: ProductImport) -> ProductImport:
        """Persist a ProductImport and return it refreshed (id populated)."""
        with Session(self.engine) as session:
            session.add(product_import)
            session.commit()
            session.refresh(product_import)
            return product_import

    def get_product_import_by_id(self, product_id: int) -> Optional[ProductImport]:
        """Fetch one record by primary key."""
        return self._first(select(ProductImport).where(ProductImport.id == product_id))

    def get_product_import_by_name(self, product_name: str) -> Optional[ProductImport]:
        """Fetch one record by product name."""
        return self._first(select(ProductImport).where(ProductImport.product_name == product_name))

    def get_product_import_by_uri(self, uri: str) -> Optional[ProductImport]:
        """Fetch one record by source URI."""
        return self._first(select(ProductImport).where(ProductImport.uri == uri))

    def get_product_import_by_filename(self, filename: str) -> Optional[ProductImport]:
        """Fetch one record by file name."""
        return self._first(select(ProductImport).where(ProductImport.filename == filename))

    def get_all_product_imports(self, to_dict: bool = False) -> List[ProductImport]:
        """Return every record (models, or dicts when ``to_dict`` is True)."""
        return self._list(select(ProductImport), to_dict)

    def get_product_imports_by_filename(self, filename: str, to_dict: bool = False) -> List[ProductImport]:
        """Return all records matching a file name."""
        return self._list(select(ProductImport).where(ProductImport.filename == filename), to_dict)

    def add_or_ignore_product_import(self, product_import: ProductImport) -> ProductImport:
        """Insert unless a record with the same URI exists; return the stored row."""
        exist = self.get_product_import_by_uri(product_import.uri)
        if exist:
            return exist
        return self.save_product_import(product_import)

    def update_product_import(self, product_id: int, **kwargs) -> Optional[ProductImport]:
        """Update fields of a record by id.

        Unknown keyword names are silently ignored; ``updated_at`` is bumped.
        Returns None when the id does not exist.
        """
        with Session(self.engine) as session:
            product_import = session.get(ProductImport, product_id)
            if product_import:
                for key, value in kwargs.items():
                    if hasattr(product_import, key):
                        setattr(product_import, key, value)
                product_import.updated_at = datetime.now()
                session.commit()
                session.refresh(product_import)
            return product_import

    def delete_product_import(self, product_id: int) -> bool:
        """Delete a record by id; True when a row was actually removed."""
        with Session(self.engine) as session:
            product_import = session.get(ProductImport, product_id)
            if product_import:
                session.delete(product_import)
                session.commit()
                return True
            return False


# Module-level singleton; NOTE: constructing it at import time creates the
# engine and the tables as a side effect.
product_import_manager = ProductImportManager()

+ 82 - 60
tests/mytest/llamaindex_t/t_llm_to_pydantic.py

@@ -6,64 +6,12 @@ from sqlmodel import SQLModel, Field as SQLField
 from datetime import datetime
 from llama_index.core.program import LLMTextCompletionProgram
 from llama_index.core.output_parsers import PydanticOutputParser
-from src.flow_task.crawl_asin_flow import parse_url_to_markdown_task
+from src.flow_task.crawl_asin_flow import parse_url_to_markdown_task, get_or_create_product_import_by_url, extract_filename_from_url
 from llama_index.core.output_parsers.pydantic import extract_json_str
 
-
-class ProductBase(BaseModel):
-    """产品基础信息模型,用于共享字段定义"""
-    product_name: str = Field(..., description="产品名称")
-    material: Optional[str] = Field(default=None, description="材质")
-    color: Optional[str] = Field(default=None, description="颜色")
-    main_usage: Optional[str] = Field(default=None, description="主要用途")
-    main_selling_points: List[str] = Field(default_factory=list, description="主要卖点列表")
-    competitor_list: List[str] = Field(default_factory=list, description="竞品ASIN列表")
-
-
-class ProductForExtraction(ProductBase):
-    """用于LLMTextCompletionProgram的产品信息模型"""
-    pass
-
-
-class Product(SQLModel, table=True):
-    """产品信息SQLModel模型类"""
-    id: Optional[int] = SQLField(default=None, primary_key=True)
-    product_name: str = SQLField(..., description="产品名称")
-    product_data: str = SQLField(..., description="ProductForExtraction的JSON格式数据")
-    markdown_content: str = SQLField(..., description="Markdown格式的完整数据源文本")
-    uri: str = SQLField(..., description="数据源的地址或路径")
-    filename: str = SQLField(..., description="文件名")
-    created_at: Optional[datetime] = SQLField(default_factory=datetime.now)
-    updated_at: Optional[datetime] = SQLField(default_factory=datetime.now)
-    
-    @classmethod
-    def from_product_extraction(cls, extracted_product: ProductForExtraction, markdown_content: str, uri: str, filename: str) -> 'Product':
-        """从ProductForExtraction创建Product实例"""
-        import json
-        
-        # 将ProductForExtraction转换为JSON字符串
-        product_data_json = extracted_product.model_dump_json()
-        
-        return cls(
-            product_name=extracted_product.product_name,
-            product_data=product_data_json,
-            markdown_content=markdown_content,
-            uri=uri,
-            filename=filename
-        )
-    
-    def get_product_extraction(self) -> ProductForExtraction:
-        """获取ProductForExtraction实例"""
-        import json
-        
-        return ProductForExtraction.model_validate_json(self.product_data)
-    
-    def get_product_base(self) -> ProductBase:
-        """获取ProductBase实例"""
-        import json
-        
-        return ProductBase.model_validate_json(self.product_data)
-
+# 导入数据库管理器和模型
+from src.flow_task.db.product_import_db import ProductImportManager
+from src.flow_task.db.models.product_models import ProductImport, ProductBase, ProductForExtraction
 
 
 class DebugPydanticOutputParser(PydanticOutputParser):
@@ -91,7 +39,7 @@ class DebugPydanticOutputParser(PydanticOutputParser):
         return self._output_cls.model_validate_json(json_str)
 
 
-def extract_product_from_text(text: str, uri: str = "", filename: str = "") -> Product:
+def extract_product_from_text(text: str, uri: str = "", filename: str = "") -> ProductImport:
     """使用LLMTextCompletionProgram从文本中提取产品信息"""
     llm = LiteLLM(model='openai/glm-4.5', api_key=OPENAI_API_KEY, api_base=OPENAI_API_BASE)
     llm = LiteLLM(model='openai/GLM-4.5-Air', api_key=OPENAI_API_KEY, api_base=OPENAI_API_BASE)
@@ -110,7 +58,7 @@ def extract_product_from_text(text: str, uri: str = "", filename: str = "") -> P
     extracted_product = program(text=text)
     
     # 使用新的类方法创建Product实例
-    return Product.from_product_extraction(
+    return ProductImport.from_product_extraction(
         extracted_product=extracted_product,
         markdown_content=text,
         uri=uri,
@@ -118,7 +66,7 @@ def extract_product_from_text(text: str, uri: str = "", filename: str = "") -> P
     )
 
 
-def extract_product_from_url(url: str) -> Product:
+def extract_product_from_url(url: str) -> ProductImport:
     """从URL解析表格文件并提取产品信息"""
     markdown_content = parse_url_to_markdown_task(url)
     print(markdown_content)
@@ -129,6 +77,49 @@ def extract_product_from_url(url: str) -> Product:
     )
 
 
def test_extract_filename_from_url():
    """Smoke-test extract_filename_from_url against known URL shapes."""
    cases = {
        "http://s3.vs1.lan/public/amazone/copywriting_production/product/202508/1P镊子压刀.xlsx": "1P镊子压刀.xlsx",
        "http://example.com/path/to/file.csv?param=value": "file.csv",
        "/local/path/to/document.pdf": "document.pdf",
        "simple_file.txt": "simple_file.txt",
    }

    # dict preserves insertion order, so cases run in the same sequence.
    for url, expected in cases.items():
        result = extract_filename_from_url(url)
        print(f"URL: {url}")
        print(f"Expected: {expected}")
        print(f"Result: {result}")
        print(f"Test {'PASSED' if result == expected else 'FAILED'}")
        print("---")
+
+
def test_get_or_create_product_import_by_url():
    """Exercise get_or_create_product_import_by_url end-to-end against a sample URL."""
    test_url = "http://s3.vs1.lan/public/amazone/copywriting_production/product/202508/1P镊子压刀.xlsx"

    print("=== 测试get_or_create_product_import_by_url函数 ===")
    print(f"测试URL: {test_url}")

    try:
        # Check filename extraction first.
        filename = extract_filename_from_url(test_url)
        # BUG FIX: `filename` was computed but never printed — the format
        # string had degenerated into a literal "(unknown)".
        print(f"提取的文件名: {filename}")

        # Get-or-create the import record and show its key fields.
        product_import = get_or_create_product_import_by_url(test_url)
        print(f"产品名称: {product_import.product_name}")
        print(f"文件名: {product_import.filename}")
        print(f"URI: {product_import.uri}")
        print("测试成功!")

    except Exception as e:
        print(f"测试失败: {e}")
        import traceback
        traceback.print_exc()
+
+
 if __name__ == "__main__":
     print("=== 测试从URL提取产品信息 ===")
     test_url = "http://s3.vs1.lan/public/amazone/copywriting_production/product/202508/1P镊子压刀.xlsx"
@@ -149,4 +140,35 @@ if __name__ == "__main__":
     
     # 显示存储的JSON数据
     print(f"\n存储的JSON数据: {product_from_url.product_data}")
-
+    
+    # === 新增:保存到数据库测试 ===
+    print("\n=== 测试保存到数据库 ===")
+    try:
+        # 创建数据库管理器实例
+        db_manager = ProductImportManager()
+        
+        # 保存到数据库
+        saved_product = db_manager.save_product_import(product_from_url)
+        print(f"成功保存到数据库!ID: {saved_product.id}")
+        
+        # 验证保存是否成功
+        retrieved_product = db_manager.get_product_import_by_id(saved_product.id)
+        if retrieved_product:
+            print("数据库保存验证成功!")
+            print(f"从数据库获取的产品名称: {retrieved_product.product_name}")
+            print(f"从数据库获取的URI: {retrieved_product.uri}")
+            
+            # 验证JSON数据是否正确保存
+            retrieved_extraction = retrieved_product.get_product_extraction()
+            print(f"从数据库获取的产品材质: {retrieved_extraction.material}")
+            print(f"从数据库获取的产品颜色: {retrieved_extraction.color}")
+            print(f"从数据库获取的主要用途: {retrieved_extraction.main_usage}")
+            print(f"从数据库获取的主要卖点: {retrieved_extraction.main_selling_points}")
+            print(f"从数据库获取的竞品列表: {retrieved_extraction.competitor_list}")
+        else:
+            print("数据库保存验证失败!")
+            
+    except Exception as e:
+        print(f"保存到数据库时发生错误: {e}")
+        import traceback
+        traceback.print_exc()

+ 40 - 0
tests/mytest/t_flow_run_extra_product.py

@@ -0,0 +1,40 @@
+from datetime import datetime
+from typing import List
+import asyncio
+from prefect import flow, task
+from prefect.states import Completed, Failed
+from src.flow_task.crawl_asin_flow import get_or_create_product_import_by_url, product_import_flow, ProductImportInput
+from utils.url_utils import extract_filename_from_url, extract_urls_from_text
+from src.flow_task.db.product_import_db import product_import_manager
+from utils.logu import get_logger
+
+logger = get_logger('flow_run_test')
+
+# 测试URL列表
+test_urls = [
+    "http://s3.vs1.lan/public/amazone/copywriting_production/product/202508/1P镊子压刀.xlsx",
+    "http://s3.vs1.lan/public/amazone/copywriting_production/product/202508/3P一体不锈钢迷你园艺铲.xlsx",
+    "http://s3.vs1.lan/public/amazone/copywriting_production/product/202508/磁吸固定夹.xlsx",
+    "http://s3.vs1.lan/public/amazone/copywriting_production/product/202508/锯齿固定夹.xlsx",
+    "http://s3.vs1.lan/public/amazone/copywriting_production/product/202508/魔术贴金属扣.xlsx",
+    "http://s3.vs1.lan/public/amazone/copywriting_production/product/202508/黑白轧带.xlsx"
+]
+
+
+# 直接运行 product_import_flow 处理每个URL
+async def main():
+    successful_results = []
+    failed_results = []
+    
+    flow_input = ProductImportInput(file_url=test_urls)
+    result = product_import_flow(flow_input)
+    
+    print(f"\n处理结果:")
+    logger.info(f"result {result} 处理成功")
+    print(f"总计: {len(test_urls)} 个URL")
+
+if __name__ == "__main__":
+    
+    # 运行主函数
+    asyncio.run(main())
+    

+ 35 - 0
utils/url_utils.py

@@ -0,0 +1,35 @@
+from typing import List
+import re
+
def extract_urls_from_text(text: str) -> List[str]:
    """Extract all http(s) URLs found in *text*.

    Args:
        text (str): Free text possibly containing URLs.

    Returns:
        List[str]: De-duplicated URLs in order of first appearance.
    """
    # Scheme, then host characters (incl. percent-escapes), then path/query characters.
    url_pattern = r'https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+[\w\-\._~:/?#[\]@!\$&\'()*+,;=]*'

    urls = re.findall(url_pattern, text)

    # De-duplicate while PRESERVING order: the old `list(set(urls))` returned
    # URLs in arbitrary order, making downstream processing order (and the
    # alignment of results with inputs) nondeterministic.
    return list(dict.fromkeys(urls))
+
def extract_filename_from_url(url: str) -> str:
    """Extract the file name from a URL or local path.

    Args:
        url (str): File URL or local path.

    Returns:
        str: The final path component with query string and fragment removed.
    """
    # Drop query string AND fragment (the old version only stripped `?...`,
    # so "file.pdf#sec" came back as "file.pdf#sec").
    clean_url = url.split('?', 1)[0].split('#', 1)[0]
    # Last path component; the whole string when there is no '/'.
    return clean_url.rsplit('/', 1)[-1]