"""读取Excel文件""" import pandas as pd from sqlite3 import connect from mylib.settings import OUTPUT_DIR, WORK_DIR from datetime import datetime from sqlmodel import Field, SQLModel, create_engine, Session, select from sqlalchemy import text from database.sqlite_engine import engine from typing import Optional from database.sqlite_engine import create_db_and_tables, drop_table class KeywordModel(SQLModel, table=True): id: int = Field(default=None, primary_key=True) key_word: str = Field(unique=True) total_pages: Optional[int] = Field(default=None) done: bool = Field(default=False) class ExcelDatabaseManager: def __init__(self): self.engine = engine def read_excel_file(self, file_path: str): return pd.read_excel(file_path) def process_excel_data(self, df): """处理Excel数据""" records = [] for _, row in df.iterrows(): record = { "key_word": row[0], # 第一列作为key_word "total_pages": row[1] if len(row) > 1 else None, # 第二列作为total_pages "current_page": 0, # 新增当前页码 "done": False, # 默认done为False "last_updated": datetime.now() # 新增最后更新时间 } records.append(record) return records def insert_or_update_record(self, record): with Session(self.engine) as session: statement = select(KeywordModel).where(KeywordModel.key_word == record["key_word"]) existing = session.exec(statement).first() if existing: # 更新现有记录 existing.total_pages = record.get("total_pages", existing.total_pages) existing.done = record.get("done", existing.done) else: # 插入新记录 new_record = KeywordModel(**record) session.add(new_record) session.commit() def add_or_update(self, file_path: str): """从Excel文件导入数据到数据库""" df = self.read_excel_file(file_path) records = self.process_excel_data(df) for record in records: self.insert_or_update_record(record) def mark_keyword_done(self, keyword: str): """标记关键词为已完成""" with Session(self.engine) as session: statement = select(KeywordModel).where(KeywordModel.key_word == keyword) keyword_record = session.exec(statement).first() if keyword_record: keyword_record.done = True session.commit() def update_keyword_progress(self, keyword: str, current_page: int): """更新关键词的当前进度""" with Session(self.engine) as session: statement = select(KeywordModel).where(KeywordModel.key_word == keyword) keyword_record = session.exec(statement).first() if keyword_record: keyword_record.current_page = current_page keyword_record.last_updated = datetime.now() session.commit() def get_next_keyword(self): """获取下一个未完成的关键词""" with Session(self.engine) as session: statement = select(KeywordModel).where(KeywordModel.done == False) keyword = session.exec(statement.order_by(KeywordModel.last_updated)).first() return keyword def get_all_keywords(self) -> list[KeywordModel]: """获取所有关键词""" with Session(self.engine) as session: statement = select(KeywordModel) return session.exec(statement).all() def get_keywords_by_status(self, done: bool=False) -> list[KeywordModel]: """根据完成状态获取关键词""" with Session(self.engine) as session: statement = select(KeywordModel).where(KeywordModel.done == done) return session.exec(statement).all() def get_keywords_by_page(self, limit: int = 50, offset: int = 0) -> list[KeywordModel]: """分页获取关键词""" with Session(self.engine) as session: statement = select(KeywordModel).limit(limit).offset(offset) return session.exec(statement).all() def get_keywords(self) -> list[KeywordModel]: """获取所有关键词(兼容旧接口)""" return self.get_all_keywords() def main(): create_db_and_tables() manager = ExcelDatabaseManager() # manager.add_or_update(r"G:\weixin\WeChat Files\wxid_1fmirgx3vudo21\FileStorage\File\2025-01\测试-精油-2000.xlsx") keywords = manager.get_keywords() print([k.key_word for k in keywords[:50]]) print(len(keywords)) if __name__ == "__main__": main()