| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
import csv
import logging
from typing import List, Optional, Tuple, Union

from mylib.logging_config import setup_logging
from mylib.pdfzh_translator import OpenAITranslator
# Set up the project's custom logging configuration. This call runs at import
# time, i.e. importing this module has the side effect of configuring logging.
setup_logging()
# Module-level logger used by the functions defined below.
logger = logging.getLogger('new_col_translate')
def column_letter_to_index(col_letter: str) -> int:
    """Convert an Excel-style column label to a 0-based column index.

    The label is read case-insensitively as a bijective base-26 number:
    ``A`` -> 0, ``Z`` -> 25, ``AA`` -> 26, ``AB`` -> 27, and so on.

    Args:
        col_letter: Column label consisting only of ASCII letters,
            e.g. ``"B"`` or ``"aa"``.

    Returns:
        The 0-based index of the column.

    Raises:
        ValueError: If ``col_letter`` is empty or contains any character
            other than the ASCII letters A-Z / a-z.
    """
    # Validate the raw label (before upper-casing, which can expand some
    # non-ASCII characters into plain letters). Without this check an empty
    # label would return -1, which silently addresses the last column, and
    # digits/punctuation would produce an arbitrary index.
    if not col_letter or not all(
        'A' <= char <= 'Z' or 'a' <= char <= 'z' for char in col_letter
    ):
        raise ValueError(f"无效的列字母:{col_letter!r}")

    index = 0
    for char in col_letter.upper():
        # Each letter is a digit in the range 1..26 (A=1), hence the +1.
        index = index * 26 + (ord(char) - ord('A') + 1)
    # Shift from the 1-based column number to a 0-based index.
    return index - 1
def read_csv_with_header(
    file_path: str,
    encoding: str = 'cp936'
) -> Tuple[List[str], List[List[str]]]:
    """Read a CSV file and split it into its header row and its data rows.

    Args:
        file_path: Path of the CSV file to read.
        encoding: Text encoding used to decode the file. Defaults to
            ``'cp936'``.

    Returns:
        A ``(header, data)`` tuple, where ``header`` is the first row of the
        file and ``data`` is the list of all remaining rows.

    Raises:
        ValueError: If the file contains no rows at all, so no header exists.
        Exception: Any error raised while opening or parsing the file is
            logged and then re-raised unchanged.
    """
    try:
        # newline='' hands newline handling to the csv module, as its
        # documentation requires; otherwise line breaks embedded in quoted
        # fields may be misread.
        with open(file_path, 'r', encoding=encoding, newline='') as f:
            reader = csv.reader(f)
            header = next(reader, None)
            if header is None:
                # A bare next(reader) would leak StopIteration to the caller
                # for an empty file; report the real problem instead.
                raise ValueError(f"文件为空,缺少表头:{file_path}")
            data = list(reader)
        logger.info(f"成功读取文件:{file_path}")
        logger.debug(f"表头:{header}")
        return header, data
    except Exception as e:
        logger.error(f"读取文件失败:{e}")
        raise
def translate_columns_data(
    data: List[List[str]],
    header: List[str],
    column_indices: List[int],
    start_row: int = 1,
    end_row: Optional[int] = None,
    source_lang: str = 'auto',
    target_lang: str = 'zh-CN'
) -> Tuple[List[List[str]], List[str]]:
    """Translate the given columns, inserting each result in a new column.

    For every requested column a new column is inserted immediately to its
    right in every row of ``data`` and in ``header`` (named
    ``"<original header>_translated"``). The cells of the rows selected by
    ``data[start_row:end_row]`` are translated and written into the new
    column; rows outside that slice get an empty string there.

    ``data`` and ``header`` are modified in place and are also returned.

    Args:
        data: Data rows (without the header row); mutated in place.
        header: Header row; mutated in place.
        column_indices: 0-based indices of the columns to translate, expressed
            relative to the columns as they are on entry. Any order is
            accepted.
        start_row: Start of the slice of ``data`` to translate (0-based,
            inclusive). NOTE(review): with the default of 1, ``data[0]`` is
            not translated even though the header is passed separately --
            confirm this is intended.
        end_row: End of the slice of ``data`` to translate (exclusive).
            ``None`` means up to the last row.
        source_lang: Source language passed to ``OpenAITranslator`` as
            ``lang_in``.
        target_lang: Target language passed to ``OpenAITranslator`` as
            ``lang_out``.

    Returns:
        The ``(data, header)`` pair, i.e. the same objects that were passed
        in, after modification.

    Raises:
        IndexError: If any entry of ``column_indices`` is not a valid index
            into ``header``. This is checked before anything is modified.
    """
    # Log the parameters supplied by the caller. Use the same ``is not None``
    # test as the slicing below so that end_row=0 is reported accurately.
    end_label = end_row if end_row is not None else '最后'
    logger.info(f"翻译参数:源语言={source_lang}, 目标语言={target_lang}")
    logger.info(f"翻译范围:从第 {start_row} 行到第 {end_label} 行")

    # Validate every requested column up front so that a bad index cannot
    # leave data/header half-modified.
    original_width = len(header)
    for col_index in column_indices:
        if not 0 <= col_index < original_width:
            raise IndexError(
                f"列索引 {col_index} 超出表头范围(共 {original_width} 列)"
            )

    translator = OpenAITranslator(lang_out=target_lang, lang_in=source_lang)

    end_row = end_row if end_row is not None else len(data)
    # The slice holds references to the same row lists as ``data``, so the
    # column inserted into each row of ``data`` below is visible here too.
    rows_to_translate = data[start_row:end_row]

    logger.info(f"开始翻译 {start_row} 到 {end_row} 行的数据")

    # Process the columns in the order the caller gave them.
    for i, col_index in enumerate(column_indices):
        # Only columns already inserted to the LEFT of this one have shifted
        # it. Counting those (rather than assuming ``i``) keeps the position
        # correct when column_indices is not sorted in ascending order.
        shift = sum(1 for earlier in column_indices[:i] if earlier < col_index)
        current_col_index = col_index + shift
        new_col_index = current_col_index + 1

        # Insert the new (empty) column into every row.
        for row in data:
            # Pad ragged rows so the source cell exists and the new cell
            # lands in the same column as in every other row.
            if len(row) < new_col_index:
                row.extend([''] * (new_col_index - len(row)))
            row.insert(new_col_index, '')

        # Add the matching header cell.
        header.insert(new_col_index, f"{header[current_col_index]}_translated")

        # Collect the texts to translate.
        texts_to_translate = [row[current_col_index] for row in rows_to_translate]

        # Log a small sample of what is about to be translated.
        logger.info(f"列 {current_col_index} 提取的内容示例:")
        for idx, text in enumerate(texts_to_translate[:3], start=start_row):
            logger.info(f"第 {idx} 行: {text}")

        # Skip the translator call entirely when no rows are selected.
        if texts_to_translate:
            # Presumably returns one translated string per input, in the
            # same order -- TODO confirm against OpenAITranslator.
            translated_texts = translator._batch_translate(texts_to_translate)

            # Write the translations into the new column.
            for j, row in enumerate(rows_to_translate):
                row[new_col_index] = translated_texts[j]

        logger.info(f"列 {current_col_index} 翻译完成")

    logger.info("所有列翻译完成")
    return data, header
