translate_utils.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. import os
  2. import logging
  3. from pathlib import Path
  4. from typing import List, Tuple
  5. from mylib.pdfzh_translator import OpenAITranslator
  6. from mylib.read_encoding_cvs import read_csv
  7. from logging_config import setup_logging
  8. from mylib.logging_config import setup_logging
  9. # Setup custom logging
  10. setup_logging()
  11. logger = logging.getLogger(__name__)
  12. def insert_empty_columns(data: List[List[str]], column_indices: List[int]) -> List[List[str]]:
  13. """在指定列之后插入空列"""
  14. try:
  15. # 按从大到小排序,防止插入影响后续索引
  16. column_indices.sort(reverse=True)
  17. for row in data:
  18. for index in column_indices:
  19. row.insert(index + 1, '')
  20. return data
  21. except Exception as e:
  22. logger.error(f"Error inserting empty columns: {e}")
  23. raise
  24. def extract_sample_data(data: List[List[str]], n: int = 2, m: int = 2) -> List[List[str]]:
  25. """提取前n行m列数据用于检查"""
  26. try:
  27. sample = []
  28. for row in data[:n]:
  29. sample.append(row[:m])
  30. return sample
  31. except Exception as e:
  32. logger.error(f"Error extracting sample data: {e}")
  33. raise
  34. def log_data_details(data: List[List[str]], search_term_index: int, start_row: int = 3):
  35. """记录数据详细信息"""
  36. try:
  37. # 记录开始行信息
  38. logger.info(f"开始行: {start_row}")
  39. # 记录前start_row - 5行数据
  40. logger.info("前5行数据:")
  41. for i, row in enumerate(data[start_row:start_row+5]):
  42. logger.info(f"行 {i}: {', '.join(row)}")
  43. # 记录最后5行数据
  44. logger.info("最后5行数据:")
  45. for i, row in enumerate(data[-5:]):
  46. logger.info(f"行 {len(data)-5+i}: {', '.join(row)}")
  47. except Exception as e:
  48. logger.error(f"记录数据详细信息时出错: {e}")
  49. raise
  50. def process_batch_translations(data: List[List[str]],
  51. search_term_index: int,
  52. start_row: int = 3) -> Tuple[List[List[str]], List[List[str]]]:
  53. """批量处理搜索词翻译"""
  54. try:
  55. # 首先提取样本数据用于检查
  56. sample_data = extract_sample_data(data)
  57. logger.info(f"Sample data extracted for inspection:\n{sample_data}")
  58. # 记录数据详细信息
  59. log_data_details(data, search_term_index, start_row)
  60. # 初始化翻译器
  61. translator = OpenAITranslator()
  62. # 直接提取需要翻译的搜索词
  63. search_terms = [row[search_term_index] for row in data[start_row-1:]]
  64. # 批量翻译
  65. logger.info("Starting search term translations...")
  66. if os.getenv('DEBUG', '').lower() in ('true', '1', 'True'):
  67. # DEBUG模式:使用模拟翻译
  68. search_translations = [f"{text} 翻译测试" for text in search_terms]
  69. else:
  70. # 正常模式:调用真实翻译
  71. search_translations = translator.translate(search_terms)
  72. logger.info("Search term translations completed")
  73. # 更新数据
  74. for i, row in enumerate(data[start_row-1:], start=start_row-1):
  75. try:
  76. # 更新搜索词翻译列
  77. row[search_term_index + 1] = search_translations[i-(start_row-1)]
  78. except Exception as e:
  79. logger.error(f"Error processing row {i}: {e}")
  80. raise
  81. return data, sample_data
  82. except Exception as e:
  83. logger.error(f"Error in batch translation: {e}")
  84. raise
  85. def main():
  86. output_dir = Path('temp')
  87. input_file = output_dir/"测试.csv"
  88. output_file = output_dir/"processed_测试.csv"
  89. data = read_csv(input_file)
  90. process_batch_translations(data, 2)
  91. if __name__ == "__main__":
  92. main()