| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- import csv
- import chardet
- import logging
- from typing import List, Optional
- import os
- from mylib.logging_config import setup_logging
- setup_logging()
- logger = logging.getLogger("excel_tool" + '.' + __name__)
- def detect_encoding(file_path: str, sample_size: int = 100000) -> str:
- """检测文件编码
-
- Args:
- file_path: 文件路径
- sample_size: 用于检测的样本大小
-
- Returns:
- 检测到的编码字符串
- """
- try:
- if not os.path.exists(file_path):
- logger.error(f"File does not exist: {file_path}")
- return 'utf-8-sig'
-
- with open(file_path, 'rb') as f:
- # 读取样本数据用于检测
- raw_data = f.read(sample_size)
- result = chardet.detect(raw_data)
-
- # 获取置信度最高的编码
- encoding = result.get('encoding')
- confidence = result.get('confidence', 0)
-
- logger.info(f"Detected encoding: {encoding} (confidence: {confidence:.2%})")
-
- # 如果置信度低于阈值或编码为 None,尝试其他常见编码
- if not encoding or confidence < 0.7:
- logger.warning(f"Low confidence in detected encoding {encoding}, trying common encodings")
- return 'shift_jis' # 优先返回日文编码
-
- return encoding
-
- except Exception as e:
- logger.error(f"Error detecting encoding for {file_path}: {e}")
- return 'shift_jis' # 返回日文编码作为默认值
- def save_csv(data: List[List[str]], file_path: str) -> None:
- """将CSV数据保存为UTF-8编码文件
-
- Args:
- data: 要保存的CSV数据
- file_path: 目标文件路径
- """
- try:
- with open(file_path, 'w', encoding='utf-8', newline='') as f:
- writer = csv.writer(f)
- writer.writerows(data)
- logger.info(f"File saved as UTF-8: {file_path}")
- except Exception as e:
- logger.error(f"Error saving file {file_path}: {e}")
- raise
- def read_with_cp936(file_path: str) -> List[List[str]]:
- """使用cp936编码读取CSV文件
-
- Args:
- file_path: 文件路径
-
- Returns:
- 包含CSV数据的二维列表
- """
- try:
- logger.info(f"Reading file with cp936 encoding: {file_path}")
- with open(file_path, 'r', encoding='cp936', errors='replace') as f:
- reader = csv.reader(f)
- data = list(reader)
-
- # 保存转换后的文件
- output_file_path = file_path + '.utf8.csv'
- save_csv(data, output_file_path)
-
- return data
-
- except Exception as e:
- logger.error(f"Error reading file with cp936 encoding: {e}")
- return []
- def read_csv(file_path: str, specified_encoding: Optional[str] = None) -> List[List[str]]:
- """读取CSV文件并转换为utf-8编码
-
- Args:
- file_path: 文件路径
- specified_encoding: 用户指定的编码方式
-
- Returns:
- 包含CSV数据的二维列表
- """
- # 如果指定了cp936编码,直接使用专用函数
- if specified_encoding == 'cp936':
- return read_with_cp936(file_path)
-
- # 常见编码列表,优先尝试日文编码
- encodings_to_try = [
- 'shift_jis', # 日文常用编码
- 'cp932', # Windows日文编码
- 'euc-jp', # 日文EUC编码
- 'iso-2022-jp',# 日文JIS编码
- 'utf-8-sig', # UTF-8 with BOM
- 'gb18030', # 中文编码
- 'big5', # 繁体中文
- 'iso-8859-1',
- 'latin1'
- ]
-
- # 如果用户指定了编码,优先使用
- if specified_encoding:
- encodings_to_try.insert(0, specified_encoding)
- else:
- # 先尝试检测编码
- detected_encoding = detect_encoding(file_path)
- if detected_encoding:
- encodings_to_try.insert(0, detected_encoding)
-
- # 尝试用不同编码读取文件
- for encoding in encodings_to_try:
- try:
- logger.info(f"Trying encoding: {encoding}")
-
- with open(file_path, 'r', encoding=encoding, errors='replace') as f:
- reader = csv.reader(f)
- data = list(reader)
-
- logger.info(f"Successfully read file with encoding: {encoding}")
-
- # 打印前几行内容,使用DEBUG级别
- for row in data[:5]:
- logger.debug(f"Row: {row}")
-
- # 检查日文字符是否正确解码
- if encoding.startswith(('shift_jis', 'cp932', 'euc-jp')):
- japanese_chars = ''.join([cell for row in data[:5] for cell in row])
- if not any('\u3040' <= char <= '\u30ff' for char in japanese_chars): # 检查是否包含日文字符
- logger.warning(f"Japanese characters not detected with {encoding}, trying next encoding")
- continue
-
-
- return data
-
- except UnicodeDecodeError as e:
- logger.warning(f"Failed to decode with {encoding}: {e}")
- continue
- except Exception as e:
- logger.error(f"Error with encoding {encoding}: {e}")
- continue
-
- logger.error("Failed to read file with all attempted encodings")
- return [] # 返回空列表而不是退出
- def main():
- file_path = "/home/mrh/code/excel_tool/temp/测试.csv"
- data =read_csv(file_path, 'cp936')
- # 保存转换后的文件
- # output_file_path = file_path + '.utf8.csv'
- # save_csv(data, output_file_path)
- logger.info(data)
-
- if __name__ == "__main__":
- main()
|