| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185 |
- import os
- import logging
- import pandas as pd
- from pathlib import Path
- from typing import List, Tuple, Union
- from mylib.pdfzh_translator import OpenAITranslator
- from mylib.read_encoding_cvs import read_csv
- from mylib.logging_config import setup_logging
# Configure logging through the project's setup_logging() helper; this runs
# as soon as the module is imported.
setup_logging()
# Module-level logger used by every function in this file.
logger = logging.getLogger('mylib.translate_utils')
def column_letter_to_index(col_letter: str) -> int:
    """Convert a spreadsheet-style column label to a zero-based column index.

    The label is read case-insensitively as a bijective base-26 number:
    'A' -> 0, 'Z' -> 25, 'AA' -> 26, 'AB' -> 27, ...

    Args:
        col_letter: Column label made only of the ASCII letters A-Z / a-z.

    Returns:
        The zero-based index of the column.

    Raises:
        ValueError: If ``col_letter`` is empty or contains anything other
            than ASCII letters (digits, spaces, non-ASCII characters, ...).
        AttributeError: If ``col_letter`` is not a string.

    Any error is logged before it is re-raised.
    """
    try:
        # Validate the raw text before upper-casing it: some non-ASCII
        # characters upper-case into plain ASCII letters (e.g. 'ß' -> 'SS')
        # and would otherwise slip through. An empty string fails isalpha().
        if not (col_letter.isascii() and col_letter.isalpha()):
            raise ValueError(f"无效的列字母: {col_letter!r}")

        col_index = 0
        # Horner evaluation, most significant letter first.
        for char in col_letter.upper():
            col_index = col_index * 26 + (ord(char) - ord('A') + 1)
        # Letters are 1-based ('A' == 1); the result is 0-based.
        return col_index - 1
    except Exception as e:
        logger.error(f"列字母转换时出错: {e}")
        raise
def extract_column_data(df: pd.DataFrame, column_identifier: Union[str, int], start_row: int = 2, header_row: int = 1) -> pd.Series:
    """Return the values of one column from row position ``start_row`` onward.

    ``column_identifier`` is resolved as follows:

    * an ``int`` is a zero-based column position;
    * a string made only of ASCII letters is read as a spreadsheet column
      label ('A', 'B', 'AA', ...); if that label points past the last column
      and a column with exactly that name exists, the name is used instead;
    * any other string (including non-ASCII names) is a column name.

    Args:
        df: Frame to read from.
        column_identifier: Column position, column label or column name.
        start_row: Zero-based row position of the first row to return
            (the default 2 is the third row of ``df``).
        header_row: Not used by this function; kept so that existing callers
            passing it keep working.

    Returns:
        The selected column from ``start_row`` to the end, or an empty
        Series when ``df`` is empty.

    Raises:
        ValueError: If the column position is out of range, the column name
            does not exist, or ``start_row`` is outside ``df``.

    Any error is logged before it is re-raised.
    """
    try:
        if df.empty:
            # Explicit dtype: older pandas versions warn when an empty Series
            # is created without one.
            return pd.Series(dtype=object)

        column_count = len(df.columns)

        # `position` stays None when the identifier must be used as a name.
        position = None
        if isinstance(column_identifier, int):
            position = column_identifier
        elif (
            isinstance(column_identifier, str)
            and column_identifier.isascii()
            and column_identifier.isalpha()
        ):
            position = column_letter_to_index(column_identifier)
            # Alphabetic column names such as 'Name' look like column labels
            # but resolve far outside the frame; prefer the real column then.
            if position >= column_count and column_identifier in df.columns:
                position = None

        if position is not None:
            if position < 0 or position >= column_count:
                raise ValueError(f"列号 {position} 超出范围")
            column_identifier = df.columns[position]
        elif column_identifier not in df.columns:
            raise ValueError(f"列名 {column_identifier} 不存在")

        # Make sure the first requested row exists.
        if start_row >= len(df) or start_row < 0:
            raise ValueError(f"开始行 {start_row} 超出范围")

        rows = df.iloc[start_row:]
        if position is None:
            column_data = rows[column_identifier]
        else:
            # Positional access still yields a single Series when several
            # columns share the same header text.
            column_data = rows.iloc[:, position]

        logger.info(f"成功提取列 {column_identifier} 数据,从第{start_row}行开始,共{len(column_data)}条数据")
        logger.info(f"列 {column_identifier} 数据: {column_data.tolist()}")
        return column_data

    except Exception as e:
        logger.error(f"提取列数据时出错: {e}")
        raise
