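"""Pipeline driver: crawl product pages by ASIN via celery tasks, extract
structured data from the saved mhtml pages, and record result paths in the
database. (Module docstring added for orientation; it restates what the
code below does.)"""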
from pathlib import Path
from config.settings import CFG
from src.manager.core.db import DbManager, AsinSeed
from utils.file import save_to_file, read_file
from src.tasks.crawl_asin_save_task import get_asin_and_save_page
from src.tasks.crawl_asin_exract_task import extra_result
from celery.result import AsyncResult
from utils.logu import get_logger

logger = get_logger('main')

class ManagerTask:
    """Coordinates crawl/extract celery tasks and persists results to the database."""

    s3_prefix = CFG.s3_prefix + '/output/page'

    def __init__(self):
        self.db = DbManager()

    def submit_task_and_wait(self, asin: str, asin_area: str = 'JP', overwrite: bool = False, timeout: int = 300):
        """Submit a crawl task, wait for it to finish, and save the result path to the database."""
        model = self.db.get_asin_seed(asin)
        if model and model.mhtml_path:
            logger.info(f"{asin} already crawled, skipping")
            return model

        # Submit the celery task
        task = get_asin_and_save_page.delay(asin, asin_area, overwrite)

        # Wait for the task to finish
        result = AsyncResult(task.id)
        result.get(timeout=timeout)

        # Delegate persistence so a missing seed record gets created instead of
        # crashing on `model.mhtml_path` when `model` is None
        if result.successful():
            return self.save_task_asin_crawl_result(asin, asin_area, result.result)
        logger.warning(f"{asin} crawl task failed")
        return None

    def submit_extract_task_and_wait(self, asin: str, asin_area: str = 'JP', timeout: int = 300):
        """Submit a page-extraction task, wait for it to finish, and save the result to the database."""
        # Get the mhtml path recorded by the crawl step
        asin_seed = self.db.get_asin_seed(asin)
        if asin_seed and asin_seed.extra_result_path:
            logger.info(f"{asin} already extracted, skipping")
            return asin_seed
        if not asin_seed or not asin_seed.mhtml_path:
            logger.warning(f"No mhtml path found for {asin}")
            return None
        logger.info(f"{asin} page extraction started: {asin_seed.mhtml_path}")

        # Submit the celery task
        task = extra_result.delay(asin_seed.mhtml_path)

        # Wait for the task to finish
        result = AsyncResult(task.id)
        result.get(timeout=timeout)

        # Handle the task result
        if not result.successful():
            logger.warning(f"{asin} extraction task failed")
            return None
        task_result = result.result
        if task_result['status'] == 'success':
            task_result_data = task_result['data']
            # Save the extraction result next to the mhtml file on S3
            s3_dir = asin_seed.mhtml_path.rsplit('/', 1)[0]
            save_json_uri = f"{s3_dir}/{asin}_extract.json"
            save_to_file(task_result_data, save_json_uri)
            task_result['path'] = save_json_uri
            # Record the result path on the seed record
            asin_seed.extra_result_path = save_json_uri
            self.db.save_asin_seed(asin_seed)
            logger.info(f"{asin} page extraction succeeded: {task_result}")
        else:
            logger.warning(f"{asin} page extraction failed: {task_result}")
        return task_result

    def save_task_asin_crawl_result(self, asin: str, asin_area: str = None, task_result: dict = None):
        """Save a successful crawl result to the database and return the seed record."""
        task_result = task_result or {}  # avoid a mutable default argument
        if task_result.get('status') != 'success':
            return None
        # Update the existing record, or insert a new one for an unseen asin
        asin_seed = self.db.get_asin_seed(asin)
        if asin_seed:
            asin_seed.mhtml_path = task_result['path']
            self.db.update_asin_seed(asin_seed)
        else:
            asin_seed = AsinSeed(asin=asin, asin_area=asin_area, mhtml_path=task_result['path'])
            self.db.add_or_ignore_asin_seed(asin_seed)
        return asin_seed

    def upload_file(self, file_path: str, filename: str):
        res = save_to_file(Path(file_path).read_text(), self.s3_prefix + '/' + filename)
        return res

    def upload_mhtml(self, file_path: str, s3_filename: str = None):
        if not s3_filename:
            s3_filename = Path(file_path).stem + '.mhtml'
        return self.upload_file(file_path, s3_filename)


def main():
    asinseed_list = ['B0CQ1SHD8V', 'B0B658JC22', 'B0DQ84H883', 'B0D44RT8R8']
    manager = ManagerTask()
    for asin in asinseed_list:
        manager.submit_task_and_wait(asin)
        manager.submit_extract_task_and_wait(asin)
    # result = {'status': 'success', 'path': 's3://public/amazone/copywriting_production/output/B0B658JC22/B0B658JC22.mhtml'}
    # manager.save_task_asin_crawl_result('B0B658JC22', 'JP', result)


if __name__ == "__main__":
    main()
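
# Usage sketch for the upload helpers, which main() does not exercise.
# The local path below is hypothetical:
#   manager = ManagerTask()
#   manager.upload_mhtml('/tmp/B0CQ1SHD8V.mhtml')
# This reads the local file and writes it to
# {CFG.s3_prefix}/output/page/B0CQ1SHD8V.mhtml via save_to_file.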