فهرست منبع

完成批量将 asinseed mhtml 解析成 json 保存

mrh 7 ماه پیش
والد
کامیت
8f4f624dc9
3 فایل تغییر یافته به همراه 337 افزوده شده و 46 حذف شده
  1. 11 0
      src/manager/core/db.py
  2. 274 44
      tests/flow_run/t_flow_run_asin_mhtml_parser.py
  3. 52 2
      utils/file.py

+ 11 - 0
src/manager/core/db.py

@@ -56,6 +56,17 @@ class DbManager:
             else:
                 return list_model
 
+    def get_asin_with_empty_extra_result_path(self, to_dict:bool=False) -> list[AsinSeed]:
+        """获取extra_result_path为空的ASIN记录"""
+        with Session(self.engine) as session:
+            statement = select(AsinSeed).where(AsinSeed.extra_result_path.is_(None))
+            results = session.exec(statement)
+            list_model = results.all()
+            if to_dict:
+                return [model.model_dump() for model in list_model]
+            else:
+                return list_model
+
     def delete_asin_seed_by_id(self, asin_id: int) -> bool:
         """根据id删除asin_seed记录,如果s3路径存在,连同一起删除"""
         with Session(self.engine) as session:

+ 274 - 44
tests/flow_run/t_flow_run_asin_mhtml_parser.py

@@ -125,10 +125,29 @@ async def process_single_asin(asin_record, browser_config):
     try:
         print(f"开始处理ASIN: {asin_record.asin}")
         result = await async_process_mhtml(asin_record.mhtml_path, browser_config)
+        
+        # 参考manager_task.py中的submit_extract_task_and_wait方法,添加保存到数据库的功能
+        if result.get('status') == 'success':
+            task_result_data = result.get('data')
+            # 保存提取结果到文件并上传S3
+            s3_dir = asin_record.mhtml_path.rsplit('/', 1)[0]
+            save_json_uri = f"{s3_dir}/{asin_record.asin}_extract.json"
+            from utils.file import save_to_file
+            save_to_file(task_result_data, save_json_uri)
+            result['path'] = save_json_uri
+            # 保存数据库记录
+            from src.manager.core.db import DbManager
+            db_manager = DbManager()
+            asin_model = db_manager.get_asin_seed(asin_record.asin)
+            asin_model.extra_result_path = save_json_uri
+            db_manager.save_asin_seed(asin_model)
+            print(f"{asin_record.asin}页面解析成功,已保存到数据库: {save_json_uri}")
+        
         return {
             'asin': asin_record.asin,
             'status': result.get('status'),
-            'data': result.get('data') if result.get('status') == 'success' else None
+            'data': result.get('data') if result.get('status') == 'success' else None,
+            'path': result.get('path') if result.get('status') == 'success' else None
         }
     except Exception as e:
         print(f"解析ASIN {asin_record.asin} 时出错: {e}")
@@ -138,8 +157,12 @@ async def process_single_asin(asin_record, browser_config):
             'error': str(e)
         }
 
-async def concurrent_mhtml_parsing():
-    """并发解析多个MHTML文件,使用多个浏览器"""
+async def concurrent_mhtml_parsing(batch_size=None):
+    """并发解析多个MHTML文件,使用多个浏览器,分批处理避免冲突
+    
+    Args:
+        batch_size: 每批处理的最大任务数,默认为浏览器实例数量
+    """
     print("开始并发解析多个MHTML文件...")
     print("=" * 60)
     
@@ -156,7 +179,7 @@ async def concurrent_mhtml_parsing():
         db_manager = DbManager()
         
         # 获取所有ASIN记录,不限制extra_result_path条件
-        asin_records = db_manager.get_all_asin(to_dict=False)
+        asin_records = db_manager.get_asin_with_empty_extra_result_path(to_dict=False)
         
         if not asin_records:
             print("没有找到有MHTML文件的ASIN记录")
@@ -165,54 +188,83 @@ async def concurrent_mhtml_parsing():
         
         print(f"前3个ASIN记录: {asin_records[:3]}")
         print(f"找到 {len(asin_records)} 个ASIN记录")
-        y = input("是否继续?")
-        if y != 'y':
-            return
+        # y = input("是否继续?")
+        # if y != 'y':
+        #     return
+        
         # 创建浏览器配置列表用于并发处理
-        asin_records = asin_records[:1]
         browser_config_list = list(browser_configs.values())
         num_browsers = len(browser_config_list)
         
+        # 设置批次大小,如果未指定则使用浏览器实例数量
+        actual_batch_size = batch_size if batch_size is not None else num_browsers
+        
         print(f"使用 {num_browsers} 个浏览器进行并发处理")