def insert_empty_columns(df: pd.DataFrame, column_names: List[Union[str, int]], header_row: int = 1) -> pd.DataFrame:
    """Insert an empty ``<name>_translated`` column right after each given column.

    Each entry of ``column_names`` is resolved to a column position:

    * an ``int`` is a zero-based column position;
    * a string made only of ASCII letters is read as a spreadsheet column
      label ('A', 'B', 'AA', ...); if that label points past the last column
      and a column with exactly that name exists, the name is used instead;
    * any other string (including non-ASCII names) is a column name; when
      several columns share the name, the first one is used.

    All entries are validated before ``df`` is touched, and entries that
    resolve to the same column produce a single new column.

    Args:
        df: Frame to modify. It is changed in place.
        column_names: Columns after which an empty column is inserted.
        header_row: Not used by this function; kept so that existing callers
            passing it keep working.

    Returns:
        The same ``df`` object, with the new columns filled with ``''``.

    Raises:
        ValueError: If a position is out of range, a name does not exist, or
            a ``<name>_translated`` column is already present (raised by
            ``DataFrame.insert``).

    Any error is logged before it is re-raised.
    """
    try:
        labels = list(df.columns)
        column_count = len(labels)

        # Resolve everything first so a bad entry cannot leave df half-edited.
        positions = set()
        for identifier in column_names:
            position = None
            if isinstance(identifier, int):
                position = identifier
            elif isinstance(identifier, str) and identifier.isascii() and identifier.isalpha():
                position = column_letter_to_index(identifier)
                # Alphabetic names such as 'Name' look like column labels but
                # resolve far outside the frame; prefer the real column then.
                if position >= column_count and identifier in labels:
                    position = None

            if position is None:
                if identifier not in labels:
                    raise ValueError(f"列名 {identifier} 不存在")
                position = labels.index(identifier)
            elif position < 0 or position >= column_count:
                raise ValueError(f"列号 {position} 超出范围")
            positions.add(position)

        # Work from the right-most column to the left so that earlier
        # insertions never shift the positions still waiting to be handled.
        for position in sorted(positions, reverse=True):
            df.insert(position + 1, f"{labels[position]}_translated", '')

        return df
    except Exception as e:
        logger.error(f"插入空列时出错: {e}")
        raise
def extract_sample_data(df: pd.DataFrame, start_row: int = 0, column_name: str = None, n: int = 3, header_row: int = 1) -> pd.DataFrame:
    """Return a small preview of ``df``: at most ``n`` rows starting at ``start_row``.

    Args:
        df: Frame to sample from.
        start_row: Zero-based row position of the first sampled row.
        column_name: When given (and truthy), the preview is restricted to
            this single column; otherwise all columns are returned.
        n: Maximum number of rows in the preview.
        header_row: Not used by this function.

    Returns:
        A DataFrame holding the selected rows (and column).

    Any error is logged before it is re-raised.
    """
    try:
        # Clamp the end of the window to the last row of the frame.
        stop = min(start_row + n, len(df))
        window = df.iloc[start_row:stop]

        if not column_name:
            return window
        # Double brackets keep the result a one-column DataFrame.
        return window[[column_name]]
    except Exception as e:
        logger.error(f"提取样本数据时出错: {e}")
        raise
