html_convert_tasks.py 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. from celery import current_app
  2. from worker.html_convert.pandoc import process_single_example, process_all_results
  3. from mylib.logu import get_logger
  4. from worker.search_engine.search_result_db import SearchResultItem, SearchResultManager
  5. from sqlmodel import Session, select
  6. from worker.search_engine.valid_google_search import ValidSearchResult
  # Module-level logger (named 'pandoc_tasks') shared by every task below.
  logger = get_logger('pandoc_tasks')


  @current_app.task(name='html_convert_tasks.convert_single_result')
  def convert_single_result_task(result_id: int):
      """Run the Pandoc conversion for a single SearchResultItem.

      The conversion itself is delegated to ``process_single_example``. Any
      exception it raises is logged with its traceback and reported through
      the returned dict instead of being re-raised.

      Args:
          result_id (int): ID of the SearchResultItem to convert.

      Returns:
          dict: ``{"result_id": ..., "status": "completed"}`` on success, or
          ``{"result_id": ..., "status": "failed", "error": <message>}`` on
          failure -- the same failure shape ``convert_all_results_task`` uses.
      """
      try:
          logger.info(f"Starting Pandoc conversion for SearchResultItem ID: {result_id}")
          process_single_example(result_id)
          logger.info(f"Pandoc conversion completed for SearchResultItem ID: {result_id}")
          return {"result_id": result_id, "status": "completed"}
      except Exception as e:
          # Broad catch is deliberate: this is the task boundary, and failures
          # are reported via the returned status rather than raised to Celery.
          logger.exception(f"Error during Pandoc conversion for SearchResultItem ID: {result_id}: {str(e)}")
          # Include the error text so callers can tell *why* the item failed.
          return {"result_id": result_id, "status": "failed", "error": str(e)}
  @current_app.task(name='html_convert_tasks.convert_all_results')
  def convert_all_results_task():
      """Run the Pandoc conversion over all SearchResultItems in one task.

      The work is delegated to ``process_all_results``. Any exception is
      logged with its traceback and turned into a failure status rather than
      re-raised.

      Returns:
          dict: ``{"status": "completed"}`` on success, otherwise
          ``{"status": "failed", "error": <exception message>}``.
      """
      try:
          logger.info("Starting Pandoc conversion for all SearchResultItems")
          process_all_results()
          logger.info("Pandoc conversion completed for all SearchResultItems")
          outcome = {"status": "completed"}
      except Exception as exc:
          reason = str(exc)
          logger.exception(f"Error during bulk Pandoc conversion: {reason}")
          outcome = {"status": "failed", "error": reason}
      return outcome
  def test_task_process_all_results():
      """Enqueue a Pandoc conversion task for every valid search result.

      Fetches items from ``ValidSearchResult().get_valid_search_result_items()``
      and submits ``convert_single_result_task`` for each item whose
      ``html_path`` ends with ``.html``. A failure while handling one item is
      logged with its traceback and does not stop the loop. A summary of how
      many tasks were submitted is logged at the end.

      NOTE(review): despite the ``test_`` prefix this is a manual driver, not a
      unit test -- pytest will collect it (and enqueue real tasks) if this
      module ends up on a test path. Consider renaming it, keeping this name
      as an alias for existing callers.
      """
      valid_search = ValidSearchResult()
      valid_items = valid_search.get_valid_search_result_items()
      logger.info(f"Total valid results: {len(valid_items)}")
      logger.info(f"First 5 valid result IDs: {[item.id for item in valid_items[:5]]}")

      submitted = 0
      for item in valid_items:
          try:
              # Skip items that have no stored .html file to convert.
              if not (item.html_path and item.html_path.endswith('.html')):
                  continue
              logger.info(f"Submitting task for valid SearchResultItem ID: {item.id}")
              convert_single_result_task.delay(item.id)
              submitted += 1
          except Exception as e:
              # logger.exception keeps the traceback, matching how the task
              # functions in this module report failures.
              logger.exception(f"Error processing valid result {item.id}: {e}")

      logger.info(f"Submitted {submitted} of {len(valid_items)} valid results for conversion")
  def clear_existing_tasks():
      """Discard every task message still waiting in the broker.

      This is destructive: purged messages are deleted from the broker and
      will never be executed.

      Returns:
          int | None: The number of discarded messages, or ``None`` if the
          purge failed (the failure is logged with its traceback, not raised).
      """
      try:
          # ``purge()`` is the documented name; ``discard_all`` is an alias of it.
          discarded_count = current_app.control.purge()
      except Exception as e:
          # Best-effort cleanup: log and report failure to the caller via None.
          logger.exception(f"清除任务失败: {str(e)}")
          return None
      logger.info(f"已清除 {discarded_count} 个待处理任务")
      return discarded_count
  def main():
      """Script entry point: discard every task still waiting in the broker.

      To enqueue conversions for all valid search results instead, call
      ``test_task_process_all_results()`` here.
      """
      clear_existing_tasks()


  if __name__ == "__main__":
      main()