+        print(f"每批处理最多 {actual_batch_size} 个任务")
+        
+        # 使用while循环和pop方式分批处理任务
+        all_results = []
+        batch_count = 0
+        
+        # 将列表转换为可pop的格式
+        asin_records_copy = asin_records.copy()
         
-        # 创建并发任务
-        tasks = []
-        for i, asin_record in enumerate(asin_records):
-            # 轮询选择浏览器配置
-            browser_config = browser_config_list[i % num_browsers]
-            task = process_single_asin(asin_record, browser_config)
-            tasks.append(task)
-        
-        # 并发执行所有任务
-        results = await asyncio.gather(*tasks, return_exceptions=True)
-        
-        # 处理结果
-        processed_results = []
-        for result in results:
-            if isinstance(result, Exception):
-                print(f"任务执行异常: {result}")
-                processed_results.append({
-                    'asin': 'unknown',
-                    'status': 'error',
-                    'error': str(result)
-                })
-            else:
-                processed_results.append(result)
-                print(f"ASIN {result['asin']} 解析完成,状态: {result['status']}")
+        while asin_records_copy:
+            batch_count += 1
+            print(f"\n开始第 {batch_count} 批任务处理...")
+            
+            # 获取当前批次的任务(最多actual_batch_size个)
+            current_batch = []
+            for _ in range(min(actual_batch_size, len(asin_records_copy))):
+                if asin_records_copy:
+                    current_batch.append(asin_records_copy.pop(0))
+            
+            print(f"当前批次处理 {len(current_batch)} 个ASIN")
+            
+            # 创建当前批次的并发任务
+            tasks = []
+            for i, asin_record in enumerate(current_batch):
+                # 轮询选择浏览器配置
+                browser_config = browser_config_list[i % num_browsers]
+                task = process_single_asin(asin_record, browser_config)
+                tasks.append(task)
+            
+            # 并发执行当前批次的任务
+            batch_results = await asyncio.gather(*tasks, return_exceptions=True)
+            
+            # 处理当前批次的结果
+            for result in batch_results:
+                if isinstance(result, Exception):
+                    print(f"任务执行异常: {result}")
+                    all_results.append({
+                        'asin': 'unknown',
+                        'status': 'error',
+                        'error': str(result)
+                    })
+                else:
+                    all_results.append(result)
+                    print(f"ASIN {result['asin']} 解析完成,状态: {result['status']}")
+            
+            print(f"第 {batch_count} 批任务完成,剩余 {len(asin_records_copy)} 个ASIN待处理")
+            
+            # 添加短暂延迟,避免浏览器资源竞争
+            if asin_records_copy:
+                await asyncio.sleep(1)
         
         # 打印汇总结果
         print("\n" + "=" * 60)
         print("并发解析结果汇总:")
-        success_count = sum(1 for r in processed_results if r['status'] == 'success')
-        error_count = sum(1 for r in processed_results if r['status'] == 'error')
+        success_count = sum(1 for r in all_results if r['status'] == 'success')
+        error_count = sum(1 for r in all_results if r['status'] == 'error')
         
         print(f"成功: {success_count}, 失败: {error_count}")
-        print(f"总处理数: {len(processed_results)}")
+        print(f"总处理数: {len(all_results)}")
+        print(f"总批次数: {batch_count}")
         
-        for result in processed_results:
+        for result in all_results:
             print(f"ASIN: {result['asin']}, 状态: {result['status']}")
             
-        return processed_results
+        return all_results
             
     except Exception as e:
         print(f"并发解析MHTML时出错: {e}")
@@ -222,32 +274,210 @@ async def concurrent_mhtml_parsing():
     finally:
         print("=" * 60)
 
def run_concurrent_parsing(batch_size=None):
    """Synchronous entry point: drive the async batch parser to completion.

    Args:
        batch_size: Maximum number of tasks per batch; ``None`` means "one
            task per available browser instance" (decided inside the
            coroutine).

    Returns:
        The list of per-ASIN result dicts produced by
        ``concurrent_mhtml_parsing``.
    """
    print("直接并发运行ASIN MHTML解析")
    print("=" * 60)

    parse_results = asyncio.run(concurrent_mhtml_parsing(batch_size))

    print("并发解析完成")
    return parse_results
+
def run_concurrent_parsing_with_custom_batch():
    """Run the concurrent parser with a small, fixed batch size.

    A smaller batch keeps fewer browser instances busy at once, trading
    throughput for fewer resource conflicts.

    Returns:
        The list of per-ASIN result dicts from ``concurrent_mhtml_parsing``.
    """
    print("使用自定义批次大小并发运行ASIN MHTML解析")
    print("=" * 60)

    batch_size = 3  # tune per machine; smaller = less browser contention
    print(f"设置批次大小为: {batch_size}")

    parse_results = asyncio.run(concurrent_mhtml_parsing(batch_size))

    print("并发解析完成")
    return parse_results
 