def log_data_details(df: pd.DataFrame, search_term_col: str, start_row: int = 2, header_row: int = 1):
    """Log which rows and which column are about to be translated.

    Three INFO records are written: the row range (``start_row`` up to the
    last row position), the column name, and the column's values from
    ``start_row`` onward.

    Args:
        df: Frame being processed.
        search_term_col: Name of the column that will be translated.
        start_row: Zero-based row position where the translated rows begin.
        header_row: Not used by this function.

    Any error is logged before it is re-raised.
    """
    try:
        last_row = len(df) - 1
        logger.info(f"行号范围: {start_row}-{last_row}")
        logger.info(f"翻译列名: {search_term_col}")

        # The column is read only after the two records above, so a missing
        # column still leaves the range and name in the log.
        values = df.iloc[start_row:][search_term_col].tolist()
        logger.info(f"被翻译列内容: {values}")

    except Exception as e:
        logger.error(f"记录数据详细信息时出错: {e}")
        raise
def process_batch_translations(df: pd.DataFrame,
                               search_term_col: str,
                               start_row: int = 2, header_row: int = 1) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Translate one column of ``df`` and store the results in ``<column>_translated``.

    The values of ``search_term_col`` from ``start_row`` onward are sent to
    ``OpenAITranslator.translate`` in a single call. When the ``DEBUG``
    environment variable is ``true`` or ``1`` (case-insensitive), no
    translator is created and each value is simply suffixed with a marker
    text instead.

    Args:
        df: Frame to process. It is changed in place.
        search_term_col: Name of the column holding the text to translate.
        start_row: Zero-based row position of the first row to translate.
        header_row: Forwarded to the logging helpers; not used otherwise.

    Returns:
        A tuple ``(df, sample_data)`` where ``sample_data`` is the preview
        produced by ``extract_sample_data`` before translation.

    Any error is logged before it is re-raised.
    """
    try:
        # Take a small preview first so the input can be checked in the log.
        sample_data = extract_sample_data(df, start_row, search_term_col, header_row=header_row)
        logger.info(f"从第{start_row}行{search_term_col}列开始的样本数据:\n{sample_data}")

        log_data_details(df, search_term_col, start_row, header_row)

        search_terms = df.iloc[start_row:][search_term_col].tolist()

        logger.info("Starting search term translations...")

        # The value is lower-cased, so only lower-case spellings are listed.
        debug_mode = os.getenv('DEBUG', '').lower() in ('true', '1')
        if debug_mode:
            # DEBUG mode: fake translation, no translator needed.
            search_translations = [f"{text} 翻译测试" for text in search_terms]
        else:
            # Create the translator only when it is actually used.
            translator = OpenAITranslator()
            # NOTE(review): presumably returns one translation per input,
            # in order - confirm against OpenAITranslator.
            search_translations = translator.translate(search_terms)

        logger.info("Search term translations completed")

        # Write the results next to the source rows.
        translated_col = f"{search_term_col}_translated"
        df.loc[df.index[start_row:], translated_col] = search_translations

        return df, sample_data
    except Exception as e:
        logger.error(f"批量翻译时出错: {e}")
        raise
def main():
    """Example run: load ``temp/测试.csv``, read column 'B', add an empty column after it.

    The translation step itself is left disabled below, and nothing is
    written back to disk by this function.
    """
    work_dir = Path('temp')
    source_csv = work_dir / "测试.csv"
    # Target path is prepared here but not written to in this function.
    result_csv = work_dir / "processed_测试.csv"

    # read_csv is the project's encoding-detecting reader; the first row it
    # returns is used as the header, the remaining rows as data.
    raw_rows = read_csv(source_csv)
    header = raw_rows[0]
    records = raw_rows[1:]
    df = pd.DataFrame(records, columns=header)

    # Example: read column 'B' (the second column) starting at the third row.
    extract_column_data(df, 'B', start_row=2, header_row=2)

    # Example: add an empty column right after column 'B'.
    df = insert_empty_columns(df, ['B'], header_row=2)

    # Translation step (disabled):
    # df, _ = process_batch_translations(df, '搜索词')
# Run the example only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
|