# t_flow_run_extra_product.py
  1. from datetime import datetime
  2. from typing import List
  3. import asyncio
  4. from prefect import flow, task
  5. from prefect.states import Completed, Failed
  6. from src.flow_task.crawl_asin_flow import get_or_create_product_import_by_url, product_import_flow, ProductImportInput
  7. from utils.url_utils import extract_filename_from_url, extract_urls_from_text
  8. from src.flow_task.db.product_import_db import product_import_manager
  9. from utils.logu import get_logger
  10. logger = get_logger('flow_run_test')
  11. # 测试URL列表
  12. test_urls = [
  13. "http://s3.vs1.lan/public/amazone/copywriting_production/product/202508/1P镊子压刀.xlsx",
  14. "http://s3.vs1.lan/public/amazone/copywriting_production/product/202508/3P一体不锈钢迷你园艺铲.xlsx",
  15. "http://s3.vs1.lan/public/amazone/copywriting_production/product/202508/磁吸固定夹.xlsx",
  16. "http://s3.vs1.lan/public/amazone/copywriting_production/product/202508/锯齿固定夹.xlsx",
  17. "http://s3.vs1.lan/public/amazone/copywriting_production/product/202508/魔术贴金属扣.xlsx",
  18. "http://s3.vs1.lan/public/amazone/copywriting_production/product/202508/黑白轧带.xlsx"
  19. ]
  20. # 直接运行 product_import_flow 处理每个URL
  21. async def main():
  22. successful_results = []
  23. failed_results = []
  24. flow_input = ProductImportInput(file_url=test_urls)
  25. result = product_import_flow(flow_input)
  26. print(f"\n处理结果:")
  27. logger.info(f"result {result} 处理成功")
  28. print(f"总计: {len(test_urls)} 个URL")
  29. if __name__ == "__main__":
  30. # 运行主函数
  31. asyncio.run(main())