excel_processor.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. import pandas as pd
  2. from typing import List, Optional, Union, Tuple
  3. import logging
  4. from mylib.logging_config import setup_logging
  5. # Setup custom logging
  6. setup_logging()
  7. logger = logging.getLogger('excel_tool')
  8. class ExcelProcessor:
  9. def __init__(self, file_path, header_row=1):
  10. """Initialize Excel processor"""
  11. self.file_path = file_path
  12. self.header_row = header_row
  13. self.df = self.read_file(self.file_path, self.header_row)
  14. def _column_letter_to_index(self, col: str) -> int:
  15. """将Excel列字母转换为索引(A=0, B=1,...)"""
  16. index = 0
  17. for char in col.upper():
  18. if not 'A' <= char <= 'Z':
  19. raise ValueError(f"无效的列字母: {col}")
  20. index = index * 26 + (ord(char) - ord('A') + 1)
  21. return index - 1
  22. def _index_to_column_letter(self, index: int) -> str:
  23. """将列索引转换为Excel列字母(0=A, 1=B,...)"""
  24. if index < 0:
  25. raise ValueError("列索引不能为负数")
  26. letters = []
  27. while index >= 0:
  28. letters.append(chr(ord('A') + (index % 26)))
  29. index = index // 26 - 1
  30. return ''.join(reversed(letters))
  31. def read_column_data(self, column: Union[str, int], start_row: Optional[int] = None, end_row: Optional[int] = None) -> List[str]:
  32. """
  33. 读取指定列的数据
  34. Args:
  35. column (Union[str, int]): 列字母(如'A')或列索引(从0开始)
  36. start_row (Optional[int]): 起始行号(从0开始),默认为表头下方第一行
  37. end_row (Optional[int]): 结束行号(从0开始),默认为最后一行
  38. Returns:
  39. List[str]: 读取到的数据列表
  40. """
  41. try:
  42. # 如果输入是列字母,转换为列索引
  43. if isinstance(column, str) and column.isalpha():
  44. column = self._column_letter_to_index(column)
  45. # 设置默认值
  46. if start_row is None:
  47. start_row = self.header_row + 1
  48. if end_row is None:
  49. end_row = len(self.df) - 1
  50. # 读取数据
  51. data = self.df.iloc[start_row:end_row + 1, column].tolist()
  52. logger.info(f"成功读取列 {column} 从 {start_row} 到 {end_row} 行的数据")
  53. return data
  54. except Exception as e:
  55. logger.error(f"读取列数据失败: {str(e)}")
  56. raise
  57. def write_column_data(self, column: Union[str, int], data: List[str], start_row: Optional[int] = None) -> None:
  58. """
  59. 写入数据到指定列
  60. Args:
  61. column (Union[str, int]): 列字母(如'A')或列索引(从0开始)
  62. data (List[str]): 要写入的数据列表
  63. start_row (Optional[int]): 起始行号(从0开始),默认为表头下方第一行
  64. """
  65. try:
  66. # 如果输入是列字母,转换为列索引
  67. if isinstance(column, str) and column.isalpha():
  68. column = self._column_letter_to_index(column)
  69. # 设置默认值
  70. if start_row is None:
  71. start_row = self.header_row + 1
  72. # 检查数据长度
  73. end_row = start_row + len(data)
  74. if end_row > len(self.df):
  75. raise ValueError("数据长度超出表格范围")
  76. # 写入数据
  77. self.df.iloc[start_row:end_row, column] = data
  78. logger.info(f"成功写入 {len(data)} 条数据到列 {column} 从 {start_row} 行开始")
  79. except Exception as e:
  80. logger.error(f"写入列数据失败: {str(e)}")
  81. raise
  82. def search_headers(self, keywords: Union[str, List[str]]) -> List[Tuple[str, str]]:
  83. """
  84. 从表头搜索关键词,返回匹配的列名和列字母
  85. Args:
  86. keywords (Union[str, List[str]]): 要搜索的关键词或关键词列表
  87. Returns:
  88. List[Tuple[str, str]]: 匹配的列名和列字母列表,格式为[(列名, 列字母), ...]
  89. """
  90. try:
  91. # 如果输入是单个字符串,转换为列表
  92. if isinstance(keywords, str):
  93. keywords = [keywords]
  94. # 获取所有列名
  95. columns = self.df.columns.tolist()
  96. # 查找匹配的列
  97. matches = []
  98. for idx, col in enumerate(columns):
  99. if any(keyword.lower() in str(col).lower() for keyword in keywords):
  100. col_letter = self._index_to_column_letter(idx)
  101. matches.append((col, col_letter))
  102. logger.info(f"搜索关键词 {keywords} 找到 {len(matches)} 个匹配列")
  103. return matches
  104. except Exception as e:
  105. logger.error(f"搜索表头失败: {str(e)}")
  106. raise
  107. def read_file(self, file_path: str, header_row: Optional[int] = 0) -> pd.DataFrame:
  108. """
  109. 读取文件并返回DataFrame
  110. 支持Excel和CSV文件
  111. Args:
  112. file_path (str): 文件路径
  113. header_row (int, optional): 表头所在行号,从0开始计数. Defaults to 0.
  114. """
  115. try:
  116. if file_path.endswith('.csv'):
  117. df = pd.read_csv(file_path, header=header_row)
  118. logger.info(f"成功读取CSV文件: {file_path}, 表头行: {header_row}")
  119. else:
  120. # 对于Excel文件,指定engine参数
  121. df = pd.read_excel(file_path, engine='openpyxl', header=header_row)
  122. # 打印表头行
  123. logger.info(f"成功读取Excel文件: {file_path}, 表头行: {header_row}")
  124. logger.info(f"表头行: {df.columns.tolist()}")
  125. return df
  126. except Exception as e:
  127. logger.error(f"读取文件失败: {file_path}, 错误: {str(e)}")
  128. raise
  129. def save_file(self, output_path: str) -> None:
  130. """
  131. 将DataFrame保存为文件
  132. 支持Excel和CSV格式
  133. """
  134. df = self.df
  135. try:
  136. if output_path.endswith('.csv'):
  137. df.to_csv(output_path, index=False)
  138. logger.info(f"成功保存CSV文件: {output_path}")
  139. else:
  140. # 对于Excel文件,指定engine参数
  141. df.to_excel(output_path, index=False, engine='openpyxl')
  142. logger.info(f"成功保存Excel文件: {output_path}")
  143. except Exception as e:
  144. logger.error(f"保存文件失败: {output_path}, 错误: {str(e)}")
  145. raise
  146. def insert_column(self, ref_column: str, new_column_name: str, position: str = 'right') -> pd.DataFrame:
  147. """
  148. 在指定列旁边插入空列
  149. Args:
  150. ref_column (str): 参考列名或Excel列字母(如'A', 'B')
  151. new_column_name (str): 新列名
  152. position (str): 插入位置,'left'或'right',默认为'right'
  153. Returns:
  154. pd.DataFrame: 包含新列的DataFrame
  155. """
  156. df = self.df
  157. try:
  158. # 如果ref_column是字母,转换为列索引
  159. if ref_column.isalpha():
  160. col_index = self._column_letter_to_index(ref_column)
  161. ref_column = df.columns[col_index]
  162. # 获取参考列的位置
  163. ref_index = df.columns.get_loc(ref_column)
  164. # 计算插入位置
  165. insert_index = ref_index + 1 if position == 'right' else ref_index
  166. # 插入新列
  167. df.insert(insert_index, new_column_name, '')
  168. # 调整表头行数据
  169. if self.header_row > 0:
  170. # 将新列的表头行设置为空
  171. df.iloc[:self.header_row, insert_index] = ''
  172. # 在表头行设置新列名
  173. df.iloc[self.header_row - 1, insert_index] = new_column_name
  174. logger.info(f"成功在列 '{ref_column}' 的 '{position}' 插入新列 '{new_column_name}'")
  175. return df
  176. except Exception as e:
  177. logger.error(f"插入列失败: {str(e)}")
  178. raise
  179. if __name__ == '__main__':
  180. # 测试代码
  181. processor = ExcelProcessor('/home/mrh/code/excel_tool/temp/测试.csv.utf8.csv')
  182. # 测试CSV文件
  183. processor.insert_column('B', '翻译结果')
  184. # 测试搜索表头
  185. matches = processor.search_headers(['类别', '搜索词'])
  186. ref_column_list = [match[1] for match in matches]
  187. print(f"找到的匹配列: {matches}")
  188. logger.info(f"找到的匹配列号: {ref_column_list}")
  189. # 测试读取列数据
  190. data = processor.read_column_data('B')
  191. print(f"读取到的数据: {data}")
  192. # 测试写入列数据
  193. new_data = ['新数据1', '新数据2', '新数据3']
  194. processor.write_column_data('C', new_data)
  195. processor.save_file('/home/mrh/code/excel_tool/temp/测试_process.csv')