# html_convert_tasks.py (3.1 KB)
  1. from typing import List, Optional
  2. from celery import current_app
  3. from pydantic import BaseModel, Field
  4. from worker.html_convert.pandoc import process_single_example, process_all_results
  5. from mylib.logu import get_logger
  6. from worker.search_engine.search_result_db import SearchResultItem, SearchResultManager
  7. from sqlmodel import Session, select
  8. from worker.search_engine.valid_google_search import ValidSearchResult
  9. logger = get_logger('pandoc_tasks')
  10. class ConvertTaskParams(BaseModel):
  11. result_ids: List[str] = Field(..., min_length=1)
  12. batch_size: Optional[int] = Field(10, gt=0)
  13. @current_app.task(name='html_convert_tasks.convert_single_result')
  14. def convert_single_result_task(result_id: int):
  15. """
  16. Celery task to convert a single SearchResultItem using Pandoc.
  17. Args:
  18. result_id (int): The ID of the SearchResultItem to process.
  19. Returns:
  20. dict: Task result status.
  21. """
  22. try:
  23. logger.info(f"Starting Pandoc conversion for SearchResultItem ID: {result_id}")
  24. process_single_example(result_id)
  25. logger.info(f"Pandoc conversion completed for SearchResultItem ID: {result_id}")
  26. return {"result_id": result_id, "status": "completed"}
  27. except Exception as e:
  28. logger.exception(f"Error during Pandoc conversion for SearchResultItem ID: {result_id}: {str(e)}")
  29. return {"result_id": result_id, "status": "failed"}
  30. @current_app.task(name='html_convert_tasks.convert_all_results')
  31. def convert_all_results_task(input_params: ConvertTaskParams=None):
  32. """
  33. Celery task to convert all SearchResultItems using Pandoc.
  34. Returns:
  35. dict: Task result status.
  36. """
  37. try:
  38. logger.info("Starting Pandoc conversion for all SearchResultItems")
  39. test_task_process_all_results()
  40. logger.info("Pandoc conversion completed for all SearchResultItems")
  41. return {"status": "completed"}
  42. except Exception as e:
  43. logger.exception(f"Error during bulk Pandoc conversion: {str(e)}")
  44. return {"status": "failed", "error": str(e)}
  45. def test_task_process_all_results():
  46. # Process all valid results using ValidSearchResult
  47. valid_search = ValidSearchResult()
  48. valid_items = valid_search.get_valid_search_result_items()
  49. logger.info(f"Total valid results: {len(valid_items)}")
  50. logger.info(f"First 5 valid result IDs: {[item.id for item in valid_items[:5]]}")
  51. for item in valid_items:
  52. try:
  53. if item.html_path and item.html_path.endswith('.html'):
  54. logger.info(f"Submitting task for valid SearchResultItem ID: {item.id}")
  55. convert_single_result_task.delay(item.id)
  56. except Exception as e:
  57. logger.error(f"Error processing valid result {item.id}: {e}")
  58. def clear_existing_tasks():
  59. """清除所有待处理的任务"""
  60. try:
  61. discarded_count = current_app.control.discard_all()
  62. logger.info(f"已清除 {discarded_count} 个待处理任务")
  63. except Exception as e:
  64. logger.error(f"清除任务失败: {str(e)}")
  65. def main():
  66. # test_task_process_all_results()
  67. clear_existing_tasks()
  68. pass
  69. if __name__ == "__main__":
  70. main()