from celery import current_app
from worker.html_convert.pandoc import process_single_example, process_all_results
from mylib.logu import get_logger
from worker.search_engine.search_result_db import SearchResultItem, SearchResultManager
from sqlmodel import Session, select
from worker.search_engine.valid_google_search import ValidSearchResult
# Module-level logger used by the tasks and helper functions in this file.
logger = get_logger('pandoc_tasks')
@current_app.task(name='html_convert_tasks.convert_single_result')
def convert_single_result_task(result_id: int) -> dict:
    """
    Celery task to convert a single SearchResultItem using Pandoc.

    Any exception raised during the conversion is logged with its traceback
    and reported through the returned dict instead of being re-raised.

    Args:
        result_id (int): The ID of the SearchResultItem to process.

    Returns:
        dict: ``{"result_id": ..., "status": "completed"}`` on success, or
        ``{"result_id": ..., "status": "failed", "error": "<message>"}``
        when the conversion raised.
    """
    try:
        logger.info(f"Starting Pandoc conversion for SearchResultItem ID: {result_id}")
        process_single_example(result_id)
        logger.info(f"Pandoc conversion completed for SearchResultItem ID: {result_id}")
        return {"result_id": result_id, "status": "completed"}
    except Exception as e:
        logger.exception(f"Error during Pandoc conversion for SearchResultItem ID: {result_id}: {str(e)}")
        # Include the error text so the caller can see why the conversion
        # failed, matching the failure payload of convert_all_results_task.
        return {"result_id": result_id, "status": "failed", "error": str(e)}
@current_app.task(name='html_convert_tasks.convert_all_results')
def convert_all_results_task():
    """
    Celery task that runs the Pandoc conversion over all SearchResultItems.

    A failure is logged with its traceback and reported in the return value
    rather than re-raised.

    Returns:
        dict: ``{"status": "completed"}`` on success; otherwise
        ``{"status": "failed", "error": "<exception message>"}``.
    """
    try:
        logger.info("Starting Pandoc conversion for all SearchResultItems")
        process_all_results()
        logger.info("Pandoc conversion completed for all SearchResultItems")
    except Exception as exc:
        reason = str(exc)
        logger.exception(f"Error during bulk Pandoc conversion: {reason}")
        return {"status": "failed", "error": reason}
    # Reached only when nothing inside the try block raised.
    return {"status": "completed"}
def test_task_process_all_results():
    """
    Queue one conversion task for every valid search result stored as HTML.

    Fetches the items from
    ``ValidSearchResult().get_valid_search_result_items()``, logs how many
    there are plus the first five IDs, and calls
    ``convert_single_result_task.delay`` for each item whose ``html_path`` is
    set and ends with ``.html``. An error on one item is logged with its
    traceback and does not stop the remaining items.

    NOTE(review): despite the ``test_`` prefix this submits real Celery
    tasks; the name is kept so existing callers keep working.
    """
    valid_items = ValidSearchResult().get_valid_search_result_items()
    logger.info(f"Total valid results: {len(valid_items)}")
    logger.info(f"First 5 valid result IDs: {[item.id for item in valid_items[:5]]}")

    for item in valid_items:
        try:
            # Only items whose html_path ends in .html are submitted.
            if not (item.html_path and item.html_path.endswith('.html')):
                continue
            logger.info(f"Submitting task for valid SearchResultItem ID: {item.id}")
            convert_single_result_task.delay(item.id)
        except Exception as e:
            # Log with the traceback so a failing submission can be
            # diagnosed, consistent with the handlers in the Celery tasks.
            logger.exception(f"Error processing valid result {item.id}: {e}")
def clear_existing_tasks():
    """
    Clear all pending tasks.

    Calls ``current_app.control.discard_all()`` and logs the count it
    returns. An exception raised while doing so is logged with its traceback
    and not re-raised.
    """
    try:
        discarded_count = current_app.control.discard_all()
        logger.info(f"已清除 {discarded_count} 个待处理任务")
    except Exception as e:
        # Log with the traceback so the cause of the failure is visible.
        logger.exception(f"清除任务失败: {str(e)}")
def main():
    """Script entry point: clear the pending tasks."""
    # To enqueue conversions for all valid results instead, call
    # test_task_process_all_results() here.
    clear_existing_tasks()


if __name__ == "__main__":
    main()