def save_csv(
    data: List[List[str]],
    header: List[str],
    output_file: str,
    encoding: str = 'utf-8-sig'
):
    """Write the header row followed by all data rows to a CSV file.

    Args:
        data: Data rows to write, in order, after the header.
        header: Header row, written as the first line.
        output_file: Path of the file to create or overwrite.
        encoding: Text encoding of the output file. Defaults to
            ``'utf-8-sig'`` (UTF-8 with a byte order mark).

    Raises:
        Exception: Any error raised while opening or writing the file is
            logged and then re-raised unchanged.
    """
    try:
        with open(output_file, 'w', encoding=encoding, newline='') as out_fh:
            csv_out = csv.writer(out_fh)
            # Header first, then the data rows, in a single call.
            csv_out.writerows([header, *data])
        logger.info(f"结果已保存到: {output_file}")
    except Exception as err:
        logger.error(f"保存文件失败:{err}")
        raise
def process_csv(
    input_file: str,
    output_file: str,
    columns: Union[str, List[str]],
    start_row: int = 1,
    end_row: Optional[int] = None,
    source_lang: str = 'auto',
    target_lang: str = 'zh-CN',
    encoding: str = 'cp936'
):
    """Read a CSV file, translate the chosen columns and save the result.

    Args:
        input_file: Path of the CSV file to read.
        output_file: Path of the CSV file to write.
        columns: One Excel-style column label (e.g. ``"B"``) or a list of
            such labels naming the columns to translate.
        start_row: Forwarded to ``translate_columns_data`` as ``start_row``.
        end_row: Forwarded to ``translate_columns_data`` as ``end_row``.
        source_lang: Source language forwarded to ``translate_columns_data``.
        target_lang: Target language forwarded to ``translate_columns_data``.
        encoding: Encoding used to READ ``input_file``. The output is written
            with ``save_csv``'s own default encoding, not this one.

    Raises:
        Exception: Any error from reading, translating or saving is logged
            and then re-raised unchanged.
    """
    try:
        # Log the parameters supplied by the caller.
        logger.info(f"处理文件:{input_file}")
        logger.info(f"输出文件:{output_file}")
        logger.info(f"处理列:{columns}")
        logger.info(f"编码:{encoding}")

        # Accept a single label as shorthand for a one-element list, then
        # map every label to its 0-based column index.
        column_letters = [columns] if isinstance(columns, str) else columns
        column_indices = list(map(column_letter_to_index, column_letters))

        # Load the file.
        header, rows = read_csv_with_header(input_file, encoding=encoding)

        # Translate the requested columns.
        rows, header = translate_columns_data(
            rows,
            header,
            column_indices,
            start_row=start_row,
            end_row=end_row,
            source_lang=source_lang,
            target_lang=target_lang,
        )

        # Persist the result.
        save_csv(rows, header, output_file)

    except Exception as err:
        logger.error(f"处理文件时出错:{err}")
        raise
if __name__ == "__main__":
    from dotenv import load_dotenv

    # Runs before process_csv; presumably supplies the environment settings
    # the translator needs -- confirm against OpenAITranslator.
    load_dotenv()

    # Example usage: translate several columns of a sample file at once.
    example_job = dict(
        input_file="/home/mrh/code/excel_tool/temp/测试.csv",
        output_file="/home/mrh/code/excel_tool/temp/测试_processed.csv",
        columns=['B', 'F', 'G', 'H'],  # multiple columns are supported
        start_row=1,
        source_lang='auto',
        target_lang='zh-CN',
    )
    process_csv(**example_job)
|