| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237 |
- import pandas as pd
- from typing import List, Optional, Union, Tuple
- import logging
- from mylib.logging_config import setup_logging
- # Setup custom logging
- setup_logging()
- logger = logging.getLogger('excel_tool')
- class ExcelProcessor:
- def __init__(self, file_path, header_row=1):
- """Initialize Excel processor"""
- self.file_path = file_path
- self.header_row = header_row
- self.df = self.read_file(self.file_path, self.header_row)
- def _column_letter_to_index(self, col: str) -> int:
- """将Excel列字母转换为索引(A=0, B=1,...)"""
- index = 0
- for char in col.upper():
- if not 'A' <= char <= 'Z':
- raise ValueError(f"无效的列字母: {col}")
- index = index * 26 + (ord(char) - ord('A') + 1)
- return index - 1
- def _index_to_column_letter(self, index: int) -> str:
- """将列索引转换为Excel列字母(0=A, 1=B,...)"""
- if index < 0:
- raise ValueError("列索引不能为负数")
- letters = []
- while index >= 0:
- letters.append(chr(ord('A') + (index % 26)))
- index = index // 26 - 1
- return ''.join(reversed(letters))
- def read_column_data(self, column: Union[str, int], start_row: Optional[int] = None, end_row: Optional[int] = None) -> List[str]:
- """
- 读取指定列的数据
-
- Args:
- column (Union[str, int]): 列字母(如'A')或列索引(从0开始)
- start_row (Optional[int]): 起始行号(从0开始),默认为表头下方第一行
- end_row (Optional[int]): 结束行号(从0开始),默认为最后一行
-
- Returns:
- List[str]: 读取到的数据列表
- """
- try:
- # 如果输入是列字母,转换为列索引
- if isinstance(column, str) and column.isalpha():
- column = self._column_letter_to_index(column)
-
- # 设置默认值
- if start_row is None:
- start_row = self.header_row + 1
- if end_row is None:
- end_row = len(self.df) - 1
-
- # 读取数据
- data = self.df.iloc[start_row:end_row + 1, column].tolist()
-
- logger.info(f"成功读取列 {column} 从 {start_row} 到 {end_row} 行的数据")
- return data
-
- except Exception as e:
- logger.error(f"读取列数据失败: {str(e)}")
- raise
- def write_column_data(self, column: Union[str, int], data: List[str], start_row: Optional[int] = None) -> None:
- """
- 写入数据到指定列
-
- Args:
- column (Union[str, int]): 列字母(如'A')或列索引(从0开始)
- data (List[str]): 要写入的数据列表
- start_row (Optional[int]): 起始行号(从0开始),默认为表头下方第一行
- """
- try:
- # 如果输入是列字母,转换为列索引
- if isinstance(column, str) and column.isalpha():
- column = self._column_letter_to_index(column)
-
- # 设置默认值
- if start_row is None:
- start_row = self.header_row + 1
-
- # 检查数据长度
- end_row = start_row + len(data)
- if end_row > len(self.df):
- raise ValueError("数据长度超出表格范围")
-
- # 写入数据
- self.df.iloc[start_row:end_row, column] = data
-
- logger.info(f"成功写入 {len(data)} 条数据到列 {column} 从 {start_row} 行开始")
-
- except Exception as e:
- logger.error(f"写入列数据失败: {str(e)}")
- raise
- def search_headers(self, keywords: Union[str, List[str]]) -> List[Tuple[str, str]]:
- """
- 从表头搜索关键词,返回匹配的列名和列字母
-
- Args:
- keywords (Union[str, List[str]]): 要搜索的关键词或关键词列表
-
- Returns:
- List[Tuple[str, str]]: 匹配的列名和列字母列表,格式为[(列名, 列字母), ...]
- """
- try:
- # 如果输入是单个字符串,转换为列表
- if isinstance(keywords, str):
- keywords = [keywords]
-
- # 获取所有列名
- columns = self.df.columns.tolist()
-
- # 查找匹配的列
- matches = []
- for idx, col in enumerate(columns):
- if any(keyword.lower() in str(col).lower() for keyword in keywords):
- col_letter = self._index_to_column_letter(idx)
- matches.append((col, col_letter))
-
- logger.info(f"搜索关键词 {keywords} 找到 {len(matches)} 个匹配列")
- return matches
-
- except Exception as e:
- logger.error(f"搜索表头失败: {str(e)}")
- raise
- def read_file(self, file_path: str, header_row: Optional[int] = 0) -> pd.DataFrame:
- """
- 读取文件并返回DataFrame
- 支持Excel和CSV文件
-
- Args:
- file_path (str): 文件路径
- header_row (int, optional): 表头所在行号,从0开始计数. Defaults to 0.
- """
- try:
- if file_path.endswith('.csv'):
- df = pd.read_csv(file_path, header=header_row)
- logger.info(f"成功读取CSV文件: {file_path}, 表头行: {header_row}")
- else:
- # 对于Excel文件,指定engine参数
- df = pd.read_excel(file_path, engine='openpyxl', header=header_row)
- # 打印表头行
- logger.info(f"成功读取Excel文件: {file_path}, 表头行: {header_row}")
- logger.info(f"表头行: {df.columns.tolist()}")
- return df
- except Exception as e:
- logger.error(f"读取文件失败: {file_path}, 错误: {str(e)}")
- raise
- def save_file(self, output_path: str) -> None:
- """
- 将DataFrame保存为文件
- 支持Excel和CSV格式
- """
- df = self.df
- try:
- if output_path.endswith('.csv'):
- df.to_csv(output_path, index=False)
- logger.info(f"成功保存CSV文件: {output_path}")
- else:
- # 对于Excel文件,指定engine参数
- df.to_excel(output_path, index=False, engine='openpyxl')
- logger.info(f"成功保存Excel文件: {output_path}")
- except Exception as e:
- logger.error(f"保存文件失败: {output_path}, 错误: {str(e)}")
- raise
- def insert_column(self, ref_column: str, new_column_name: str, position: str = 'right') -> pd.DataFrame:
- """
- 在指定列旁边插入空列
-
- Args:
- ref_column (str): 参考列名或Excel列字母(如'A', 'B')
- new_column_name (str): 新列名
- position (str): 插入位置,'left'或'right',默认为'right'
-
- Returns:
- pd.DataFrame: 包含新列的DataFrame
- """
- df = self.df
- try:
- # 如果ref_column是字母,转换为列索引
- if ref_column.isalpha():
- col_index = self._column_letter_to_index(ref_column)
- ref_column = df.columns[col_index]
-
- # 获取参考列的位置
- ref_index = df.columns.get_loc(ref_column)
-
- # 计算插入位置
- insert_index = ref_index + 1 if position == 'right' else ref_index
-
- # 插入新列
- df.insert(insert_index, new_column_name, '')
-
- # 调整表头行数据
- if self.header_row > 0:
- # 将新列的表头行设置为空
- df.iloc[:self.header_row, insert_index] = ''
- # 在表头行设置新列名
- df.iloc[self.header_row - 1, insert_index] = new_column_name
-
- logger.info(f"成功在列 '{ref_column}' 的 '{position}' 插入新列 '{new_column_name}'")
- return df
-
- except Exception as e:
- logger.error(f"插入列失败: {str(e)}")
- raise
- if __name__ == '__main__':
- # 测试代码
- processor = ExcelProcessor('/home/mrh/code/excel_tool/temp/测试.csv.utf8.csv')
- # 测试CSV文件
- processor.insert_column('B', '翻译结果')
-
- # 测试搜索表头
- matches = processor.search_headers(['类别', '搜索词'])
- ref_column_list = [match[1] for match in matches]
- print(f"找到的匹配列: {matches}")
- logger.info(f"找到的匹配列号: {ref_column_list}")
-
- # 测试读取列数据
- data = processor.read_column_data('B')
- print(f"读取到的数据: {data}")
-
- # 测试写入列数据
- new_data = ['新数据1', '新数据2', '新数据3']
- processor.write_column_data('C', new_data)
-
- processor.save_file('/home/mrh/code/excel_tool/temp/测试_process.csv')
|