# excel_import.py
  1. """读取Excel文件"""
  2. import pandas as pd
  3. from sqlite3 import connect
  4. from config.settings import OUTPUT_DIR, WORK_DIR
  5. from datetime import datetime
  6. from sqlmodel import Field, SQLModel, create_engine, Session, select, func
  7. from sqlalchemy import text
  8. from database.sqlite_engine import engine
  9. from typing import Optional
  10. from database.sqlite_engine import create_db_and_tables, drop_table
  11. class KeywordModel(SQLModel, table=True):
  12. id: int = Field(default=None, primary_key=True)
  13. key_word: str = Field(unique=True)
  14. total_pages: Optional[int] = Field(default=None)
  15. done: bool = Field(default=False)
  16. class ExcelDatabaseManager:
  17. def __init__(self):
  18. self.engine = engine
  19. def read_excel_file(self, file_path: str):
  20. return pd.read_excel(file_path)
  21. def process_excel_data(self, df):
  22. """处理Excel数据"""
  23. records = []
  24. for _, row in df.iterrows():
  25. record = {
  26. "key_word": row[0], # 第一列作为key_word
  27. "total_pages": row[1] if len(row) > 1 else None, # 第二列作为total_pages
  28. "current_page": 0, # 新增当前页码
  29. "done": False, # 默认done为False
  30. "last_updated": datetime.now() # 新增最后更新时间
  31. }
  32. records.append(record)
  33. return records
  34. def insert_or_update_record(self, record):
  35. with Session(self.engine) as session:
  36. statement = select(KeywordModel).where(KeywordModel.key_word == record["key_word"])
  37. existing = session.exec(statement).first()
  38. if existing:
  39. # 更新现有记录
  40. existing.total_pages = record.get("total_pages", existing.total_pages)
  41. existing.done = record.get("done", existing.done)
  42. else:
  43. # 插入新记录
  44. new_record = KeywordModel(**record)
  45. session.add(new_record)
  46. session.commit()
  47. def add_or_update(self, file_path: str):
  48. """从Excel文件导入数据到数据库"""
  49. df = self.read_excel_file(file_path)
  50. records = self.process_excel_data(df)
  51. for record in records:
  52. self.insert_or_update_record(record)
  53. def mark_keyword_done(self, keyword: str):
  54. """标记关键词为已完成"""
  55. with Session(self.engine) as session:
  56. statement = select(KeywordModel).where(KeywordModel.key_word == keyword)
  57. keyword_record = session.exec(statement).first()
  58. if keyword_record:
  59. keyword_record.done = True
  60. session.commit()
  61. def update_keyword_progress(self, keyword: str, current_page: int):
  62. """更新关键词的当前进度"""
  63. with Session(self.engine) as session:
  64. statement = select(KeywordModel).where(KeywordModel.key_word == keyword)
  65. keyword_record = session.exec(statement).first()
  66. if keyword_record:
  67. keyword_record.current_page = current_page
  68. keyword_record.last_updated = datetime.now()
  69. session.commit()
  70. def get_next_keyword(self):
  71. """获取下一个未完成的关键词"""
  72. with Session(self.engine) as session:
  73. statement = select(KeywordModel).where(KeywordModel.done == False)
  74. keyword = session.exec(statement.order_by(KeywordModel.last_updated)).first()
  75. return keyword
  76. def get_all_keywords(self) -> list[KeywordModel]:
  77. """获取所有关键词"""
  78. with Session(self.engine) as session:
  79. statement = select(KeywordModel)
  80. return session.exec(statement).all()
  81. def get_keywords_by_status(self, done: bool=False) -> list[KeywordModel]:
  82. """根据完成状态获取关键词"""
  83. with Session(self.engine) as session:
  84. statement = select(KeywordModel).where(KeywordModel.done == done)
  85. return session.exec(statement).all()
  86. def get_keywords_by_page(self, limit: int = 50, offset: int = 0) -> list[KeywordModel]:
  87. """分页获取关键词"""
  88. with Session(self.engine) as session:
  89. statement = select(KeywordModel).limit(limit).offset(offset)
  90. return session.exec(statement).all()
  91. def get_keywords(self) -> list[KeywordModel]:
  92. """获取所有关键词(兼容旧接口)"""
  93. return self.get_all_keywords()
  94. # 获取行数总计
  95. def get_keywords_count(self) -> int:
  96. with Session(self.engine) as session:
  97. # 使用 select 和 func.count() 来统计行数
  98. statement = select(func.count()).select_from(KeywordModel)
  99. result = session.exec(statement)
  100. return result.one()
  101. def main():
  102. create_db_and_tables()
  103. manager = ExcelDatabaseManager()
  104. # manager.add_or_update(r"G:\weixin\WeChat Files\wxid_1fmirgx3vudo21\FileStorage\File\2025-01\测试-精油-2000.xlsx")
  105. keywords = manager.get_keywords()
  106. print([k.key_word for k in keywords[:50]])
  107. print(len(keywords))
  108. if __name__ == "__main__":
  109. main()