# new_col_translate.py (5.8 KB)
import csv
import logging
from typing import List, Optional, Tuple, Union

from mylib.logging_config import setup_logging
from mylib.pdfzh_translator import OpenAITranslator
# Set up custom logging through the project's setup_logging() helper. This runs
# once, at import time, before the module-level logger below is created.
setup_logging()
# Shared 'mylib' logger used by every function in this module.
logger = logging.getLogger('mylib')
  9. def column_letter_to_index(col_letter: str) -> int:
  10. """将Excel列字母转换为0-based索引"""
  11. index = 0
  12. for char in col_letter.upper():
  13. index = index * 26 + (ord(char) - ord('A') + 1)
  14. return index - 1
  15. def index_to_column_letter(index: int) -> str:
  16. """将0-based索引转换为Excel列字母"""
  17. col_letter = ''
  18. while index >= 0:
  19. col_letter = chr(ord('A') + (index % 26)) + col_letter
  20. index = (index // 26) - 1
  21. return col_letter
  22. def read_csv_with_header(
  23. file_path: str,
  24. encoding: str = 'utf-8'
  25. ) -> List[List[str]]:
  26. """读取CSV文件并返回数据和表头"""
  27. try:
  28. logger.debug(f"encoding = {encoding}")
  29. with open(file_path, 'r', encoding=encoding) as f:
  30. reader = csv.reader(f)
  31. header = next(reader) # 读取表头(第1行)
  32. data = [row for row in reader] # 读取剩余数据
  33. logger.info(f"成功读取文件:{file_path}")
  34. logger.debug(f"表头:{header}")
  35. return header, data
  36. except Exception as e:
  37. logger.error(f"读取文件失败:{e}")
  38. raise
  39. def search_keywords(
  40. data: List[List[str]],
  41. header: List[str],
  42. keywords: Union[str, List[str]],
  43. row_index: int = 0
  44. ) -> List[str]:
  45. """搜索指定行中包含关键词的单元格并返回列名列表"""
  46. if isinstance(keywords, str):
  47. keywords = [keywords]
  48. found_columns = set()
  49. # 检查行索引是否在数据范围内
  50. if row_index >= len(data):
  51. logger.warning(f"行索引 {row_index} 超出数据范围")
  52. return []
  53. # 搜索数据行(从第2行开始)
  54. row = data[row_index]
  55. for col_index, cell in enumerate(row):
  56. if any(keyword in cell for keyword in keywords):
  57. col_letter = index_to_column_letter(col_index)
  58. found_columns.add(col_letter)
  59. logger.debug(f"在 {col_letter}{row_index + 2} 找到关键词: {cell}")
  60. found_columns = sorted(found_columns, key=lambda x: column_letter_to_index(x))
  61. logger.info(f"找到包含关键词的列: {', '.join(found_columns)}")
  62. return list(found_columns)
  63. def translate_columns_data(
  64. data: List[List[str]],
  65. header: List[str],
  66. column_indices: List[int],
  67. start_row: int = 2, # 默认从第2行开始
  68. end_row: Optional[int] = None,
  69. source_lang: str = 'auto',
  70. target_lang: str = 'zh-CN'
  71. ) -> List[List[str]]:
  72. """翻译多个指定列的数据"""
  73. translator = OpenAITranslator(lang_out=target_lang, lang_in=source_lang)
  74. end_row = end_row if end_row is not None else len(data)
  75. rows_to_translate = data[start_row - 1:end_row] # 转换为0-based索引
  76. # 按顺序处理每一列
  77. for i, col_index in enumerate(column_indices):
  78. # 计算当前列的实际索引
  79. current_col_index = col_index + i
  80. # 插入新列
  81. for row in data:
  82. row.insert(current_col_index + 1, '')
  83. # 更新表头
  84. header.insert(current_col_index + 1, f"{header[current_col_index]}_translated")
  85. # 提取要翻译的文本
  86. texts_to_translate = [row[current_col_index] for row in rows_to_translate]
  87. # 批量翻译
  88. translated_texts = translator._batch_translate(texts_to_translate)
  89. # 将翻译结果插入新列
  90. for j, row in enumerate(rows_to_translate):
  91. row[current_col_index + 1] = translated_texts[j]
  92. return data, header
  93. def save_csv(
  94. data: List[List[str]],
  95. header: List[str],
  96. output_file: str,
  97. encoding: str = 'utf-8-sig'
  98. ):
  99. """保存CSV文件"""
  100. try:
  101. with open(output_file, 'w', encoding=encoding, newline='') as f:
  102. writer = csv.writer(f)
  103. writer.writerow(header)
  104. writer.writerows(data)
  105. logger.info(f"结果已保存到: {output_file}")
  106. except Exception as e:
  107. logger.error(f"保存文件失败:{e}")
  108. raise
  109. def process_csv(
  110. input_file: str,
  111. output_file: str,
  112. columns: Union[str, List[str]],
  113. start_row: int = 2, # 默认从第2行开始
  114. end_row: Optional[int] = None,
  115. source_lang: str = 'auto',
  116. target_lang: str = 'zh-CN',
  117. encoding: str = 'utf-8'
  118. ):
  119. """处理CSV文件的主函数"""
  120. try:
  121. # 转换列字母为索引
  122. if isinstance(columns, str):
  123. columns = [columns]
  124. column_indices = [column_letter_to_index(col) for col in columns]
  125. # 读取文件
  126. header, data = read_csv_with_header(input_file, encoding=encoding)
  127. # 翻译指定列
  128. data, header = translate_columns_data(
  129. data,
  130. header,
  131. column_indices,
  132. start_row,
  133. end_row,
  134. source_lang,
  135. target_lang
  136. )
  137. # 保存结果
  138. save_csv(data, header, output_file)
  139. except Exception as e:
  140. logger.error(f"处理文件时出错:{e}")
  141. raise
  142. if __name__ == "__main__":
  143. from dotenv import load_dotenv
  144. load_dotenv()
  145. # 示例用法
  146. file_path = "/home/mrh/code/excel_tool/temp/测试_url_processed.csv"
  147. output_path = "/home/mrh/code/excel_tool/temp/测试_processed.csv"
  148. # 读取文件并搜索关键词
  149. header, data = read_csv_with_header(file_path, encoding='utf-8')
  150. found_columns = search_keywords(data, header, ["搜索词", "类别"])
  151. # 处理文件
  152. process_csv(
  153. input_file=file_path,
  154. output_file=output_path,
  155. columns=found_columns,
  156. start_row=2,
  157. source_lang='auto',
  158. target_lang='zh-CN'
  159. )