# translate_utils.py (4.8 KB)
  1. import os
  2. import logging
  3. import pandas as pd
  4. from pathlib import Path
  5. from typing import List, Tuple, Union
  6. from mylib.pdfzh_translator import OpenAITranslator
  7. from mylib.read_encoding_cvs import read_csv
  8. from mylib.logging_config import setup_logging
# Configure the project's logging when this module is imported, then create
# the module-level logger used by the functions below.
setup_logging()
logger = logging.getLogger('mylib.translate_utils')
  12. def column_letter_to_index(col_letter: str) -> int:
  13. """将列字母转换为列索引(从0开始)
  14. Args:
  15. col_letter: 列字母(如 'A', 'B', 'AA' 等)
  16. Returns:
  17. 列索引(从0开始)
  18. """
  19. try:
  20. col_index = 0
  21. for i, char in enumerate(reversed(col_letter.upper())):
  22. col_index += (ord(char) - ord('A') + 1) * (26 ** i)
  23. return col_index - 1
  24. except Exception as e:
  25. logger.error(f"列字母转换时出错: {e}")
  26. raise
  27. def read_csv_with_header(file_path: str, header_row: int = 1, encoding: str = None) -> pd.DataFrame:
  28. """读取CSV文件并正确处理标题行
  29. Args:
  30. file_path: CSV文件路径
  31. header_row: 标题行号(从0开始),默认为1(第2行)
  32. encoding: 文件编码
  33. Returns:
  34. pandas DataFrame
  35. """
  36. try:
  37. # 读取所有数据
  38. data = read_csv(file_path, encoding)
  39. if not data:
  40. raise ValueError("读取的文件为空")
  41. # 确保header_row在有效范围内
  42. if header_row >= len(data):
  43. raise ValueError(f"标题行 {header_row} 超出文件范围")
  44. # 使用指定行作为列名,前面的行丢弃
  45. df = pd.DataFrame(data[header_row+1:], columns=data[header_row])
  46. logger.info(f"成功读取CSV文件,使用第{header_row+1}行作为标题行")
  47. logger.info(f"列标题: {df.columns.tolist()}")
  48. return df
  49. except Exception as e:
  50. logger.error(f"读取CSV文件时出错: {e}")
  51. raise
  52. def extract_column_data(df: pd.DataFrame, column_identifier: Union[str, int], start_row: int = 2, header_row: int = 1) -> pd.Series:
  53. """提取指定列的数据,默认从第3行开始
  54. Args:
  55. df: pandas DataFrame
  56. column_identifier: 要提取的列名或列号(从0开始),也可以是列字母(如 'A', 'B')
  57. start_row: 开始提取的行号,默认为2(第3行)
  58. header_row: 标题行号,默认为1(第2行)
  59. Returns:
  60. 包含指定列数据的Series
  61. """
  62. try:
  63. if df.empty:
  64. return pd.Series()
  65. # 处理列号或列名或列字母
  66. if isinstance(column_identifier, str) and column_identifier.isalpha():
  67. column_identifier = column_letter_to_index(column_identifier)
  68. if isinstance(column_identifier, int):
  69. if column_identifier < 0 or column_identifier >= len(df.columns):
  70. raise ValueError(f"列号 {column_identifier} 超出范围")
  71. column_identifier = df.columns[column_identifier]
  72. # 确保列名存在
  73. if column_identifier not in df.columns:
  74. raise ValueError(f"列名 {column_identifier} 不存在")
  75. # 确保开始行在有效范围内
  76. if start_row >= len(df) or start_row < 0:
  77. raise ValueError(f"开始行 {start_row} 超出范围")
  78. # 提取指定列的数据
  79. column_data = df.iloc[start_row:][column_identifier]
  80. logger.info(f"成功提取列 {column_identifier} 数据,从第{start_row}行开始,共{len(column_data)}条数据")
  81. return column_data
  82. except Exception as e:
  83. logger.error(f"提取列数据时出错: {e}")
  84. raise
  85. def test_column_extraction():
  86. """测试列数据提取功能"""
  87. # 创建测试数据
  88. test_data = [
  89. ['', 'Ignore this row'], # 第1行
  90. ['Col1', 'Col2', 'Col3'], # 第2行(标题行)
  91. ['1a', '2a', '3a'], # 第3行
  92. ['1b', '2b', '3b'], # 第4行
  93. ['1c', '2c', '3c'] # 第5行
  94. ]
  95. # 创建DataFrame
  96. df = pd.DataFrame(test_data[2:], columns=test_data[1])
  97. try:
  98. # 测试提取第二列(Col2),从第三行开始
  99. result = extract_column_data(df, column_identifier=1, start_row=1, header_row=1)
  100. # 预期结果
  101. expected = pd.Series(['2b', '2c'], name='Col2')
  102. # 验证结果
  103. if result.equals(expected):
  104. print("测试通过!")
  105. print("提取结果:")
  106. print(result)
  107. else:
  108. print("测试失败!")
  109. print("预期结果:")
  110. print(expected)
  111. print("实际结果:")
  112. print(result)
  113. except Exception as e:
  114. print(f"测试失败:{e}")
# Run the manual extraction check only when this file is executed as a script.
if __name__ == '__main__':
    test_column_extraction()