# manager_task.py
  1. from pathlib import Path
  2. from config.settings import CFG
  3. from src.manager.core.db import DbManager,AsinSeed
  4. from utils.file import save_to_file, read_file
  5. from src.tasks.crawl_asin_save_task import get_asin_and_save_page
  6. from src.tasks.crawl_asin_exract_task import extra_result
  7. from celery.result import AsyncResult
  8. from utils.logu import get_logger
  9. logger = get_logger('main')
  10. class ManagerTask:
  11. s3_prefix = CFG.s3_prefix + '/output/page'
  12. def __init__(self):
  13. self.db = DbManager()
  14. def submit_task_and_wait(self, asin: str, asin_area: str = 'JP',overwrite:bool=False, timeout: int = 300):
  15. model = self.db.get_asin_seed(asin)
  16. if model and model.mhtml_path:
  17. logger.info(f"{asin}已经爬取过,跳过")
  18. return model
  19. """提交任务并等待完成,保存结果路径到数据库"""
  20. # 提交celery任务
  21. task = get_asin_and_save_page.delay(asin, asin_area, overwrite)
  22. # 等待任务完成
  23. result = AsyncResult(task.id)
  24. result.get(timeout=timeout)
  25. # 处理任务结果
  26. if result.successful():
  27. task_result = result.result
  28. model.mhtml_path = task_result['path']
  29. self.db.save_asin_seed(model)
  30. return None
  31. def submit_extract_task_and_wait(self, asin: str, asin_area: str = 'JP', timeout: int = 300):
  32. """提交页面解析任务并等待完成,保存结果到数据库"""
  33. # 从数据库获取mhtml路径
  34. asin_seed = self.db.get_asin_seed(asin)
  35. if asin_seed and asin_seed.extra_result_path:
  36. logger.info(f"{asin}已经解析过,跳过")
  37. return asin_seed
  38. if not asin_seed or not asin_seed.mhtml_path:
  39. print(f"未找到{asin}的mhtml路径")
  40. return None
  41. logger.info(f"{asin}页面解析开始: {asin_seed.mhtml_path}")
  42. # 提交celery任务
  43. task = extra_result.delay(asin_seed.mhtml_path)
  44. # 等待任务完成
  45. result = AsyncResult(task.id)
  46. result.get(timeout=timeout)
  47. # 处理任务结果
  48. if result.successful():
  49. task_result = result.result
  50. if task_result['status'] == 'success':
  51. task_result_data = task_result['data']
  52. # 保存提取结果到文件并上传S3
  53. s3_dir = asin_seed.mhtml_path.rsplit('/', 1)[0]
  54. save_json_uri = f"{s3_dir}/{asin}_extract.json"
  55. save_to_file(task_result_data, save_json_uri)
  56. task_result['path'] = save_json_uri
  57. # 保存数据库记录
  58. asin_model = self.db.get_asin_seed(asin=asin)
  59. asin_model.extra_result_path = save_json_uri
  60. self.db.save_asin_seed(asin_model)
  61. logger.info(f"{asin}页面解析成功: {task_result}")
  62. return task_result
  63. def save_task_asin_crawl_result(self, asin: str, asin_area:str=None, task_result: dict={}):
  64. if task_result['status'] == 'success':
  65. # 更新数据库记录
  66. asin_seed = self.db.get_asin_seed(asin)
  67. if asin_seed:
  68. asin_seed.mhtml_path = task_result['path']
  69. self.db.update_asin_seed(asin_seed)
  70. else:
  71. self.db.add_or_ignore_asin_seed(AsinSeed(asin=asin, asin_area=asin_area, mhtml_path=task_result['path']))
  72. return asin_seed
  73. def upload_file(self, file_path: str, filename: str):
  74. res = save_to_file(Path(file_path).read_text(), self.s3_prefix + '/' + filename)
  75. return res
  76. def upload_mhtml(self, file_path: str, s3_filename: str=None):
  77. if not s3_filename:
  78. s3_filename = Path(file_path).stem + '.mhtml'
  79. res = self.upload_file(file_path, s3_filename)
  80. def main():
  81. asinseed_list = ['B0CQ1SHD8V', 'B0B658JC22', 'B0DQ84H883', 'B0D44RT8R8']
  82. manager = ManagerTask()
  83. for asin in asinseed_list:
  84. manager.submit_task_and_wait(asin)
  85. manager.submit_extract_task_and_wait(asin)
  86. # result = {'status': 'success', 'path': 's3://public/amazone/copywriting_production/output/B0B658JC22/B0B658JC22.mhtml'}
  87. # manager.save_task_asin_crawl_result('B0B658JC22', 'JP', result)
  88. if __name__ == "__main__":
  89. main()