"""Celery tasks that run Pandoc conversion on SearchResultItem records."""

from celery import current_app
from worker.html_convert.pandoc import process_single_example, process_all_results
from mylib.logu import get_logger
from worker.search_engine.search_result_db import SearchResultItem, SearchResultManager
from sqlmodel import Session, select
from worker.search_engine.valid_google_search import ValidSearchResult

logger = get_logger('pandoc_tasks')


@current_app.task(name='html_convert_tasks.convert_single_result')
def convert_single_result_task(result_id: int) -> dict:
    """
    Celery task to convert a single SearchResultItem using Pandoc.

    Failures are logged with a traceback and reported in the returned dict
    rather than re-raised.

    Args:
        result_id (int): The ID of the SearchResultItem to process.

    Returns:
        dict: ``{"result_id", "status": "completed"}`` on success, or
        ``{"result_id", "status": "failed", "error"}`` on failure.
    """
    try:
        logger.info(f"Starting Pandoc conversion for SearchResultItem ID: {result_id}")
        process_single_example(result_id)
        logger.info(f"Pandoc conversion completed for SearchResultItem ID: {result_id}")
        return {"result_id": result_id, "status": "completed"}
    except Exception as e:
        logger.exception(
            f"Error during Pandoc conversion for SearchResultItem ID: {result_id}: {str(e)}"
        )
        # Carry the error text in the payload, matching the shape returned by
        # convert_all_results_task on failure.
        return {"result_id": result_id, "status": "failed", "error": str(e)}


@current_app.task(name='html_convert_tasks.convert_all_results')
def convert_all_results_task() -> dict:
    """
    Celery task to convert all SearchResultItems using Pandoc.

    Failures are logged with a traceback and reported in the returned dict
    rather than re-raised.

    Returns:
        dict: ``{"status": "completed"}`` on success, or
        ``{"status": "failed", "error"}`` on failure.
    """
    try:
        logger.info("Starting Pandoc conversion for all SearchResultItems")
        process_all_results()
        logger.info("Pandoc conversion completed for all SearchResultItems")
        return {"status": "completed"}
    except Exception as e:
        logger.exception(f"Error during bulk Pandoc conversion: {str(e)}")
        return {"status": "failed", "error": str(e)}


def test_task_process_all_results():
    """Submit one conversion task per valid result whose html_path ends in '.html'.

    An error while handling one item is logged and does not stop the loop.
    """
    # Process all valid results using ValidSearchResult
    valid_search = ValidSearchResult()
    valid_items = valid_search.get_valid_search_result_items()

    logger.info(f"Total valid results: {len(valid_items)}")
    logger.info(f"First 5 valid result IDs: {[item.id for item in valid_items[:5]]}")

    for item in valid_items:
        try:
            if item.html_path and item.html_path.endswith('.html'):
                logger.info(f"Submitting task for valid SearchResultItem ID: {item.id}")
                convert_single_result_task.delay(item.id)
        except Exception as e:
            logger.error(f"Error processing valid result {item.id}: {e}")


def clear_existing_tasks():
    """Discard all pending tasks.

    Any error raised by the discard call is logged, not propagated.
    """
    try:
        discarded_count = current_app.control.discard_all()
        logger.info(f"已清除 {discarded_count} 个待处理任务")
    except Exception as e:
        logger.error(f"清除任务失败: {str(e)}")


def main():
    """Script entry point: currently only clears the pending tasks."""
    # test_task_process_all_results()
    clear_existing_tasks()


if __name__ == "__main__":
    main()