Просмотр исходного кода

暂时不搞了, function Calling 也很难处理数据。未来或许考虑 SQL

mrh 10 месяцев назад
Родитель
Сommit
8f053cf662
1 измененных файлов с 147 добавлено и 40 удалено
  1. 147 40
      function_calling/excel_processor.py

+ 147 - 40
function_calling/excel_processor.py

@@ -1,6 +1,10 @@
 import pandas as pd
 from typing import List, Optional, Union, Tuple
 import logging
+from dotenv import load_dotenv
+
+from mylib.pdfzh_translator import OpenAITranslator
+load_dotenv()
 from mylib.logging_config import setup_logging
 
 # Setup custom logging
@@ -13,6 +17,8 @@ class ExcelProcessor:
         self.file_path = file_path
         self.header_row = header_row
         self.df = self.read_file(self.file_path, self.header_row)
+        self.translator = OpenAITranslator()
+
 
     def _column_letter_to_index(self, col: str) -> int:
         """将Excel列字母转换为索引(A=0, B=1,...)"""
@@ -23,19 +29,20 @@ class ExcelProcessor:
             index = index * 26 + (ord(char) - ord('A') + 1)
         return index - 1
 
-    def _index_to_column_letter(self, index: int) -> str:
-        """将列索引转换为Excel列字母(0=A, 1=B,...)"""
-        if index < 0:
+    def _column_index_to_letter(self, idx: int) -> str:
+        """Convert a zero-based column index to Excel column letters (0=A, 1=B, ...)."""
+        if idx < 0:
             raise ValueError("列索引不能为负数")
         letters = []
