import pandas as pd from typing import List, Optional, Union, Tuple import logging from mylib.logging_config import setup_logging # Setup custom logging setup_logging() logger = logging.getLogger('excel_tool') class ExcelProcessor: def __init__(self, file_path, header_row=1): """Initialize Excel processor""" self.file_path = file_path self.header_row = header_row self.df = self.read_file(self.file_path, self.header_row) def _column_letter_to_index(self, col: str) -> int: """将Excel列字母转换为索引(A=0, B=1,...)""" index = 0 for char in col.upper(): if not 'A' <= char <= 'Z': raise ValueError(f"无效的列字母: {col}") index = index * 26 + (ord(char) - ord('A') + 1) return index - 1 def _index_to_column_letter(self, index: int) -> str: """将列索引转换为Excel列字母(0=A, 1=B,...)""" if index < 0: raise ValueError("列索引不能为负数") letters = [] while index >= 0: letters.append(chr(ord('A') + (index % 26))) index = index // 26 - 1 return ''.join(reversed(letters)) def read_column_data(self, column: Union[str, int], start_row: Optional[int] = None, end_row: Optional[int] = None) -> List[str]: """ 读取指定列的数据 Args: column (Union[str, int]): 列字母(如'A')或列索引(从0开始) start_row (Optional[int]): 起始行号(从0开始),默认为表头下方第一行 end_row (Optional[int]): 结束行号(从0开始),默认为最后一行 Returns: List[str]: 读取到的数据列表 """ try: # 如果输入是列字母,转换为列索引 if isinstance(column, str) and column.isalpha(): column = self._column_letter_to_index(column) # 设置默认值 if start_row is None: start_row = self.header_row + 1 if end_row is None: end_row = len(self.df) - 1 # 读取数据 data = self.df.iloc[start_row:end_row + 1, column].tolist() logger.info(f"成功读取列 {column} 从 {start_row} 到 {end_row} 行的数据") return data except Exception as e: logger.error(f"读取列数据失败: {str(e)}") raise def write_column_data(self, column: Union[str, int], data: List[str], start_row: Optional[int] = None) -> None: """ 写入数据到指定列 Args: column (Union[str, int]): 列字母(如'A')或列索引(从0开始) data (List[str]): 要写入的数据列表 start_row (Optional[int]): 起始行号(从0开始),默认为表头下方第一行 """ try: # 如果输入是列字母,转换为列索引 if isinstance(column, str) and column.isalpha(): column = self._column_letter_to_index(column) # 设置默认值 if start_row is None: start_row = self.header_row + 1 # 检查数据长度 end_row = start_row + len(data) if end_row > len(self.df): raise ValueError("数据长度超出表格范围") # 写入数据 self.df.iloc[start_row:end_row, column] = data logger.info(f"成功写入 {len(data)} 条数据到列 {column} 从 {start_row} 行开始") except Exception as e: logger.error(f"写入列数据失败: {str(e)}") raise def search_headers(self, keywords: Union[str, List[str]]) -> List[Tuple[str, str]]: """ 从表头搜索关键词,返回匹配的列名和列字母 Args: keywords (Union[str, List[str]]): 要搜索的关键词或关键词列表 Returns: List[Tuple[str, str]]: 匹配的列名和列字母列表,格式为[(列名, 列字母), ...] """ try: # 如果输入是单个字符串,转换为列表 if isinstance(keywords, str): keywords = [keywords] # 获取所有列名 columns = self.df.columns.tolist() # 查找匹配的列 matches = [] for idx, col in enumerate(columns): if any(keyword.lower() in str(col).lower() for keyword in keywords): col_letter = self._index_to_column_letter(idx) matches.append((col, col_letter)) logger.info(f"搜索关键词 {keywords} 找到 {len(matches)} 个匹配列") return matches except Exception as e: logger.error(f"搜索表头失败: {str(e)}") raise def read_file(self, file_path: str, header_row: Optional[int] = 0) -> pd.DataFrame: """ 读取文件并返回DataFrame 支持Excel和CSV文件 Args: file_path (str): 文件路径 header_row (int, optional): 表头所在行号,从0开始计数. Defaults to 0. """ try: if file_path.endswith('.csv'): df = pd.read_csv(file_path, header=header_row) logger.info(f"成功读取CSV文件: {file_path}, 表头行: {header_row}") else: # 对于Excel文件,指定engine参数 df = pd.read_excel(file_path, engine='openpyxl', header=header_row) # 打印表头行 logger.info(f"成功读取Excel文件: {file_path}, 表头行: {header_row}") logger.info(f"表头行: {df.columns.tolist()}") return df except Exception as e: logger.error(f"读取文件失败: {file_path}, 错误: {str(e)}") raise def save_file(self, output_path: str) -> None: """ 将DataFrame保存为文件 支持Excel和CSV格式 """ df = self.df try: if output_path.endswith('.csv'): df.to_csv(output_path, index=False) logger.info(f"成功保存CSV文件: {output_path}") else: # 对于Excel文件,指定engine参数 df.to_excel(output_path, index=False, engine='openpyxl') logger.info(f"成功保存Excel文件: {output_path}") except Exception as e: logger.error(f"保存文件失败: {output_path}, 错误: {str(e)}") raise def insert_column(self, ref_column: str, new_column_name: str, position: str = 'right') -> pd.DataFrame: """ 在指定列旁边插入空列 Args: ref_column (str): 参考列名或Excel列字母(如'A', 'B') new_column_name (str): 新列名 position (str): 插入位置,'left'或'right',默认为'right' Returns: pd.DataFrame: 包含新列的DataFrame """ df = self.df try: # 如果ref_column是字母,转换为列索引 if ref_column.isalpha(): col_index = self._column_letter_to_index(ref_column) ref_column = df.columns[col_index] # 获取参考列的位置 ref_index = df.columns.get_loc(ref_column) # 计算插入位置 insert_index = ref_index + 1 if position == 'right' else ref_index # 插入新列 df.insert(insert_index, new_column_name, '') # 调整表头行数据 if self.header_row > 0: # 将新列的表头行设置为空 df.iloc[:self.header_row, insert_index] = '' # 在表头行设置新列名 df.iloc[self.header_row, insert_index] = new_column_name logger.info(f"成功在列 '{ref_column}' 的 '{position}' 插入新列 '{new_column_name}'") return df except Exception as e: logger.error(f"插入列失败: {str(e)}") raise if __name__ == '__main__': # 测试代码 processor = ExcelProcessor('/home/mrh/code/excel_tool/temp/测试.csv.utf8.csv') # 测试CSV文件 processor.insert_column('B', '翻译结果') # 测试搜索表头 matches = processor.search_headers(['类别', '搜索词']) ref_column_list = [match[1] for match in matches] print(f"找到的匹配列: {matches}") logger.info(f"找到的匹配列号: {ref_column_list}") # 测试读取列数据 data = processor.read_column_data('B') print(f"读取到的数据: {data}") # 测试写入列数据 new_data = ['新数据1', '新数据2', '新数据3'] processor.write_column_data('C', new_data) processor.save_file('/home/mrh/code/excel_tool/temp/测试_process.csv')