| 12345678910111213141516171819202122232425262728293031323334353637383940 |
- from datetime import datetime
- from typing import List
- import asyncio
- from prefect import flow, task
- from prefect.states import Completed, Failed
- from src.flow_task.crawl_asin_flow import get_or_create_product_import_by_url, product_import_flow, ProductImportInput
- from utils.url_utils import extract_filename_from_url, extract_urls_from_text
- from src.flow_task.db.product_import_db import product_import_manager
- from utils.logu import get_logger
- logger = get_logger('flow_run_test')
- # 测试URL列表
- test_urls = [
- "http://s3.vs1.lan/public/amazone/copywriting_production/product/202508/1P镊子压刀.xlsx",
- "http://s3.vs1.lan/public/amazone/copywriting_production/product/202508/3P一体不锈钢迷你园艺铲.xlsx",
- "http://s3.vs1.lan/public/amazone/copywriting_production/product/202508/磁吸固定夹.xlsx",
- "http://s3.vs1.lan/public/amazone/copywriting_production/product/202508/锯齿固定夹.xlsx",
- "http://s3.vs1.lan/public/amazone/copywriting_production/product/202508/魔术贴金属扣.xlsx",
- "http://s3.vs1.lan/public/amazone/copywriting_production/product/202508/黑白轧带.xlsx"
- ]
- # 直接运行 product_import_flow 处理每个URL
- async def main():
- successful_results = []
- failed_results = []
-
- flow_input = ProductImportInput(file_url=test_urls)
- result = product_import_flow(flow_input)
-
- print(f"\n处理结果:")
- logger.info(f"result {result} 处理成功")
- print(f"总计: {len(test_urls)} 个URL")
- if __name__ == "__main__":
-
- # 运行主函数
- asyncio.run(main())
-
|