read_encoding_cvs.py 3.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. import csv
  2. import chardet
  3. import sys
  4. import logging
  5. from typing import List
# Module-level logger, named after this module via ``__name__``; used by the
# functions below for progress, warning and error messages.
logger = logging.getLogger(__name__)
  7. def detect_encoding(file_path: str, sample_size: int = 100000) -> str:
  8. """检测文件编码
  9. Args:
  10. file_path: 文件路径
  11. sample_size: 用于检测的样本大小
  12. Returns:
  13. 检测到的编码字符串
  14. """
  15. try:
  16. with open(file_path, 'rb') as f:
  17. # 读取样本数据用于检测
  18. raw_data = f.read(sample_size)
  19. result = chardet.detect(raw_data)
  20. # 获取置信度最高的编码
  21. encoding = result.get('encoding')
  22. confidence = result.get('confidence', 0)
  23. logger.info(f"Detected encoding: {encoding} (confidence: {confidence:.2%})")
  24. # 如果置信度低于阈值或编码为 None,尝试其他常见编码
  25. if not encoding or confidence < 0.7:
  26. logger.warning(f"Low confidence in detected encoding {encoding}, trying common encodings")
  27. return 'utf-8-sig'
  28. return encoding
  29. except Exception as e:
  30. logger.error(f"Error detecting encoding for {file_path}: {e}")
  31. return 'utf-8-sig' # 返回默认编码而不是退出
  32. def read_csv(file_path: str, to_encode: str = 'utf-8') -> List[List[str]]:
  33. """读取CSV文件并转换为指定编码
  34. Args:
  35. file_path: 文件路径
  36. to_encode: 目标编码,默认为utf-8
  37. Returns:
  38. 包含CSV数据的二维列表
  39. """
  40. # 常见编码列表,按优先级排序
  41. encodings_to_try = ['utf-8-sig', 'gb18030', 'shift_jis', 'euc-jp', 'iso-8859-1', 'latin1']
  42. # 先尝试检测编码
  43. detected_encoding = detect_encoding(file_path)
  44. if detected_encoding:
  45. encodings_to_try.insert(0, detected_encoding)
  46. # 尝试用不同编码读取文件
  47. for encoding in encodings_to_try:
  48. try:
  49. logger.info(f"Trying encoding: {encoding}")
  50. with open(file_path, 'r', encoding=encoding, errors='replace') as f:
  51. reader = csv.reader(f)
  52. data = list(reader)
  53. # 如果源编码与目标编码不同,进行转换
  54. if encoding.lower() != to_encode.lower():
  55. logger.info(f"Converting from {encoding} to {to_encode}")
  56. data = [
  57. [cell.encode(to_encode, errors='replace').decode(to_encode)
  58. if isinstance(cell, str) else cell
  59. for cell in row]
  60. for row in data
  61. ]
  62. logger.info(f"Successfully read file with encoding: {encoding}")
  63. return data
  64. except UnicodeDecodeError as e:
  65. logger.warning(f"Failed to decode with {encoding}: {e}")
  66. continue
  67. except Exception as e:
  68. logger.error(f"Error with encoding {encoding}: {e}")
  69. continue
  70. logger.error("Failed to read file with all attempted encodings")
  71. return [] # 返回空列表而不是退出