| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 |
- from celery import current_app
- from worker.html_convert.pandoc import process_single_example, process_all_results
- from mylib.logu import get_logger
- from worker.search_engine.search_result_db import SearchResultItem, SearchResultManager
- from sqlmodel import Session, select
- from worker.search_engine.valid_google_search import ValidSearchResult
# Module-level logger used by the tasks and helpers in this file.
logger = get_logger("pandoc_tasks")
@current_app.task(name='html_convert_tasks.convert_single_result')
def convert_single_result_task(result_id: int) -> dict:
    """Celery task that converts a single SearchResultItem using Pandoc.

    Any exception raised while converting is logged through
    ``logger.exception`` and reported in the returned dict instead of being
    propagated to the caller.

    Args:
        result_id (int): The ID of the SearchResultItem to process.

    Returns:
        dict: ``{"result_id": ..., "status": "completed"}`` on success, or
        ``{"result_id": ..., "status": "failed", "error": <exception text>}``
        when the conversion raised.
    """
    try:
        logger.info(f"Starting Pandoc conversion for SearchResultItem ID: {result_id}")
        process_single_example(result_id)
        logger.info(f"Pandoc conversion completed for SearchResultItem ID: {result_id}")
        return {"result_id": result_id, "status": "completed"}
    except Exception as e:
        logger.exception(f"Error during Pandoc conversion for SearchResultItem ID: {result_id}: {str(e)}")
        # Report the failure reason as well, so the payload has the same
        # "status" + "error" shape that convert_all_results_task returns.
        return {"result_id": result_id, "status": "failed", "error": str(e)}
@current_app.task(name='html_convert_tasks.convert_all_results')
def convert_all_results_task():
    """Run the Pandoc conversion over all SearchResultItems as a Celery task.

    A failure is logged and reported in the result instead of being raised.

    Returns:
        dict: ``{"status": "completed"}`` on success; otherwise
        ``{"status": "failed", "error": <exception text>}``.
    """
    try:
        logger.info("Starting Pandoc conversion for all SearchResultItems")
        process_all_results()
        logger.info("Pandoc conversion completed for all SearchResultItems")
    except Exception as exc:
        reason = str(exc)
        logger.exception(f"Error during bulk Pandoc conversion: {reason}")
        return {"status": "failed", "error": reason}
    return {"status": "completed"}
def test_task_process_all_results():
    """Queue a single-result conversion task for each valid search result.

    Items come from ``ValidSearchResult.get_valid_search_result_items()``.
    Only items whose ``html_path`` is set and ends with ``.html`` are
    submitted. An error on one item is logged and does not stop the loop.
    """
    candidates = ValidSearchResult().get_valid_search_result_items()

    logger.info(f"Total valid results: {len(candidates)}")
    preview_ids = [entry.id for entry in candidates[:5]]
    logger.info(f"First 5 valid result IDs: {preview_ids}")

    for entry in candidates:
        try:
            html_path = entry.html_path
            # Skip items without a stored .html file.
            if not html_path or not html_path.endswith('.html'):
                continue
            logger.info(f"Submitting task for valid SearchResultItem ID: {entry.id}")
            convert_single_result_task.delay(entry.id)
        except Exception as err:
            logger.error(f"Error processing valid result {entry.id}: {err}")
def clear_existing_tasks():
    """Clear all pending tasks.

    Calls ``current_app.control.discard_all()`` and logs the value it
    returns; any exception is logged instead of being raised.
    """
    try:
        purged = current_app.control.discard_all()
        logger.info(f"已清除 {purged} 个待处理任务")
    except Exception as err:
        logger.error(f"清除任务失败: {err!s}")
def main():
    """Script entry point: clear the pending tasks."""
    # To enqueue conversions instead, call test_task_process_all_results() here.
    clear_existing_tasks()


if __name__ == "__main__":
    main()
|