| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- import os
- import logging
- import pandas as pd
- from pathlib import Path
- from typing import List, Tuple, Union
- from mylib.pdfzh_translator import OpenAITranslator
- from mylib.read_encoding_cvs import read_csv
- from mylib.logging_config import setup_logging
- # Setup custom logging
- setup_logging()
- logger = logging.getLogger('mylib.translate_utils')
- def column_letter_to_index(col_letter: str) -> int:
- """将列字母转换为列索引(从0开始)
-
- Args:
- col_letter: 列字母(如 'A', 'B', 'AA' 等)
-
- Returns:
- 列索引(从0开始)
- """
- try:
- col_index = 0
- for i, char in enumerate(reversed(col_letter.upper())):
- col_index += (ord(char) - ord('A') + 1) * (26 ** i)
- return col_index - 1
- except Exception as e:
- logger.error(f"列字母转换时出错: {e}")
- raise
- def read_csv_with_header(file_path: str, header_row: int = 1, encoding: str = None) -> pd.DataFrame:
- """读取CSV文件并正确处理标题行
-
- Args:
- file_path: CSV文件路径
- header_row: 标题行号(从0开始),默认为1(第2行)
- encoding: 文件编码
-
- Returns:
- pandas DataFrame
- """
- try:
- if not os.path.exists(file_path):
- logger.error(f"文件不存在: {file_path}")
- raise FileNotFoundError(f"文件不存在: {file_path}")
-
- # 读取所有数据
- data = read_csv(file_path, encoding)
-
- if not data:
- logger.error("读取的文件为空")
- raise ValueError("读取的文件为空")
-
- # 确保header_row在有效范围内
- if header_row >= len(data):
- logger.error(f"标题行 {header_row} 超出文件范围")
- raise ValueError(f"标题行 {header_row} 超出文件范围")
-
- # 使用指定行作为列名,前面的行丢弃
- df = pd.DataFrame(data[header_row+1:], columns=data[header_row])
-
- logger.info(f"成功读取CSV文件,使用第{header_row+1}行作为标题行")
- logger.info(f"列标题: {df.columns.tolist()}")
- return df
- except Exception as e:
- logger.error(f"读取CSV文件时出错: {e}")
- raise
- def extract_column_data(df: pd.DataFrame, column_identifier: Union[str, int], start_row: int = 2, header_row: int = 1) -> pd.Series:
- """提取指定列的数据,默认从第3行开始
-
- Args:
- df: pandas DataFrame
- column_identifier: 要提取的列名或列号(从0开始),也可以是列字母(如 'A', 'B')
- start_row: 开始提取的行号,默认为2(第3行)
- header_row: 标题行号,默认为1(第2行)
-
- Returns:
- 包含指定列数据的Series
- """
- try:
- if df.empty:
- logger.error("DataFrame为空")
- return pd.Series()
-
- # 处理列号或列名或列字母
- if isinstance(column_identifier, str) and column_identifier.isalpha():
- column_identifier = column_letter_to_index(column_identifier)
- if isinstance(column_identifier, int):
- if column_identifier < 0 or column_identifier >= len(df.columns):
- logger.error(f"列号 {column_identifier} 超出范围")
- raise ValueError(f"列号 {column_identifier} 超出范围")
- column_identifier = df.columns[column_identifier]
-
- # 确保列名存在
- if column_identifier not in df.columns:
- logger.error(f"列名 {column_identifier} 不存在")
- raise ValueError(f"列名 {column_identifier} 不存在")
-
- # 确保开始行在有效范围内
- if start_row >= len(df) or start_row < 0:
- logger.error(f"开始行 {start_row} 超出范围")
- raise ValueError(f"开始行 {start_row} 超出范围")
-
- # 提取指定列的数据
- column_data = df.iloc[start_row:][column_identifier]
- logger.info(f"成功提取列 {column_identifier} 数据,从第{start_row}行开始,共{len(column_data)}条数据")
- return column_data
-
- except Exception as e:
- logger.error(f"提取列数据时出错: {e}")
- raise
- def test_column_extraction(input_file: str):
- """测试列提取功能
-
- Args:
- input_file: 输入CSV文件路径
- """
- try:
- if not os.path.exists(input_file):
- logger.error(f"文件不存在: {input_file}")
- raise FileNotFoundError(f"文件不存在: {input_file}")
-
- # 读取CSV文件
- df = read_csv_with_header(input_file, header_row=1)
-
- # 提取第二列的数据,从第三行开始
- column_data = extract_column_data(df, column_identifier=1, start_row=2, header_row=1)
-
- # 打印提取的数据
- print("提取的列数据:")
- print(column_data)
-
- except Exception as e:
- logger.error(f"测试列提取时出错: {e}")
- if __name__ == '__main__':
- output_dir = Path('temp')
- input_file = output_dir / "测试.csv"
- test_column_extraction('/home/mrh/code/excel_tool/temp/测试.csv')
|