async def concurrent_mhtml_parsing_with_dynamic_batch():
    """Concurrently parse MHTML files, auto-tuning the batch size per batch.

    The batch size starts at the number of available browsers and is nudged
    up after a batch with a success rate >= 80%, or down after one < 50%,
    clamped to [1, 2 * num_browsers].

    Returns:
        A list of per-ASIN result dicts (``asin`` / ``status`` / ...), an
        empty list on unexpected failure, or ``None`` when there is nothing
        to process.
    """
    print("开始动态批次大小并发解析多个MHTML文件...")
    print("=" * 60)

    try:
        # All browser profiles available for round-robin assignment.
        browser_configs = create_direct_browser_config_with_stats()
        if not browser_configs:
            print("没有可用的浏览器配置")
            return

        print(f"可用浏览器配置: {list(browser_configs.keys())}")

        db_manager = DbManager()

        # Only records still missing an extract-result path need parsing.
        asin_records = db_manager.get_asin_with_empty_extra_result_path(to_dict=False)

        if not asin_records:
            print("没有找到有MHTML文件的ASIN记录")
            print("请先运行ASIN爬取流程来生成MHTML文件")
            return

        print(f"前3个ASIN记录: {asin_records[:3]}")
        print(f"找到 {len(asin_records)} 个ASIN记录")

        browser_config_list = list(browser_configs.values())
        num_browsers = len(browser_config_list)

        print(f"使用 {num_browsers} 个浏览器进行并发处理")

        all_results = []
        batch_count = 0
        success_rate = 1.0  # seed value, only used in the first batch's log line
        current_batch_size = num_browsers
        min_batch_size = 1
        max_batch_size = num_browsers * 2

        # Work off a copy so the fetched list is left untouched.
        pending = list(asin_records)

        while pending:
            batch_count += 1
            print(f"\n开始第 {batch_count} 批任务处理...")
            print(f"当前批次大小: {current_batch_size} (基于上批成功率: {success_rate:.2%})")

            # Take the next batch with a slice (O(batch)) instead of repeated
            # pop(0) calls, which cost O(n) each on a list.
            current_batch = pending[:current_batch_size]
            del pending[:current_batch_size]

            print(f"当前批次处理 {len(current_batch)} 个ASIN")

            # Round-robin the browsers over this batch's tasks.
            tasks = [
                process_single_asin(asin_record, browser_config_list[i % num_browsers])
                for i, asin_record in enumerate(current_batch)
            ]

            batch_results = await asyncio.gather(*tasks, return_exceptions=True)

            # Collect results and count successes for the adaptive step below.
            batch_success_count = 0
            batch_total_count = len(batch_results)

            for result in batch_results:
                if isinstance(result, Exception):
                    print(f"任务执行异常: {result}")
                    all_results.append({
                        'asin': 'unknown',
                        'status': 'error',
                        'error': str(result)
                    })
                else:
                    all_results.append(result)
                    if result['status'] == 'success':
                        batch_success_count += 1
                    print(f"ASIN {result['asin']} 解析完成,状态: {result['status']}")

            current_success_rate = batch_success_count / batch_total_count if batch_total_count > 0 else 0
            print(f"第 {batch_count} 批成功率: {current_success_rate:.2%} ({batch_success_count}/{batch_total_count})")

            # Adaptive step: grow on high success, shrink on low, stay in bounds.
            if current_success_rate >= 0.8:
                current_batch_size = min(current_batch_size + 1, max_batch_size)
            elif current_success_rate < 0.5:
                current_batch_size = max(current_batch_size - 1, min_batch_size)

            success_rate = current_success_rate

            print(f"第 {batch_count} 批任务完成,剩余 {len(pending)} 个ASIN待处理")
            print(f"下一批批次大小调整为: {current_batch_size}")

            # Brief pause between batches to ease browser resource contention.
            if pending:
                await asyncio.sleep(1)

        # Summary over every processed record.
        print("\n" + "=" * 60)
        print("动态批次解析结果汇总:")
        success_count = sum(1 for r in all_results if r['status'] == 'success')
        error_count = sum(1 for r in all_results if r['status'] == 'error')

        print(f"成功: {success_count}, 失败: {error_count}")
        print(f"总处理数: {len(all_results)}")
        print(f"总批次数: {batch_count}")
        # all_results is non-empty here: asin_records was non-empty and every
        # record contributes exactly one entry.
        print(f"最终成功率: {success_count/len(all_results):.2%}")

        for result in all_results:
            print(f"ASIN: {result['asin']}, 状态: {result['status']}")

        return all_results

    except Exception as e:
        print(f"动态批次解析MHTML时出错: {e}")
        logger.exception(f"动态批次解析MHTML时出错: {e}")
        return []

    finally:
        print("=" * 60)
