| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- import os
- import logging
- import pandas as pd
- from pathlib import Path
- from typing import List, Tuple, Union
- from mylib.pdfzh_translator import OpenAITranslator
- from mylib.read_encoding_cvs import read_csv
- from mylib.logging_config import setup_logging
- # Setup custom logging
- setup_logging()
- logger = logging.getLogger('mylib.translate_utils')
- def column_letter_to_index(col_letter: str) -> int:
- """将列字母转换为列索引(从0开始)
-
- Args:
- col_letter: 列字母(如 'A', 'B', 'AA' 等)
-
- Returns:
- 列索引(从0开始)
- """
- try:
- col_index = 0
- for i, char in enumerate(reversed(col_letter.upper())):
- col_index += (ord(char) - ord('A') + 1) * (26 ** i)
- return col_index - 1
- except Exception as e:
- logger.error(f"列字母转换时出错: {e}")
- raise
- def read_csv_with_header(file_path: str, header_row: int = 1, encoding: str = None) -> pd.DataFrame:
- """读取CSV文件并正确处理标题行
-
- Args:
- file_path: CSV文件路径
- header_row: 标题行号(从0开始),默认为1(第2行)
- encoding: 文件编码
-
- Returns:
- pandas DataFrame
- """
- try:
- if not os.path.exists(file_path):
- logger.error(f"文件不存在: {file_path}")
- raise FileNotFoundError(f"文件不存在: {file_path}")
-
- # 读取所有数据
- data = read_csv(file_path, encoding)
-
- if not data:
- logger.error("读取的文件为空")
- raise ValueError("读取的文件为空")
-
- # 确保header_row在有效范围内
- if header_row >= len(data):
- logger.error(f"标题行 {header_row} 超出文件范围")
- raise ValueError(f"标题行 {header_row} 超出文件范围")
-
- # 使用指定行作为列名,前面的行丢弃
- df = pd.DataFrame(data[header_row+1:], columns=data[header_row])
-
- logger.info(f"成功读取CSV文件,使用第{header_row+1}行作为标题行")
- logger.info(f"列标题: {df.columns.tolist()}")
- return df
- except Exception as e:
- logger.error(f"读取CSV文件时出错: {e}")
- raise
- def translate_column_data(df: pd.DataFrame, column_identifier: Union[str, int],
- start_row: int = 1, end_row: int = None,
- source_lang: str = 'auto', target_lang: str = 'zh-CN') -> pd.DataFrame:
- """翻译指定列的数据并在右侧插入翻译结果列
-
- Args:
- df: pandas DataFrame
- column_identifier: 要翻译的列名或列号(从0开始),也可以是列字母(如 'A', 'B')
- start_row: 开始翻译的行号,默认为1(第2行)
- end_row: 结束翻译的行号,默认为None(到最后一行)
- source_lang: 源语言代码,默认为'auto'
- target_lang: 目标语言代码,默认为'zh-CN'
-
- Returns:
- 包含翻译结果的DataFrame
- """
- try:
- if df.empty:
- logger.error("DataFrame为空")
- return df
-
- # 处理列号或列名或列字母
- if isinstance(column_identifier, str) and column_identifier.isalpha():
- column_identifier = column_letter_to_index(column_identifier)
- if isinstance(column_identifier, int):
- if column_identifier < 0 or column_identifier >= len(df.columns):
- logger.error(f"列号 {column_identifier} 超出范围")
- raise ValueError(f"列号 {column_identifier} 超出范围")
- column_identifier = df.columns[column_identifier]
-
- # 确保列名存在
- if column_identifier not in df.columns:
- logger.error(f"列名 {column_identifier} 不存在")
- raise ValueError(f"列名 {column_identifier} 不存在")
-
- # 处理行范围
- if end_row is None:
- end_row = len(df)
- if start_row < 0 or start_row >= len(df) or end_row < 0 or end_row > len(df):
- logger.error(f"行范围 {start_row}-{end_row} 超出范围")
- raise ValueError(f"行范围 {start_row}-{end_row} 超出范围")
-
- # 提取要翻译的数据
- texts_to_translate = df.iloc[start_row:end_row][column_identifier].tolist()
- logger.info(f"准备翻译 {len(texts_to_translate)} 条数据,从第{start_row}行到第{end_row}行")
-
- # 初始化翻译器
- translator = OpenAITranslator(lang_out=target_lang, lang_in=source_lang)
-
- # 执行翻译
- translated_texts = translator._batch_translate(texts_to_translate)
-
- # 在右侧插入新列
- new_column_name = f"{column_identifier}_translated"
- df.insert(df.columns.get_loc(column_identifier) + 1, new_column_name, "")
-
- # 填充翻译结果
- df.loc[start_row:end_row-1, new_column_name] = translated_texts
-
- logger.info(f"翻译完成,已插入新列 {new_column_name}")
- return df
-
- except Exception as e:
- logger.error(f"翻译列数据时出错: {e}")
- raise
- def process_csv(input_file: str, output_file: str, column_identifier: Union[str, int],
- start_row: int = 1, end_row: int = None,
- source_lang: str = 'auto', target_lang: str = 'zh-CN'):
- """处理CSV文件并保存翻译结果
-
- Args:
- input_file: 输入CSV文件路径
- output_file: 输出CSV文件路径
- column_identifier: 要翻译的列名或列号(从0开始),也可以是列字母(如 'A', 'B')
- start_row: 开始翻译的行号,默认为1(第2行)
- end_row: 结束翻译的行号,默认为None(到最后一行)
- source_lang: 源语言代码,默认为'auto'
- target_lang: 目标语言代码,默认为'zh-CN'
- """
- try:
- # 读取CSV文件
- df = read_csv_with_header(input_file)
-
- # 翻译指定列
- df = translate_column_data(df, column_identifier, start_row, end_row, source_lang, target_lang)
-
- # 保存结果
- df.to_csv(output_file, index=False, encoding='utf-8-sig')
- logger.info(f"翻译结果已保存到 {output_file}")
-
- except Exception as e:
- logger.error(f"处理CSV文件时出错: {e}")
- raise
- if __name__ == '__main__':
- # 示例用法
- input_file = Path('/path/to/input.csv')
- output_file = Path('/path/to/output.csv')
- process_csv(input_file, output_file, column_identifier='B', start_row=1, end_row=10)
|