import csv
import logging
from typing import List, Optional, Tuple

from mylib.logging_config import setup_logging
from mylib.pdfzh_translator import OpenAITranslator

# Set up the project's custom logging configuration before creating the logger.
setup_logging()
logger = logging.getLogger('new_col_translate')


def column_letter_to_index(col_letter: str) -> int:
    """Convert an Excel-style column letter to a 0-based column index.

    The conversion is case-insensitive: ``'A'`` -> 0, ``'Z'`` -> 25,
    ``'AA'`` -> 26.

    Args:
        col_letter: One or more letters in the range A-Z.

    Returns:
        The 0-based index of the column.

    Raises:
        ValueError: If ``col_letter`` is empty or contains a character
            outside A-Z. Without this check an empty string would yield -1,
            which silently addresses the last column of every row.
    """
    letters = col_letter.upper()
    if not letters or not all('A' <= char <= 'Z' for char in letters):
        raise ValueError(f"无效的列字母:{col_letter!r}")

    index = 0
    for char in letters:
        # Bijective base-26: 'A' counts as 1, so there is no zero digit.
        index = index * 26 + (ord(char) - ord('A') + 1)
    return index - 1


def read_csv_with_header(
    file_path: str,
    encoding: str = 'cp936'
) -> Tuple[List[str], List[List[str]]]:
    """Read a CSV file and return its header row and data rows separately.

    Args:
        file_path: Path of the CSV file to read.
        encoding: Text encoding used to decode the file.

    Returns:
        A ``(header, data)`` tuple: ``header`` is the first row of the file,
        ``data`` is the list of all remaining rows.

    Raises:
        ValueError: If the file contains no rows at all.
        Exception: Any other error raised while opening or parsing the file
            is logged and re-raised unchanged.
    """
    try:
        # newline='' lets the csv module handle line endings itself, so
        # quoted fields with embedded newlines are parsed correctly.
        with open(file_path, 'r', encoding=encoding, newline='') as f:
            reader = csv.reader(f)
            header = next(reader, None)
            if header is None:
                # Avoid leaking a bare StopIteration out of this function.
                raise ValueError(f"CSV文件为空:{file_path}")
            data = list(reader)
        logger.info(f"成功读取文件:{file_path}")
        logger.debug(f"表头:{header}")
        return header, data
    except Exception as e:
        logger.error(f"读取文件失败:{e}")
        raise


def translate_column_data(
    data: List[List[str]],
    column_index: int,
    start_row: int = 1,
    end_row: Optional[int] = None,
    source_lang: str = 'auto',
    target_lang: str = 'zh-CN'
) -> List[List[str]]:
    """Translate one column of ``data`` and insert the result as a new column.

    Only the rows in ``data[start_row:end_row]`` are processed. Each of those
    rows is mutated in place: its translation is inserted immediately after
    ``column_index``. Rows outside the range are left untouched.

    Args:
        data: The rows to process (each row is a list of cell strings).
        column_index: 0-based index of the column whose text is translated.
        start_row: Slice start into ``data``. NOTE(review): ``data`` as
            produced by ``read_csv_with_header`` already excludes the header,
            so the default of 1 skips the first data row -- confirm this is
            intended.
        end_row: Slice end into ``data`` (exclusive); ``None`` means up to
            the last row.
        source_lang: Source language passed to the translator as ``lang_in``.
        target_lang: Target language passed to the translator as ``lang_out``.

    Returns:
        The same ``data`` list that was passed in.

    Raises:
        IndexError: If a selected row has no cell at ``column_index``.
    """
    translator = OpenAITranslator(lang_out=target_lang, lang_in=source_lang)

    end_row = end_row if end_row is not None else len(data)
    rows_to_translate = data[start_row:end_row]

    logger.info(f"开始翻译 {start_row} 到 {end_row} 行的数据")

    # Collect the source text of every selected row.
    texts_to_translate = [row[column_index] for row in rows_to_translate]
    logger.debug(f"待翻译文本示例:{texts_to_translate[:3]}")

    # Translate as a single batch; skip the call when nothing is selected.
    # NOTE(review): _batch_translate is a private method of OpenAITranslator;
    # prefer a public batch API if the class offers one.
    translated_texts = (
        translator._batch_translate(texts_to_translate)
        if texts_to_translate
        else []
    )

    # Insert each translation right after the source column.
    for i, row in enumerate(rows_to_translate):
        row.insert(column_index + 1, translated_texts[i])

    logger.info("翻译完成")
    return data


def process_csv(
    input_file: str,
    output_file: str,
    column: str,
    start_row: int = 1,
    end_row: Optional[int] = None,
    source_lang: str = 'auto',
    target_lang: str = 'zh-CN',
    encoding: str = 'cp936'
) -> None:
    """Translate one column of a CSV file and write the result to a new file.

    Exactly one new column is added directly after the source column. Rows in
    ``data[start_row:end_row]`` receive the translation, all other rows get an
    empty cell, and the header gets a matching cell, so every row of the
    output stays aligned.

    Args:
        input_file: Path of the CSV file to read.
        output_file: Path of the CSV file to write (encoded as utf-8-sig).
        column: Excel-style letter of the column to translate, e.g. ``'B'``.
        start_row: Slice start into the data rows (the header is excluded).
        end_row: Slice end into the data rows (exclusive); ``None`` means up
            to the last row.
        source_lang: Source language handed to the translator.
        target_lang: Target language handed to the translator.
        encoding: Text encoding of the input file.

    Raises:
        Exception: Any error raised while reading, translating or writing is
            logged and re-raised unchanged.
    """
    try:
        # Convert the column letter to a 0-based index.
        column_index = column_letter_to_index(column)

        # Read the input file.
        header, data = read_csv_with_header(input_file, encoding=encoding)

        # Indices of the rows translate_column_data will extend; slicing a
        # range applies the same start/end semantics as slicing the list.
        translated_rows = set(range(len(data))[start_row:end_row])

        # Translate the selected column (adds one cell to each selected row).
        data = translate_column_data(
            data,
            column_index,
            start_row,
            end_row,
            source_lang,
            target_lang
        )

        # Pad only the rows that were not translated, so that every row
        # gains exactly one cell.
        for i, row in enumerate(data):
            if i not in translated_rows:
                row.insert(column_index + 1, '')

        # Give the new column a header cell so the header matches the data.
        source_header = header[column_index] if column_index < len(header) else ''
        header.insert(column_index + 1, f"{source_header}_{target_lang}")

        # Save the result.
        with open(output_file, 'w', encoding='utf-8-sig', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(header)
            writer.writerows(data)

        logger.info(f"结果已保存到:{output_file}")

    except Exception as e:
        logger.error(f"处理文件时出错:{e}")
        raise


if __name__ == "__main__":
    # Example usage.
    process_csv(
        input_file='input.csv',
        output_file='output.csv',
        column='B',
        start_row=1,
        source_lang='auto',
        target_lang='zh-CN'
    )