import os import logging import pandas as pd from pathlib import Path from typing import List, Tuple, Union from mylib.pdfzh_translator import OpenAITranslator from mylib.read_encoding_cvs import read_csv from mylib.logging_config import setup_logging # Setup custom logging setup_logging() logger = logging.getLogger('mylib.translate_utils') def column_letter_to_index(col_letter: str) -> int: """将列字母转换为列索引(从0开始) Args: col_letter: 列字母(如 'A', 'B', 'AA' 等) Returns: 列索引(从0开始) """ try: col_index = 0 for i, char in enumerate(reversed(col_letter.upper())): col_index += (ord(char) - ord('A') + 1) * (26 ** i) return col_index - 1 except Exception as e: logger.error(f"列字母转换时出错: {e}") raise def extract_column_data(df: pd.DataFrame, column_identifier: Union[str, int], start_row: int = 2, header_row: int = 1) -> pd.Series: """提取指定列的数据,默认从第3行开始 Args: df: pandas DataFrame column_identifier: 要提取的列名或列号(从0开始),也可以是列字母(如 'A', 'B') start_row: 开始提取的行号,默认为2(第3行) header_row: 标题行号,默认为1(第2行) Returns: 包含指定列数据的Series """ try: if df.empty: return pd.Series() # 处理列号或列名或列字母 if isinstance(column_identifier, str) and column_identifier.isalpha(): column_identifier = column_letter_to_index(column_identifier) if isinstance(column_identifier, int): if column_identifier < 0 or column_identifier >= len(df.columns): raise ValueError(f"列号 {column_identifier} 超出范围") column_identifier = df.columns[column_identifier] # 确保列名存在 if column_identifier not in df.columns: raise ValueError(f"列名 {column_identifier} 不存在") # 确保开始行在有效范围内 if start_row >= len(df) or start_row < 0: raise ValueError(f"开始行 {start_row} 超出范围") # 提取指定列的数据 column_data = df.iloc[start_row:][column_identifier] logger.info(f"成功提取列 {column_identifier} 数据,从第{start_row}行开始,共{len(column_data)}条数据") logger.info(f"列 {column_identifier} 数据: {column_data.tolist()}") return column_data except Exception as e: logger.error(f"提取列数据时出错: {e}") raise def insert_empty_columns(df: pd.DataFrame, column_names: List[Union[str, int]], header_row: int = 1) -> pd.DataFrame: """在指定列之后插入空列""" try: # 按从大到小排序,防止插入影响后续索引 column_names = sorted(column_names, reverse=True, key=lambda x: df.columns.get_loc(x) if isinstance(x, str) else x) for col in column_names: if isinstance(col, str) and col.isalpha(): col = column_letter_to_index(col) if isinstance(col, int): if col < 0 or col >= len(df.columns): raise ValueError(f"列号 {col} 超出范围") col = df.columns[col] if col in df.columns: # 在指定列后插入空列 new_col_index = df.columns.get_loc(col) + 1 new_col_name = f"{col}_translated" df.insert(new_col_index, new_col_name, '') return df except Exception as e: logger.error(f"插入空列时出错: {e}") raise def extract_sample_data(df: pd.DataFrame, start_row: int = 0, column_name: str = None, n: int = 3, header_row: int = 1) -> pd.DataFrame: """提取指定行和列开始的样本数据""" try: # 确保不超过数据范围 end_row = min(start_row + n, len(df)) if column_name: return df.iloc[start_row:end_row][[column_name]] return df.iloc[start_row:end_row] except Exception as e: logger.error(f"提取样本数据时出错: {e}") raise def log_data_details(df: pd.DataFrame, search_term_col: str, start_row: int = 2, header_row: int = 1): """记录数据详细信息""" try: # 记录行号和列号 logger.info(f"行号范围: {start_row}-{len(df)-1}") logger.info(f"翻译列名: {search_term_col}") # 提取并记录被翻译列的内容 translated_column = df.iloc[start_row:][search_term_col] logger.info(f"被翻译列内容: {translated_column.tolist()}") except Exception as e: logger.error(f"记录数据详细信息时出错: {e}") raise def process_batch_translations(df: pd.DataFrame, search_term_col: str, start_row: int = 2, header_row: int = 1) -> Tuple[pd.DataFrame, pd.DataFrame]: """批量处理搜索词翻译""" try: # 首先提取样本数据用于检查 sample_data = extract_sample_data(df, start_row, search_term_col, header_row=header_row) logger.info(f"从第{start_row}行{search_term_col}列开始的样本数据:\n{sample_data}") # 记录数据详细信息 log_data_details(df, search_term_col, start_row, header_row) # 初始化翻译器 translator = OpenAITranslator() # 直接提取需要翻译的搜索词 search_terms = df.iloc[start_row:][search_term_col].tolist() # 批量翻译 logger.info("Starting search term translations...") if os.getenv('DEBUG', '').lower() in ('true', '1', 'True'): # DEBUG模式:使用模拟翻译 search_translations = [f"{text} 翻译测试" for text in search_terms] else: # 正常模式:调用真实翻译 search_translations = translator.translate(search_terms) logger.info("Search term translations completed") # 更新数据 translated_col = f"{search_term_col}_translated" df.loc[df.index[start_row:], translated_col] = search_translations return df, sample_data except Exception as e: logger.error(f"批量翻译时出错: {e}") raise def read_csv_with_header(file_path: str, header_row: int = 1, encoding: str = None) -> pd.DataFrame: """读取CSV文件并正确处理标题行 Args: file_path: CSV文件路径 header_row: 标题行号(从0开始),默认为1(第2行) encoding: 文件编码 Returns: pandas DataFrame """ try: # 读取所有数据 data = read_csv(file_path, encoding) # 确保header_row在有效范围内 if header_row >= len(data): raise ValueError(f"标题行 {header_row} 超出文件范围") # 使用指定行作为列名,前面的行丢弃 df = pd.DataFrame(data[header_row+1:], columns=data[header_row]) logger.info(f"成功读取CSV文件,使用第{header_row+1}行作为标题行") return df except Exception as e: logger.error(f"读取CSV文件时出错: {e}") raise def main(): output_dir = Path('temp') input_file = output_dir/"测试.csv" output_file = output_dir/"processed_测试.csv" # 使用自定义编码检测读取CSV文件 df = read_csv_with_header(input_file, header_row=1) # 使用第2行作为标题行 # 提取列数据 extract_column_data(df, 'B', start_row=2, header_row=1) # 示例:从第3行开始提取第2列(即'B'列)的数据 # 插入空列 df = insert_empty_columns(df, ['B'], header_row=1) # 示例:在'B'列后插入空列 # 处理翻译 # df, _ = process_batch_translations(df, '搜索词') if __name__ == "__main__": main()