from pathlib import Path

from config.settings import CFG
from src.manager.core.db import DbManager, AsinSeed
from utils.file import save_to_file
from src.tasks.crawl_asin_save_task import get_asin_and_save_page
from src.tasks.crawl_asin_exract_task import extra_result
from utils.logu import get_logger

logger = get_logger('main')


class ManagerTask:
    s3_prefix = CFG.s3_prefix + '/output/page'

    def __init__(self):
        self.db = DbManager()

    def submit_task_and_wait(self, asin: str, asin_area: str = 'JP',
                             overwrite: bool = False, timeout: int = 300):
        """Submit a crawl task, wait for it to finish, and persist the result path."""
        model = self.db.get_asin_seed(asin)
        if model and model.mhtml_path:
            logger.info(f"{asin} already crawled, skipping")
            return model

        # Submit the Celery task and block until it finishes (or times out).
        result = get_asin_and_save_page.delay(asin, asin_area, overwrite)
        # propagate=False keeps a failed task from raising here, so the
        # successful() check below actually gets a chance to run.
        result.get(timeout=timeout, propagate=False)

        if result.successful():
            task_result = result.result
            if model is None:
                # First time we see this ASIN: create the seed record.
                model = AsinSeed(asin=asin, asin_area=asin_area,
                                 mhtml_path=task_result['path'])
                self.db.add_or_ignore_asin_seed(model)
            else:
                model.mhtml_path = task_result['path']
                self.db.save_asin_seed(model)
            return model
        return None

    def submit_extract_task_and_wait(self, asin: str, asin_area: str = 'JP', timeout: int = 300):
        """Submit a page-extraction task, wait for it to finish, and persist the result."""
        # Look up the mhtml path recorded by the crawl step.
        asin_seed = self.db.get_asin_seed(asin)
        if asin_seed and asin_seed.extra_result_path:
            logger.info(f"{asin} already extracted, skipping")
            return asin_seed
        if not asin_seed or not asin_seed.mhtml_path:
            logger.warning(f"no mhtml path found for {asin}")
            return None
        logger.info(f"{asin} page extraction started: {asin_seed.mhtml_path}")

        # Submit the Celery task and block until it finishes (or times out).
        result = extra_result.delay(asin_seed.mhtml_path)
        result.get(timeout=timeout, propagate=False)

        if result.successful():
            task_result = result.result
            if task_result['status'] == 'success':
                task_result_data = task_result['data']
                # Save the extraction result as JSON next to the mhtml file on S3.
                s3_dir = asin_seed.mhtml_path.rsplit('/', 1)[0]
                save_json_uri = f"{s3_dir}/{asin}_extract.json"
                save_to_file(task_result_data, save_json_uri)
                task_result['path'] = save_json_uri

                # Persist the extraction path on the seed record.
                asin_seed.extra_result_path = save_json_uri
                self.db.save_asin_seed(asin_seed)
                logger.info(f"{asin} page extraction succeeded: {task_result}")
            return task_result
        return None

    def save_task_asin_crawl_result(self, asin: str, asin_area: str = None,
                                    task_result: dict = None):
        """Record a crawl result in the database (used for manual backfills)."""
        task_result = task_result or {}
        asin_seed = None
        if task_result.get('status') == 'success':
            # Update the existing record, or create one if this ASIN is new.
            asin_seed = self.db.get_asin_seed(asin)
            if asin_seed:
                asin_seed.mhtml_path = task_result['path']
                self.db.update_asin_seed(asin_seed)
            else:
                asin_seed = AsinSeed(asin=asin, asin_area=asin_area,
                                     mhtml_path=task_result['path'])
                self.db.add_or_ignore_asin_seed(asin_seed)
        return asin_seed

    def upload_file(self, file_path: str, filename: str):
        """Upload a local file's contents under the class S3 prefix."""
        return save_to_file(Path(file_path).read_text(), self.s3_prefix + '/' + filename)

    def upload_mhtml(self, file_path: str, s3_filename: str = None):
        """Upload a local mhtml file, defaulting the S3 name to the local file stem."""
        if not s3_filename:
            s3_filename = Path(file_path).stem + '.mhtml'
        return self.upload_file(file_path, s3_filename)


def main():
    asinseed_list = ['B0CQ1SHD8V', 'B0B658JC22', 'B0DQ84H883', 'B0D44RT8R8']
    manager = ManagerTask()
    for asin in asinseed_list:
        manager.submit_task_and_wait(asin)
        manager.submit_extract_task_and_wait(asin)
    # result = {'status': 'success', 'path': 's3://public/amazone/copywriting_production/output/B0B658JC22/B0B658JC22.mhtml'}
    # manager.save_task_asin_crawl_result('B0B658JC22', 'JP', result)


if __name__ == "__main__":
    main()