|
|
@@ -33,6 +33,71 @@ class ExcelProcessor:
|
|
|
index = index // 26 - 1
|
|
|
return ''.join(reversed(letters))
|
|
|
|
|
|
+ def read_column_data(self, column: Union[str, int], start_row: Optional[int] = None, end_row: Optional[int] = None) -> List[str]:
|
|
|
+ """
|
|
|
+ 读取指定列的数据
|
|
|
+
|
|
|
+ Args:
|
|
|
+ column (Union[str, int]): 列字母(如'A')或列索引(从0开始)
|
|
|
+ start_row (Optional[int]): 起始行号(从0开始),默认为表头下方第一行
|
|
|
+ end_row (Optional[int]): 结束行号(从0开始),默认为最后一行
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ List[str]: 读取到的数据列表
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ # 如果输入是列字母,转换为列索引
|
|
|
+ if isinstance(column, str) and column.isalpha():
|
|
|
+ column = self._column_letter_to_index(column)
|
|
|
+
|
|
|
+ # 设置默认值
|
|
|
+ if start_row is None:
|
|
|
+ start_row = self.header_row + 1
|
|
|
+ if end_row is None:
|
|
|
+ end_row = len(self.df) - 1
|
|
|
+
|
|
|
+ # 读取数据
|
|
|
+ data = self.df.iloc[start_row:end_row + 1, column].tolist()
|
|
|
+
|
|
|
+ logger.info(f"成功读取列 {column} 从 {start_row} 到 {end_row} 行的数据")
|
|
|
+ return data
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"读取列数据失败: {str(e)}")
|
|
|
+ raise
|
|
|
+
|
|
|
+ def write_column_data(self, column: Union[str, int], data: List[str], start_row: Optional[int] = None) -> None:
|
|
|
+ """
|
|
|
+ 写入数据到指定列
|
|
|
+
|
|
|
+ Args:
|
|
|
+ column (Union[str, int]): 列字母(如'A')或列索引(从0开始)
|
|
|
+ data (List[str]): 要写入的数据列表
|
|
|
+ start_row (Optional[int]): 起始行号(从0开始),默认为表头下方第一行
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ # 如果输入是列字母,转换为列索引
|
|
|
+ if isinstance(column, str) and column.isalpha():
|
|
|
+ column = self._column_letter_to_index(column)
|
|
|
+
|
|
|
+ # 设置默认值
|
|
|
+ if start_row is None:
|
|
|
+ start_row = self.header_row + 1
|
|
|
+
|
|
|
+ # 检查数据长度
|
|
|
+ end_row = start_row + len(data)
|
|
|
+ if end_row > len(self.df):
|
|
|
+ raise ValueError("数据长度超出表格范围")
|
|
|
+
|
|
|
+ # 写入数据
|
|
|
+ self.df.iloc[start_row:end_row, column] = data
|
|
|
+
|
|
|
+ logger.info(f"成功写入 {len(data)} 条数据到列 {column} 从 {start_row} 行开始")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"写入列数据失败: {str(e)}")
|
|
|
+ raise
|
|
|
+
|
|
|
def search_headers(self, keywords: Union[str, List[str]]) -> List[Tuple[str, str]]:
|
|
|
"""
|
|
|
从表头搜索关键词,返回匹配的列名和列字母
|
|
|
@@ -159,4 +224,12 @@ if __name__ == '__main__':
|
|
|
print(f"找到的匹配列: {matches}")
|
|
|
logger.info(f"找到的匹配列号: {ref_column_list}")
|
|
|
|
|
|
+ # 测试读取列数据
|
|
|
+ data = processor.read_column_data('B')
|
|
|
+ print(f"读取到的数据: {data}")
|
|
|
+
|
|
|
+ # 测试写入列数据
|
|
|
+ new_data = ['新数据1', '新数据2', '新数据3']
|
|
|
+ processor.write_column_data('B', new_data)
|
|
|
+
|
|
|
# processor.save_file(df, '/home/mrh/code/excel_tool/temp/测试_process.csv')
|