# new_col_translate.py
import csv
import logging
from typing import List, Optional, Tuple

from mylib.logging_config import setup_logging
from mylib.pdfzh_translator import OpenAITranslator
  6. # Setup custom logging
  7. setup_logging()
  8. logger = logging.getLogger('new_col_translate')
  9. def column_letter_to_index(col_letter: str) -> int:
  10. """将Excel列字母转换为0-based索引"""
  11. index = 0
  12. for char in col_letter.upper():
  13. index = index * 26 + (ord(char) - ord('A') + 1)
  14. return index - 1
  15. def read_csv_with_header(file_path: str, encoding: str = 'cp936') -> List[List[str]]:
  16. """读取CSV文件并返回数据和表头"""
  17. try:
  18. with open(file_path, 'r', encoding=encoding) as f:
  19. reader = csv.reader(f)
  20. header = next(reader)
  21. data = [row for row in reader]
  22. logger.info(f"成功读取文件:{file_path}")
  23. logger.debug(f"表头:{header}")
  24. return header, data
  25. except Exception as e:
  26. logger.error(f"读取文件失败:{e}")
  27. raise
  28. def translate_column_data(
  29. data: List[List[str]],
  30. column_index: int,
  31. start_row: int = 1,
  32. end_row: Optional[int] = None,
  33. source_lang: str = 'auto',
  34. target_lang: str = 'zh-CN'
  35. ) -> List[List[str]]:
  36. """翻译指定列的数据"""
  37. translator = OpenAITranslator(lang_out=target_lang, lang_in=source_lang)
  38. end_row = end_row if end_row is not None else len(data)
  39. rows_to_translate = data[start_row:end_row]
  40. logger.info(f"开始翻译 {start_row} 到 {end_row} 行的数据")
  41. # 提取要翻译的文本
  42. texts_to_translate = [row[column_index] for row in rows_to_translate]
  43. logger.debug(f"待翻译文本示例:{texts_to_translate[:3]}")
  44. # 批量翻译
  45. translated_texts = translator._batch_translate(texts_to_translate)
  46. # 将翻译结果插入新列
  47. for i, row in enumerate(rows_to_translate):
  48. row.insert(column_index + 1, translated_texts[i])
  49. logger.info("翻译完成")
  50. return data
  51. def process_csv(
  52. input_file: str,
  53. output_file: str,
  54. column: str,
  55. start_row: int = 1,
  56. end_row: Optional[int] = None,
  57. source_lang: str = 'auto',
  58. target_lang: str = 'zh-CN',
  59. encoding: str = 'cp936'
  60. ):
  61. """处理CSV文件的主函数"""
  62. try:
  63. # 转换列字母为索引
  64. column_index = column_letter_to_index(column)
  65. # 读取文件
  66. header, data = read_csv_with_header(input_file, encoding=encoding)
  67. # 插入空列
  68. for row in data:
  69. row.insert(column_index + 1, '')
  70. # 翻译指定列
  71. data = translate_column_data(
  72. data,
  73. column_index,
  74. start_row,
  75. end_row,
  76. source_lang,
  77. target_lang
  78. )
  79. # 保存结果
  80. with open(output_file, 'w', encoding='utf-8-sig', newline='') as f:
  81. writer = csv.writer(f)
  82. writer.writerow(header)
  83. writer.writerows(data)
  84. logger.info(f"结果已保存到: {output_file}")
  85. except Exception as e:
  86. logger.error(f"处理文件时出错:{e}")
  87. raise
  88. if __name__ == "__main__":
  89. # 示例用法
  90. file_path = "/home/mrh/code/excel_tool/temp/测试.csv"
  91. output_path = "/home/mrh/code/excel_tool/temp/测试_processed.csv"
  92. process_csv(
  93. input_file=file_path,
  94. output_file=output_path,
  95. column='B',
  96. start_row=1,
  97. source_lang='auto',
  98. target_lang='zh-CN'
  99. )