new_col_translate.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. import os
  2. import logging
  3. import pandas as pd
  4. from pathlib import Path
  5. from typing import List, Tuple, Union
  6. from mylib.pdfzh_translator import OpenAITranslator
  7. from mylib.read_encoding_cvs import read_csv
  8. from mylib.logging_config import setup_logging
  9. # Setup custom logging
  10. setup_logging()
  11. logger = logging.getLogger('mylib.translate_utils')
  12. def column_letter_to_index(col_letter: str) -> int:
  13. """将列字母转换为列索引(从0开始)
  14. Args:
  15. col_letter: 列字母(如 'A', 'B', 'AA' 等)
  16. Returns:
  17. 列索引(从0开始)
  18. """
  19. try:
  20. col_index = 0
  21. for i, char in enumerate(reversed(col_letter.upper())):
  22. col_index += (ord(char) - ord('A') + 1) * (26 ** i)
  23. return col_index - 1
  24. except Exception as e:
  25. logger.error(f"列字母转换时出错: {e}")
  26. raise
  27. def read_csv_with_header(file_path: str, header_row: int = 1, encoding: str = None) -> pd.DataFrame:
  28. """读取CSV文件并正确处理标题行
  29. Args:
  30. file_path: CSV文件路径
  31. header_row: 标题行号(从0开始),默认为1(第2行)
  32. encoding: 文件编码
  33. Returns:
  34. pandas DataFrame
  35. """
  36. try:
  37. if not os.path.exists(file_path):
  38. logger.error(f"文件不存在: {file_path}")
  39. raise FileNotFoundError(f"文件不存在: {file_path}")
  40. # 读取所有数据
  41. data = read_csv(file_path, encoding)
  42. if not data:
  43. logger.error("读取的文件为空")
  44. raise ValueError("读取的文件为空")
  45. # 确保header_row在有效范围内
  46. if header_row >= len(data):
  47. logger.error(f"标题行 {header_row} 超出文件范围")
  48. raise ValueError(f"标题行 {header_row} 超出文件范围")
  49. # 使用指定行作为列名,前面的行丢弃
  50. df = pd.DataFrame(data[header_row+1:], columns=data[header_row])
  51. logger.info(f"成功读取CSV文件,使用第{header_row+1}行作为标题行")
  52. logger.info(f"列标题: {df.columns.tolist()}")
  53. return df
  54. except Exception as e:
  55. logger.error(f"读取CSV文件时出错: {e}")
  56. raise
  57. def extract_column_data(df: pd.DataFrame, column_identifier: Union[str, int], start_row: int = 2, header_row: int = 1) -> pd.Series:
  58. """提取指定列的数据,默认从第3行开始
  59. Args:
  60. df: pandas DataFrame
  61. column_identifier: 要提取的列名或列号(从0开始),也可以是列字母(如 'A', 'B')
  62. start_row: 开始提取的行号,默认为2(第3行)
  63. header_row: 标题行号,默认为1(第2行)
  64. Returns:
  65. 包含指定列数据的Series
  66. """
  67. try:
  68. if df.empty:
  69. logger.error("DataFrame为空")
  70. return pd.Series()
  71. # 处理列号或列名或列字母
  72. if isinstance(column_identifier, str) and column_identifier.isalpha():
  73. column_identifier = column_letter_to_index(column_identifier)
  74. if isinstance(column_identifier, int):
  75. if column_identifier < 0 or column_identifier >= len(df.columns):
  76. logger.error(f"列号 {column_identifier} 超出范围")
  77. raise ValueError(f"列号 {column_identifier} 超出范围")
  78. column_identifier = df.columns[column_identifier]
  79. # 确保列名存在
  80. if column_identifier not in df.columns:
  81. logger.error(f"列名 {column_identifier} 不存在")
  82. raise ValueError(f"列名 {column_identifier} 不存在")
  83. # 确保开始行在有效范围内
  84. if start_row >= len(df) or start_row < 0:
  85. logger.error(f"开始行 {start_row} 超出范围")
  86. raise ValueError(f"开始行 {start_row} 超出范围")
  87. # 提取指定列的数据
  88. column_data = df.iloc[start_row:][column_identifier]
  89. logger.info(f"成功提取列 {column_identifier} 数据,从第{start_row}行开始,共{len(column_data)}条数据")
  90. return column_data
  91. except Exception as e:
  92. logger.error(f"提取列数据时出错: {e}")
  93. raise
  94. def test_column_extraction(input_file: str):
  95. """测试列提取功能
  96. Args:
  97. input_file: 输入CSV文件路径
  98. """
  99. try:
  100. if not os.path.exists(input_file):
  101. logger.error(f"文件不存在: {input_file}")
  102. raise FileNotFoundError(f"文件不存在: {input_file}")
  103. # 读取CSV文件
  104. df = read_csv_with_header(input_file, header_row=1)
  105. # 提取第二列的数据,从第三行开始
  106. column_data = extract_column_data(df, column_identifier=1, start_row=2, header_row=1)
  107. # 打印提取的数据
  108. print("提取的列数据:")
  109. print(column_data)
  110. except Exception as e:
  111. logger.error(f"测试列提取时出错: {e}")
  112. if __name__ == '__main__':
  113. output_dir = Path('temp')
  114. input_file = output_dir / "测试.csv"
  115. test_column_extraction('/home/mrh/code/excel_tool/temp/测试.csv')