new_col_translate.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. import csv
  2. import logging
  3. from typing import List, Optional, Union
  4. from mylib.logging_config import setup_logging
  5. from mylib.pdfzh_translator import OpenAITranslator
  6. # Setup custom logging
  7. setup_logging()
  8. logger = logging.getLogger('mylib')
  9. def column_letter_to_index(col_letter: str) -> int:
  10. """将Excel列字母转换为0-based索引"""
  11. index = 0
  12. for char in col_letter.upper():
  13. index = index * 26 + (ord(char) - ord('A') + 1)
  14. return index - 1
  15. def index_to_column_letter(index: int) -> str:
  16. """将0-based索引转换为Excel列字母"""
  17. col_letter = ''
  18. while index >= 0:
  19. col_letter = chr(ord('A') + (index % 26)) + col_letter
  20. index = (index // 26) - 1
  21. return col_letter
  22. def read_csv_with_header(
  23. file_path: str,
  24. encoding: str = 'cp936'
  25. ) -> List[List[str]]:
  26. """读取CSV文件并返回数据和表头"""
  27. try:
  28. with open(file_path, 'r', encoding=encoding) as f:
  29. reader = csv.reader(f)
  30. header = next(reader) # 读取表头(第1行)
  31. data = [row for row in reader] # 读取剩余数据
  32. logger.info(f"成功读取文件:{file_path}")
  33. logger.debug(f"表头:{header}")
  34. return header, data
  35. except Exception as e:
  36. logger.error(f"读取文件失败:{e}")
  37. raise
  38. def search_keywords(
  39. data: List[List[str]],
  40. header: List[str],
  41. keywords: Union[str, List[str]],
  42. row_index: int = 0
  43. ) -> List[str]:
  44. """搜索指定行中包含关键词的单元格并返回列名列表"""
  45. if isinstance(keywords, str):
  46. keywords = [keywords]
  47. found_columns = set()
  48. # 检查行索引是否在数据范围内
  49. if row_index >= len(data):
  50. logger.warning(f"行索引 {row_index} 超出数据范围")
  51. return []
  52. # 搜索数据行(从第2行开始)
  53. row = data[row_index]
  54. for col_index, cell in enumerate(row):
  55. if any(keyword in cell for keyword in keywords):
  56. col_letter = index_to_column_letter(col_index)
  57. found_columns.add(col_letter)
  58. logger.debug(f"在 {col_letter}{row_index + 2} 找到关键词: {cell}")
  59. found_columns = sorted(found_columns, key=lambda x: column_letter_to_index(x))
  60. logger.info(f"找到包含关键词的列: {', '.join(found_columns)}")
  61. return list(found_columns)
  62. def translate_columns_data(
  63. data: List[List[str]],
  64. header: List[str],
  65. column_indices: List[int],
  66. start_row: int = 2, # 默认从第2行开始
  67. end_row: Optional[int] = None,
  68. source_lang: str = 'auto',
  69. target_lang: str = 'zh-CN'
  70. ) -> List[List[str]]:
  71. """翻译多个指定列的数据"""
  72. translator = OpenAITranslator(lang_out=target_lang, lang_in=source_lang)
  73. end_row = end_row if end_row is not None else len(data)
  74. rows_to_translate = data[start_row - 1:end_row] # 转换为0-based索引
  75. # 按顺序处理每一列
  76. for i, col_index in enumerate(column_indices):
  77. # 计算当前列的实际索引
  78. current_col_index = col_index + i
  79. # 插入新列
  80. for row in data:
  81. row.insert(current_col_index + 1, '')
  82. # 更新表头
  83. header.insert(current_col_index + 1, f"{header[current_col_index]}_translated")
  84. # 提取要翻译的文本
  85. texts_to_translate = [row[current_col_index] for row in rows_to_translate]
  86. # 批量翻译
  87. translated_texts = translator._batch_translate(texts_to_translate)
  88. # 将翻译结果插入新列
  89. for j, row in enumerate(rows_to_translate):
  90. row[current_col_index + 1] = translated_texts[j]
  91. return data, header
  92. def save_csv(
  93. data: List[List[str]],
  94. header: List[str],
  95. output_file: str,
  96. encoding: str = 'utf-8-sig'
  97. ):
  98. """保存CSV文件"""
  99. try:
  100. with open(output_file, 'w', encoding=encoding, newline='') as f:
  101. writer = csv.writer(f)
  102. writer.writerow(header)
  103. writer.writerows(data)
  104. logger.info(f"结果已保存到: {output_file}")
  105. except Exception as e:
  106. logger.error(f"保存文件失败:{e}")
  107. raise
  108. def process_csv(
  109. input_file: str,
  110. output_file: str,
  111. columns: Union[str, List[str]],
  112. start_row: int = 2, # 默认从第2行开始
  113. end_row: Optional[int] = None,
  114. source_lang: str = 'auto',
  115. target_lang: str = 'zh-CN',
  116. encoding: str = 'cp936'
  117. ):
  118. """处理CSV文件的主函数"""
  119. try:
  120. # 转换列字母为索引
  121. if isinstance(columns, str):
  122. columns = [columns]
  123. column_indices = [column_letter_to_index(col) for col in columns]
  124. # 读取文件
  125. header, data = read_csv_with_header(input_file, encoding=encoding)
  126. # 翻译指定列
  127. data, header = translate_columns_data(
  128. data,
  129. header,
  130. column_indices,
  131. start_row,
  132. end_row,
  133. source_lang,
  134. target_lang
  135. )
  136. # 保存结果
  137. save_csv(data, header, output_file)
  138. except Exception as e:
  139. logger.error(f"处理文件时出错:{e}")
  140. raise
  141. if __name__ == "__main__":
  142. from dotenv import load_dotenv
  143. load_dotenv()
  144. # 示例用法
  145. file_path = "/home/mrh/code/excel_tool/temp/测试.csv"
  146. output_path = "/home/mrh/code/excel_tool/temp/测试_processed.csv"
  147. # 读取文件并搜索关键词
  148. header, data = read_csv_with_header(file_path)
  149. found_columns = search_keywords(data, header, ["搜索词", "类别"])
  150. # 处理文件
  151. process_csv(
  152. input_file=file_path,
  153. output_file=output_path,
  154. columns=found_columns,
  155. start_row=2,
  156. source_lang='auto',
  157. target_lang='zh-CN'
  158. )