|
|
@@ -1,5 +1,5 @@
|
|
|
from typing import List, Optional
|
|
|
-from celery import current_app
|
|
|
+from celery import current_app, group
|
|
|
from pydantic import BaseModel, Field
|
|
|
from worker.html_convert.pandoc import process_single_example, process_all_results
|
|
|
from mylib.logu import get_logger
|
|
|
@@ -9,8 +9,9 @@ from worker.search_engine.valid_google_search import ValidSearchResult
|
|
|
|
|
|
# Module-level logger shared by the tasks in this file, created under the name 'pandoc_tasks'.
logger = get_logger('pandoc_tasks')
|
|
|
class ConvertTaskParams(BaseModel):
    """Input parameters for the HTML conversion tasks in this module."""

    # IDs of the results to convert; required, but an empty list is accepted.
    result_ids: List[str] = Field(default=..., min_length=0)
    # Batch size; must be greater than -1 and defaults to 0.
    batch_size: Optional[int] = Field(default=0, gt=-1)
    # Name of the queue the conversion tasks are routed to (newly added parameter).
    queue_name: Optional[str] = Field(default='convert_queue', description="任务队列名称")
|
|
|
|
|
|
@current_app.task(name='html_convert_tasks.convert_single_result')
|
|
|
def convert_single_result_task(result_id: int):
|
|
|
@@ -34,36 +35,58 @@ def convert_single_result_task(result_id: int):
|
|
|
|
|
|
@current_app.task(name='html_convert_tasks.convert_all_results')
def convert_all_results_task(input_params: ConvertTaskParams=None):
    """Submit one ``convert_single_result_task`` per valid search result.

    The valid items are fetched through ``ValidSearchResult`` and their IDs
    are dispatched in groups of 50 to the queue named in the parameters.

    Args:
        input_params: A ``ConvertTaskParams`` instance, a dict of its fields,
            or nothing at all. Only ``queue_name`` is read here; when no
            parameters are supplied the model defaults are used.

    Returns:
        dict: ``{"status": "success", "task_ids": [...], "message": ...}``
        when submission succeeds, or ``{"status": "failed", "error": ...}``
        if any exception is raised along the way.
    """
    try:
        if not input_params:
            # No parameters supplied (None or an empty dict): fall back to the
            # model defaults instead of failing later on ``config.queue_name``.
            config = ConvertTaskParams(result_ids=[])
        elif isinstance(input_params, dict):
            config = ConvertTaskParams(**input_params)
        else:
            config = input_params

        valid_search = ValidSearchResult()
        valid_items = valid_search.get_valid_search_result_items()
        result_ids = [str(item.id) for item in valid_items]
        logger.info(f"开始批量转换 {len(result_ids)} 个结果,队列: {config.queue_name}")

        # Build one signature per result, each routed to the configured queue.
        task_signatures = [
            convert_single_result_task.s(result_id).set(queue=config.queue_name)
            for result_id in result_ids
        ]

        # Submit in batches of 50 signatures per group.
        chunk_size = 50
        task_ids = []
        batch_count = 0
        for start in range(0, len(task_signatures), chunk_size):
            chunk = task_signatures[start:start + chunk_size]
            group_result = group(chunk).apply_async()
            # The unsent signatures carry no id; the real task ids live on the
            # results returned by the group submission.
            task_ids.extend(child.id for child in group_result.results)
            batch_count += 1
            logger.info(f"已提交第 {batch_count} 批转换任务,共 {len(chunk)} 个")

        return {
            "status": "success",
            "task_ids": task_ids,
            # Report the number of batches actually submitted.
            "message": f"已提交 {len(result_ids)} 个转换任务(分{batch_count}批)"
        }
    except Exception as e:
        logger.exception(f"Error during bulk Pandoc conversion: {str(e)}")
        return {"status": "failed", "error": str(e)}
|
|
|
-
|
|
|
def test_task_process_all_results():
    """Submit conversion tasks for all valid results (test helper).

    Collects the valid search result items, wraps their IDs in a
    ``ConvertTaskParams`` targeting ``'convert_queue'``, and calls
    ``convert_all_results_task`` directly with it.

    Returns:
        dict: The result dictionary returned by ``convert_all_results_task``.

    Raises:
        RuntimeError: If any step fails; the original exception is chained
            as the cause.
    """
    try:
        valid_search = ValidSearchResult()
        valid_items = valid_search.get_valid_search_result_items()

        logger.info(f"找到 {len(valid_items)} 个有效结果,开始批量提交...")

        # Build the task parameters.
        params = ConvertTaskParams(
            result_ids=[str(item.id) for item in valid_items],
            queue_name='convert_queue'
        )

        # Invoke the bulk conversion task.
        result = convert_all_results_task(params)
        logger.info(f"批量提交完成,任务ID: {result.get('task_ids', [])}")
        return result
    except Exception as e:
        logger.error(f"批量提交转换任务失败: {str(e)}")
        # Raising a bare string is itself a TypeError; raise a real exception
        # and keep the original failure attached as the cause.
        raise RuntimeError(f"批量提交转换任务失败 {str(e)}") from e
|
|
|
|
|
|
def clear_existing_tasks():
|
|
|
"""清除所有待处理的任务"""
|