| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336 |
- import pandas as pd
- from typing import List, Optional, Union, Tuple
- import logging
- from dotenv import load_dotenv
- from mylib.pdfzh_translator import OpenAITranslator
- load_dotenv()
- from mylib.logging_config import setup_logging
- # Setup custom logging
- setup_logging()
- logger = logging.getLogger('excel_tool')
- class ExcelProcessor:
- def __init__(self, file_path, header_row=1):
- """Initialize Excel processor"""
- self.file_path = file_path
- self.header_row = header_row
- self.df = self.read_file(self.file_path, self.header_row)
- self.translator = OpenAITranslator()
- def _column_letter_to_index(self, col: str) -> int:
- """将Excel列字母转换为索引(A=0, B=1,...)"""
- index = 0
- for char in col.upper():
- if not 'A' <= char <= 'Z':
- raise ValueError(f"无效的列字母: {col}")
- index = index * 26 + (ord(char) - ord('A') + 1)
- return index - 1
- def _column_index_to_letter(self, idx: int) -> str:
- """将Excel列索引转换为字母(0=A, 1=B,...)"""
- if idx < 0:
- raise ValueError("列索引不能为负数")
- letters = []
- while idx >= 0:
- letters.append(chr(ord('A') + (idx % 26)))
- idx = (idx // 26) - 1
- return ''.join(reversed(letters))
- def read_column_data(self, column: Union[str, int], start_row: Optional[int] = None, end_row: Optional[int] = None) -> List[str]:
- """
- 读取指定列的数据,不含表头
-
- Args:
- column (Union[str, int]): 列字母(如'A')或列索引(从0开始)
- start_row (Optional[int]): 起始行号(从0开始),默认为表头下方第一行
- end_row (Optional[int]): 结束行号(从0开始),默认为最后一行
-
- Returns:
- List[str]: 读取到的数据列表
- """
- try:
- # 如果输入是列字母,转换为列索引
- if isinstance(column, str) and column.isalpha():
- column = self._column_letter_to_index(column)
-
- # 设置默认值
- if start_row is None:
- start_row = 0
- if end_row is None:
- end_row = len(self.df) - 1
-
- # 读取数据
- data = self.df.iloc[start_row:end_row + 1, column].tolist()
-
- logger.info(f"成功读取列 {column} ,行 {start_row} : {end_row} 的数据")
- return data
-
- except Exception as e:
- logger.error(f"读取列数据失败: {str(e)}")
- raise
- def write_column_data(self, column: Union[str, int], data: List[str], start_row: Optional[int] = None) -> None:
- """
- 写入数据到指定列
-
- Args:
- column (Union[str, int]): 列字母(如'A')或列索引(从0开始)
- data (List[str]): 要写入的数据列表
- start_row (Optional[int]): 起始行号(从0开始),默认为表头下方第一行
- """
- try:
- # 如果输入是列字母,转换为列索引
- if isinstance(column, str) and column.isalpha():
- column = self._column_letter_to_index(column)
-
- # 设置默认值
- if start_row is None:
- start_row = 0
-
- # 检查数据长度
- end_row = start_row + len(data)
- if end_row > len(self.df):
- raise ValueError("数据长度超出表格范围")
-
- # 写入数据
- self.df.iloc[start_row:end_row, column] = data
-
- logger.info(f"成功写入 {len(data)} 条数据到列 {column} 从 {start_row} 行开始")
-
- except Exception as e:
- logger.error(f"写入列数据失败: {str(e)}")
- raise
- def search_headers(self, keywords: Union[str, List[str]]) -> List[Tuple[str, int]]:
- """
- 从表头搜索关键词,返回匹配的列名和列索引
-
- Args:
- keywords (Union[str, List[str]]): 要搜索的关键词或关键词列表
-
- Returns:
- List[Tuple[str, int]]: 匹配的列名和列索引列表,格式为[(列名, 列索引), ...]
- """
- try:
- # 如果输入是单个字符串,转换为列表
- if isinstance(keywords, str):
- keywords = [keywords]
-
- # 获取所有列名
- columns = self.df.columns.tolist()
-
- # 查找匹配的列
- matches = []
- for idx, col in enumerate(columns):
- if any(keyword.lower() in str(col).lower() for keyword in keywords):
- matches.append((col, idx))
-
- logger.info(f"搜索关键词 {keywords} 找到 {len(matches)} 个匹配列 {matches}")
- return matches
-
- except Exception as e:
- logger.error(f"搜索表头失败: {str(e)}")
- raise
- def read_file(self, file_path: str, header_row: Optional[int] = 0) -> pd.DataFrame:
- """
- 读取文件并返回DataFrame
- 支持Excel和CSV文件
-
- Args:
- file_path (str): 文件路径
- header_row (int, optional): 表头所在行号,从0开始计数. Defaults to 0.
- """
- try:
- if file_path.endswith('.csv'):
- df = pd.read_csv(file_path, header=header_row)
- logger.info(f"成功读取CSV文件: {file_path}, 表头行: {header_row}")
- else:
- # 对于Excel文件,指定engine参数
- df = pd.read_excel(file_path, engine='openpyxl', header=header_row)
- # 打印表头行
- logger.info(f"成功读取Excel文件: {file_path}, 表头行: {header_row}")
- logger.info(f"表头行: {df.columns.tolist()}")
- return df
- except Exception as e:
- logger.error(f"读取文件失败: {file_path}, 错误: {str(e)}")
- raise
- def save_file(self, output_path: str) -> None:
- """
- 将DataFrame保存为文件
- 支持Excel和CSV格式
- """
- df = self.df
- try:
- if output_path.endswith('.csv'):
- df.to_csv(output_path, index=False)
- logger.info(f"成功保存CSV文件: {output_path}")
- else:
- # 对于Excel文件,指定engine参数
- df.to_excel(output_path, index=False, engine='openpyxl')
- logger.info(f"成功保存Excel文件: {output_path}")
- except Exception as e:
- logger.error(f"保存文件失败: {output_path}, 错误: {str(e)}")
- raise
- def _pad_values_to_match_index(self, values: list, length: int) -> list:
- """
- 如果values长度不足,用空字符串填充到指定长度
-
- Args:
- values (list): 原始值列表
- length (int): 目标长度
-
- Returns:
- list: 填充后的列表
- """
- if len(values) < length:
- return values + [''] * (length - len(values))
- return values
- def insert_column_with_header(self, column: Union[str, int], header: str) -> int:
- """
- 在指定列之后插入一个空列,并在新列的第一行写入表头
- 如果列名已存在,则返回现有列的索引
- Args:
- column (Union[str, int]): 要插入空列的列字母或列索引(从0开始)
- header (str): 新列的表头
- Returns:
- int: 新列或现有列的索引
- """
- try:
- # 如果输入是列字母,转换为列索引
- if isinstance(column, str) and column.isalpha():
- column_index = self._column_letter_to_index(column)
- else:
- column_index = int(column)
- # 检查列名是否已存在
- if header in self.df.columns:
- logger.info(f"列 {header} 已存在,返回现有列索引")
- return list(self.df.columns).index(header)
- # 插入空列
- self.df.insert(column_index + 1, header, '')
- logger.info(f"成功在列 {column} 后插入空列,并在第一行写入表头 {header}")
- return column_index + 1
- except Exception as e:
- logger.error(f"插入列失败: {str(e)}")
- raise
- def set_cell_value(self, column: str, row: int, value: str) -> None:
- """
- 设置指定单元格的值
- Args:
- column (str): 列字母
- row (int): 行号(从0开始)
- value (str): 要设置的值
- """
- try:
- # 将列字母转换为列索引
- column_index = self._column_letter_to_index(column)
- # 设置单元格的值
- self.df.at[row, column_index] = value
- logger.info(f"成功设置单元格 {column}{row} 的值为 {value}")
- except Exception as e:
- logger.error(f"设置单元格值失败: {str(e)}")
- raise
- def translate_column(self, column: Union[str, int]) -> int:
- """
- 翻译指定列的内容,并在该列右侧插入翻译结果列
-
- Args:
- column (Union[str, int]): 要翻译的列字母或列索引(从0开始)
-
- Returns:
- int: 新插入的翻译结果列的索引
- """
- try:
- # 在指定列右侧插入新列
- new_col_index = self.insert_column_with_header(column, '翻译结果')
-
- # 读取原列数据
- data = self.read_column_data(column)
- logger.info(f"读取到的数据: {data}")
-
- # 批量翻译
- translated_texts = self.translator._batch_translate(data)
-
- # 写入翻译结果
- self.write_column_data(new_col_index, translated_texts)
-
- # 返回新列索引
- return new_col_index
-
- except Exception as e:
- logger.error(f"翻译列失败: {str(e)}")
- raise
- def translate_columns_by_keywords(self, keywords: List[str]) -> None:
- """
- 根据关键词搜索表头,批量翻译匹配的列
-
- Args:
- keywords (List[str]): 要搜索的关键词列表
- """
- try:
- # 搜索匹配的列
- matches = self.search_headers(keywords)
- ref_column_indices = [match[1] for match in matches]
- logger.info(f"找到的匹配列索引: {ref_column_indices}")
- adjusted_col_idx = 0
- # 从左到右依次翻译
- for title, col_idx in enumerate(ref_column_indices):
- col_idx += adjusted_col_idx
- logger.info(f"正在翻译第 {col_idx} 列")
-
- # 翻译当前列
- new_col_idx = self.translate_column(adjusted_col_idx)
- # 写入后的数据
- new_data = self.read_column_data(new_col_idx)
- logger.info(f"写入 {new_col_idx} 列后的数据: {new_data}")
-
- # 由于前面插入新列会影响后续列的索引,需要调整
- adjusted_col_idx += 1
-
- except Exception as e:
- logger.error(f"批量翻译列失败: {str(e)}")
- raise
-
-
- if __name__ == '__main__':
- # 测试代码
- processor = ExcelProcessor('/home/mrh/code/excel_tool/temp/测试.csv.utf8.csv')
- processor.translate_columns_by_keywords(['类别', '搜索词'])
- # matches = processor.search_headers(['类别', '搜索词'])
- # ref_column_indices = [match[1] for match in matches]
- # logger.info(f"找到的匹配列索引: {ref_column_indices}")
- # process_col = ref_column_indices[0]
- # new_col_index = processor.translate_column(process_col)
- # new_data = processor.read_column_data(new_col_index)
- # print(f"写入后的数据: {new_data}")
-
-
-
- # 获得某一列的所有数据
- # processor.set_cell_value('C', 0, '测试')
- # 测试搜索表头
- # matches = processor.search_headers(['类别', '搜索词'])
- # logger.info(f"找到的匹配列号: {ref_column_list}")
-
- # # 测试读取列数据
- # data = processor.read_column_data('B')
- # print(f"读取到的数据: {data}")
-
- # # 测试写入列数据
- # new_data = ['新数据1', '新数据2', '新数据3']
- # processor.write_column_data('C', new_data)
-
- processor.save_file('/home/mrh/code/excel_tool/temp/测试_process.csv')
|