| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126 |
- """读取Excel文件"""
- import pandas as pd
- from sqlite3 import connect
- from config.settings import OUTPUT_DIR, WORK_DIR
- from datetime import datetime
- from sqlmodel import Field, SQLModel, create_engine, Session, select, func
- from sqlalchemy import text
- from database.sqlite_engine import engine
- from typing import Optional
- from database.sqlite_engine import create_db_and_tables, drop_table
class KeywordModel(SQLModel, table=True):
    """One keyword and its paging progress, persisted as a table row.

    ``current_page`` and ``last_updated`` are declared because
    ``ExcelDatabaseManager`` assigns both in ``update_keyword_progress`` and
    orders by ``last_updated`` in ``get_next_keyword``. Without these fields
    the assignment is rejected as an unknown field and the ``order_by``
    raises ``AttributeError``.
    """

    # NOTE(review): a table that was created before ``current_page`` and
    # ``last_updated`` existed will not gain the columns automatically;
    # migrate or rebuild it -- confirm how create_db_and_tables handles this.

    # Surrogate key; stays None until the row has been inserted.
    id: Optional[int] = Field(default=None, primary_key=True)
    # The keyword text; at most one row per keyword.
    key_word: str = Field(unique=True)
    # Number of pages for this keyword, when known.
    total_pages: Optional[int] = Field(default=None)
    # Page most recently recorded through update_keyword_progress.
    current_page: int = Field(default=0)
    # True once the keyword has been marked as finished.
    done: bool = Field(default=False)
    # Timestamp of the last progress update; None if never updated.
    last_updated: Optional[datetime] = Field(default=None)
class ExcelDatabaseManager:
    """Load keywords from an Excel sheet into the database and track progress.

    Every method opens its own short-lived ``Session`` on ``self.engine``.

    ``update_keyword_progress`` and ``get_next_keyword`` require
    ``KeywordModel`` to declare ``current_page`` and ``last_updated``.
    """

    def __init__(self):
        # Reuse the engine imported at module level.
        self.engine = engine

    @staticmethod
    def _by_keyword(keyword: str):
        """Build the SELECT that finds the row whose ``key_word`` equals ``keyword``."""
        return select(KeywordModel).where(KeywordModel.key_word == keyword)

    def read_excel_file(self, file_path: str):
        """Read the workbook at ``file_path`` with ``pd.read_excel`` defaults."""
        return pd.read_excel(file_path)

    def process_excel_data(self, df):
        """Convert a DataFrame into a list of keyword record dicts.

        The first column is the keyword and the optional second column is the
        total page count. Columns are read by position, so header names do
        not matter.

        Rows whose keyword cell is missing or blank are skipped. A missing
        page count becomes ``None``; any other value is converted with
        ``int()`` (fractions are truncated, non-numeric text raises
        ``ValueError``).

        Returns:
            A list of dicts with the keys ``key_word``, ``total_pages``,
            ``current_page``, ``done`` and ``last_updated``.
        """
        records = []
        # Plain tuples give true positional access; ``row[0]`` on a Series
        # with string labels is deprecated positional indexing in pandas.
        for cells in df.itertuples(index=False, name=None):
            if not cells:
                continue

            raw_keyword = cells[0]
            if pd.isna(raw_keyword):
                continue
            keyword = str(raw_keyword).strip()
            if not keyword:
                continue

            raw_pages = cells[1] if len(cells) > 1 else None
            # Empty cells arrive as NaN, which must not reach an integer column.
            total_pages = None if pd.isna(raw_pages) else int(raw_pages)

            records.append(
                {
                    "key_word": keyword,
                    "total_pages": total_pages,
                    "current_page": 0,
                    "done": False,
                    "last_updated": datetime.now(),
                }
            )
        return records

    def insert_or_update_record(self, record):
        """Insert ``record`` or, if its keyword already exists, update that row.

        On update only ``total_pages`` and ``done`` are overwritten, and only
        when ``record`` contains those keys.
        """
        with Session(self.engine) as session:
            existing = session.exec(self._by_keyword(record["key_word"])).first()
            if existing is None:
                session.add(KeywordModel(**record))
            else:
                existing.total_pages = record.get("total_pages", existing.total_pages)
                existing.done = record.get("done", existing.done)
            session.commit()

    def add_or_update(self, file_path: str):
        """Import every usable row of the Excel file at ``file_path``."""
        frame = self.read_excel_file(file_path)
        for record in self.process_excel_data(frame):
            self.insert_or_update_record(record)

    def mark_keyword_done(self, keyword: str):
        """Set ``done`` to True for ``keyword``; do nothing if it is unknown."""
        with Session(self.engine) as session:
            keyword_record = session.exec(self._by_keyword(keyword)).first()
            if keyword_record is not None:
                keyword_record.done = True
                session.commit()

    def update_keyword_progress(self, keyword: str, current_page: int):
        """Store ``current_page`` and a fresh timestamp for ``keyword``.

        Does nothing if the keyword is unknown.
        """
        with Session(self.engine) as session:
            keyword_record = session.exec(self._by_keyword(keyword)).first()
            if keyword_record is not None:
                keyword_record.current_page = current_page
                keyword_record.last_updated = datetime.now()
                session.commit()

    def get_next_keyword(self):
        """Return the unfinished keyword with the smallest ``last_updated``.

        Returns ``None`` when no unfinished keyword exists.
        """
        with Session(self.engine) as session:
            # ``== False`` is intentional: it builds a SQL expression.
            pending = select(KeywordModel).where(KeywordModel.done == False)  # noqa: E712
            return session.exec(pending.order_by(KeywordModel.last_updated)).first()

    def get_all_keywords(self) -> list["KeywordModel"]:
        """Return every stored keyword row."""
        with Session(self.engine) as session:
            return session.exec(select(KeywordModel)).all()

    def get_keywords_by_status(self, done: bool = False) -> list["KeywordModel"]:
        """Return the rows whose ``done`` flag equals ``done``."""
        with Session(self.engine) as session:
            statement = select(KeywordModel).where(KeywordModel.done == done)
            return session.exec(statement).all()

    def get_keywords_by_page(self, limit: int = 50, offset: int = 0) -> list["KeywordModel"]:
        """Return one page of rows: ``limit`` rows starting at ``offset``.

        Rows are ordered by ``id`` so consecutive pages neither overlap nor
        skip rows; LIMIT/OFFSET without ORDER BY has no guaranteed order.
        """
        with Session(self.engine) as session:
            statement = (
                select(KeywordModel)
                .order_by(KeywordModel.id)
                .limit(limit)
                .offset(offset)
            )
            return session.exec(statement).all()

    def get_keywords(self) -> list["KeywordModel"]:
        """Return every stored keyword row (kept for the older interface)."""
        return self.get_all_keywords()

    def get_keywords_count(self) -> int:
        """Return the number of stored keyword rows."""
        with Session(self.engine) as session:
            statement = select(func.count()).select_from(KeywordModel)
            return session.exec(statement).one()
def main():
    """Run the database setup helper, then print a keyword preview and the total.

    Prints the ``key_word`` of at most the first 50 stored rows, followed by
    the number of rows returned by ``get_keywords``.
    """
    create_db_and_tables()
    db = ExcelDatabaseManager()
    # To import a workbook first, call db.add_or_update("<path to .xlsx>") here.
    stored = db.get_keywords()
    preview = [entry.key_word for entry in stored[:50]]
    print(preview)
    print(len(stored))


if __name__ == "__main__":
    main()
|