t_move_s3_file.py 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738
  1. import asyncio
  2. from pathlib import Path
  3. from config.settings import CFG
  4. from src.manager.core.db import DbManager,AsinSeed
  5. from src.manager.core.db_mongo import BaseMongoManager
  6. from utils.file import save_to_file, read_file
  7. from config.celery import app
  8. # Remove direct task imports
  9. from celery.result import AsyncResult
  10. from src.models.product_model import Product,CompetitorCrawlData
  11. from utils.logu import get_logger
  12. from upath import UPath
  13. from src.manager.manager_task import ManagerTask
  14. logger = get_logger('test')
  15. async def test_product_mongo():
  16. manager = ManagerTask()
  17. await manager.db_mongo.initialize()
  18. product = await Product.find_one(Product.basic_info.name == "电线保护套")
  19. logger.info(f"{product}")
  20. asin_completed = manager.db.get_asin_completed()
  21. for asin_model in asin_completed:
  22. key = asin_model.asin
  23. competitor = product.competitor_crawl_data.get(key)
  24. logger.info(f"{asin_model.mhtml_path}")
  25. mthml_data = read_file(asin_model.mhtml_path)
  26. mhtml_file_name = UPath(asin_model.mhtml_path).name
  27. mhtml_without_ext = mhtml_file_name.split('.')[0]
  28. new_path = f's3://public/amazone/copywriting_production/output/asinseed/{mhtml_without_ext}/{mhtml_file_name}'
  29. res = save_to_file(mthml_data, new_path)
  30. logger.info(f"new path {res}")
  31. competitor.mhtml_path = res
  32. await product.save()
  33. return product
  34. async def main():
  35. await test_product_mongo()
  36. if __name__ == "__main__":
  37. asyncio.run(main())