-        while index >= 0:
-            letters.append(chr(ord('A') + (index % 26)))
-            index = index // 26 - 1
+        while idx >= 0:
+            letters.append(chr(ord('A') + (idx % 26)))
+            idx = (idx // 26) - 1  # -1: the letters have no zero digit, so 26 maps to 'AA'
         return ''.join(reversed(letters))
 
+
     def read_column_data(self, column: Union[str, int], start_row: Optional[int] = None, end_row: Optional[int] = None) -> List[str]:
         """
-        读取指定列的数据
+        读取指定列的数据,不含表头
         
         Args:
             column (Union[str, int]): 列字母(如'A')或列索引(从0开始)
@@ -52,14 +59,14 @@ class ExcelProcessor:
                 
             # 设置默认值
             if start_row is None:
-                start_row = self.header_row + 1
+                start_row = 0
             if end_row is None:
                 end_row = len(self.df) - 1
                 
             # 读取数据
             data = self.df.iloc[start_row:end_row + 1, column].tolist()
             
-            logger.info(f"成功读取列 {column} 从 {start_row} 到 {end_row} 行的数据")
+            logger.info(f"成功读取列 {column} ,行 {start_row} : {end_row} 的数据")
             return data
             
         except Exception as e:
@@ -82,7 +89,7 @@ class ExcelProcessor:
                 
             # 设置默认值
             if start_row is None:
-                start_row = self.header_row + 1
+                start_row = 0
                 
             # 检查数据长度
             end_row = start_row + len(data)
@@ -98,15 +105,15 @@ class ExcelProcessor:
             logger.error(f"写入列数据失败: {str(e)}")
             raise
 
-    def search_headers(self, keywords: Union[str, List[str]]) -> List[Tuple[str, str]]:
+    def search_headers(self, keywords: Union[str, List[str]]) -> List[Tuple[str, int]]:
         """
-        从表头搜索关键词,返回匹配的列名和列字母
+        从表头搜索关键词,返回匹配的列名和列索引
         
         Args:
             keywords (Union[str, List[str]]): 要搜索的关键词或关键词列表
             
         Returns:
-            List[Tuple[str, str]]: 匹配的列名和列字母列表,格式为[(列名, 列字母), ...]
+            List[Tuple[str, int]]: 匹配的列名和列索引列表,格式为[(列名, 列索引), ...]
         """
         try:
             # 如果输入是单个字符串,转换为列表
@@ -120,10 +127,9 @@ class ExcelProcessor:
             matches = []
             for idx, col in enumerate(columns):
                 if any(keyword.lower() in str(col).lower() for keyword in keywords):
-                    col_letter = self._index_to_column_letter(idx)
-                    matches.append((col, col_letter))
+                    matches.append((col, idx))
                     
-            logger.info(f"搜索关键词 {keywords} 找到 {len(matches)} 个匹配列")
+            logger.info(f"搜索关键词 {keywords} 找到 {len(matches)} 个匹配列 {matches}")
             return matches
             
         except Exception as e:
@@ -172,50 +178,151 @@ class ExcelProcessor:
             logger.error(f"保存文件失败: {output_path}, 错误: {str(e)}")
             raise
 
-    def insert_column(self, ref_column: str, new_column_name: str, position: str = 'right') -> pd.DataFrame:
+    def _pad_values_to_match_index(self, values: list, length: int) -> list:
         """
-        在指定列旁边插入空列
+        Pad values with empty strings up to the given length; longer lists are returned unchanged.
         
         Args:
-            ref_column (str): 参考列名或Excel列字母(如'A', 'B')
-            new_column_name (str): 新列名
-            position (str): 插入位置,'left'或'right',默认为'right'
+            values (list): Original list of values.
+            length (int): Target length.
             
         Returns:
-            pd.DataFrame: 包含新列的DataFrame
+            list: The padded list (a new list when padding was needed, otherwise `values` itself).
+        """
+        if len(values) < length:
+            return values + [''] * (length - len(values))
+        return values
+
+    def insert_column_with_header(self, column: Union[str, int], header: str) -> int:
+        """
+        Insert an empty column right after the given column, using `header` as its name.
+        If a column named `header` already exists, nothing is inserted and its index is returned.
+
+        Args:
+            column (Union[str, int]): Column letter or zero-based column index to insert after.
+            header (str): Header of the new column.
+
+        Returns:
+            int: Index of the new column, or of the existing column with that header.
+        """
+        try:
+            # Alphabetic strings go through the letter conversion; anything else is coerced with int()
+            if isinstance(column, str) and column.isalpha():
+                column_index = self._column_letter_to_index(column)
+            else:
+                column_index = int(column)
+
+            # Reuse the existing column when the header is already present
+            if header in self.df.columns:
+                logger.info(f"列 {header} 已存在,返回现有列索引")
+                return list(self.df.columns).index(header)
+
+            # Every cell of the new column starts out as an empty string
+            self.df.insert(column_index + 1, header, '')
+            logger.info(f"成功在列 {column} 后插入空列,并在第一行写入表头 {header}")
+            return column_index + 1
+        except Exception as e:
+            logger.error(f"插入列失败: {str(e)}")
+            raise
+    def set_cell_value(self, column: str, row: int, value: str) -> None:
+        """
+        Set the value of a single cell addressed by column letter and row position.
+        Args:
+            column (str): Excel-style column letter, e.g. 'A'.
+            row (int): Zero-based row position.
+            value (str): Value to store in the cell.
+        """
+        try:
+            column_index = self._column_letter_to_index(column)
+            # column_index is positional, so address the cell with .iat; label-based
+            # .at would look up (or create) a column labelled with that integer.
+            self.df.iat[row, column_index] = value
+            logger.info(f"成功设置单元格 {column}{row} 的值为 {value}")
+        except Exception as e:
+            logger.error(f"设置单元格值失败: {str(e)}")
+            raise
+
+    def translate_column(self, column: Union[str, int]) -> int:
+        """
+        Translate the contents of a column and store the result in a '翻译结果' column to its right.
+        
+        Args:
+            column (Union[str, int]): Column letter or zero-based column index to translate.
+            
+        Returns:
+            int: Index of the column that received the translated text.
         """
-        df = self.df
         try:
-            # 如果ref_column是字母,转换为列索引
-            if ref_column.isalpha():
-                col_index = self._column_letter_to_index(ref_column)
-                ref_column = df.columns[col_index]
+            # NOTE(review): the header is fixed, so a later call reuses the same result column
+            new_col_index = self.insert_column_with_header(column, '翻译结果')
             
-            # 获取参考列的位置
-            ref_index = df.columns.get_loc(ref_column)
+            # Read the source column (it sits left of the insertion, so its index is unchanged)
+            data = self.read_column_data(column)
+            logger.info(f"读取到的数据: {data}")
             
-            # 计算插入位置
-            insert_index = ref_index + 1 if position == 'right' else ref_index
+            # Translate all values in one batch call
+            translated_texts = self.translator._batch_translate(data)
             
-            # 插入新列
-            df.insert(insert_index, new_column_name, '')
-            logger.info(f"成功在列 '{ref_column}' 的 '{position}' 插入新列 '{new_column_name}'")
-            return df
+            # Write the translations into the result column
+            self.write_column_data(new_col_index, translated_texts)
+            
+            # Hand the result column index back to the caller
+            return new_col_index
             
         except Exception as e:
-            logger.error(f"插入列失败: {str(e)}")
+            logger.error(f"翻译列失败: {str(e)}")
             raise
 
+    def translate_columns_by_keywords(self, keywords: List[str]) -> None:
+        """
+        Translate every column whose header matches one of the keywords.
+
+        Args:
+            keywords (List[str]): Keywords to look for in the header row.
+        """
+        try:
+            # search_headers returns (header, index) pairs in left-to-right order
+            matches = self.search_headers(keywords)
+            ref_column_indices = [match[1] for match in matches]
+            logger.info(f"找到的匹配列索引: {ref_column_indices}")
+            inserted = 0  # columns added so far; each one shifts later matches right
+            for original_idx in ref_column_indices:
+                col_idx = original_idx + inserted
+                logger.info(f"正在翻译第 {col_idx} 列")
+
+                # Translate the shifted index (previously the shift counter was passed)
+                width_before = len(self.df.columns)
+                new_col_idx = self.translate_column(col_idx)
+                new_data = self.read_column_data(new_col_idx)
+                logger.info(f"写入 {new_col_idx} 列后的数据: {new_data}")
+
+                # Shift only by what was really inserted: translate_column may
+                # reuse an existing result column instead of adding a new one.
+                inserted += len(self.df.columns) - width_before
+
+        except Exception as e:
+            logger.error(f"批量翻译列失败: {str(e)}")
+            raise
+        
+    
 if __name__ == '__main__':
     # 测试代码
     processor = ExcelProcessor('/home/mrh/code/excel_tool/temp/测试.csv.utf8.csv')
-    # 测试CSV文件
-    processor.insert_column('B', '翻译结果')
+    processor.translate_columns_by_keywords(['类别', '搜索词'])
+    # matches = processor.search_headers(['类别', '搜索词'])
+    # ref_column_indices = [match[1] for match in matches]
+    # logger.info(f"找到的匹配列索引: {ref_column_indices}")
+    # process_col = ref_column_indices[0]
+    # new_col_index = processor.translate_column(process_col)
+    # new_data = processor.read_column_data(new_col_index)
+    # print(f"写入后的数据: {new_data}")
+    
+    
     
+    # 获得某一列的所有数据
+    # processor.set_cell_value('C', 0, '测试')
     # 测试搜索表头
     # matches = processor.search_headers(['类别', '搜索词'])
-    # ref_column_list = [match[1] for match in matches]
-    # print(f"找到的匹配列: {matches}")
     # logger.info(f"找到的匹配列号: {ref_column_list}")
     
     # # 测试读取列数据