+
def run_dynamic_parsing():
    """Blocking wrapper around the dynamic-batch async parser.

    Returns:
        Whatever ``concurrent_mhtml_parsing_with_dynamic_batch`` returns.
    """
    print("运行动态批次大小ASIN MHTML解析")
    print("=" * 60)

    outcome = asyncio.run(concurrent_mhtml_parsing_with_dynamic_batch())

    print("动态批次解析完成")
    return outcome
+
 def main():
-    """主函数 - 直接并发运行"""
+    """主函数 - 提供多种并发处理模式选择"""
     print("ASIN MHTML解析流程 - 并发运行")
     print("=" * 60)
+    print("请选择处理模式:")
+    print("1. 默认批次大小(浏览器实例数量)")
+    print("2. 自定义批次大小")
+    print("3. 动态批次大小(根据成功率自动调整)")
     
-    # 直接运行并发解析
+    # choice = input("请输入选项 (1/2/3): ").strip()
+    
+    results = None
     results = run_concurrent_parsing()
     
     # 输出最终统计信息
     if results:
         success_count = sum(1 for r in results if r['status'] == 'success')
         error_count = sum(1 for r in results if r['status'] == 'error')
-        print(f"\n最终统计: 成功 {success_count} 个, 失败 {error_count} 个")
+        total_count = len(results)
+        success_rate = success_count / total_count if total_count > 0 else 0
+        
+        print(f"\n最终统计:")
+        print(f"成功: {success_count} 个")
+        print(f"失败: {error_count} 个")
+        print(f"总计: {total_count} 个")
+        print(f"成功率: {success_rate:.2%}")
+    else:
+        print("\n没有处理结果")
     
-    print("并发解析完成")
+    print("\n处理完成")
 
 if __name__ == "__main__":
     main()

+ 52 - 2
utils/file.py

@@ -206,6 +206,41 @@ def upload_file_to_s3(file_path:str,s3_uri:str, **extra_args):
         ExtraArgs=upload_args
     )
 
def http_url_to_s3_uri(http_url):
    """Convert a Minio HTTP(S) object URL back to an ``s3://bucket/key`` URI.

    Inverse of ``s3_uri_to_http_url``. Inputs that are not HTTP URLs, that
    point at a different endpoint, or that lack a bucket/key path are
    returned unchanged so callers can fall through gracefully.

    Args:
        http_url: The URL to convert.

    Returns:
        ``s3://bucket/decoded-key`` on success, otherwise ``http_url``
        unchanged.
    """
    if not http_url.startswith(('http://', 'https://')):
        # Not an HTTP URL at all — nothing to convert.
        return http_url

    # Normalise the configured endpoint (no trailing slash).
    endpoint = CFG.s3_endpoint.rstrip('/')

    # Require a '/' immediately after the endpoint so that e.g. endpoint
    # "http://minio:900" does not falsely match "http://minio:9000/...".
    if not http_url.startswith(endpoint + '/'):
        # URL belongs to some other host/endpoint — cannot convert.
        return http_url

    # Strip the endpoint, leaving "bucket/key...".
    path_part = http_url[len(endpoint):].lstrip('/')

    # Split into bucket and key; both parts are required.
    parts = path_part.split('/', 1)
    if len(parts) < 2:
        return http_url

    bucket = parts[0]
    key = parts[1]

    # Keys are percent-encoded in HTTP URLs; decode them for the S3 URI.
    decoded_key = urllib.parse.unquote(key)

    return f"s3://{bucket}/{decoded_key}"
+
 def save_to_file(content, filename:Path, **extra_args):
     '''
     save_to_file(
@@ -214,8 +249,23 @@ def save_to_file(content, filename:Path, **extra_args):
         Metadata={'mykey':'myvalue','mykey2':'myvalue2'}
         )
     '''
-    if str(filename).startswith('s3://'):
-        return upload_to_s3(content, str(filename), **extra_args)
+    filename_str = str(filename)
+    
+    # 如果是HTTP URL,尝试转换为S3 URI
+    if filename_str.startswith('http://') or filename_str.startswith('https://'):
+        s3_uri = http_url_to_s3_uri(filename_str)
+        if s3_uri.startswith('s3://'):
+            # 转换成功,使用S3 URI进行上传
+            return upload_to_s3(content, s3_uri, **extra_args)
+        else:
+            # 转换失败,抛出异常
+            raise ValueError(f"Cannot save to HTTP URL: {filename_str}. Please use S3 URI format.")
+    
+    # 如果是S3 URI,直接上传
+    if filename_str.startswith('s3://'):
+        return upload_to_s3(content, filename_str, **extra_args)
+    
+    # 本地文件保存
     if isinstance(content, str):
         with open(filename, "w", encoding="utf-8") as file:
             file.write(content)