|
|
@@ -1,48 +1,53 @@
|
|
|
from sqlite3 import connect
|
|
|
from mylib.settings import OUTPUT_DIR, WORK_DIR
|
|
|
-from sqlmodel import Field, SQLModel, create_engine
|
|
|
-from database.excel_import import read_excel_file, process_excel_data
|
|
|
-from sqlmodel import Session, select
|
|
|
+from datetime import datetime
|
|
|
+from sqlmodel import Field, SQLModel, create_engine, Session, select
|
|
|
|
|
|
|
|
|
sqlite_file_name = OUTPUT_DIR / "database.db"
|
|
|
sqlite_url = f"sqlite:///{sqlite_file_name}"
|
|
|
engine = create_engine(sqlite_url, echo=True)
|
|
|
|
|
|
-class Keyword(SQLModel, table=True):
|
|
|
+class SearchResult(SQLModel, table=True):
|
|
|
id: int = Field(default=None, primary_key=True)
|
|
|
- key_word: str = Field()
|
|
|
- totoal_pages:int = Field(nullable=True)
|
|
|
- done:bool = Field(default=False)
|
|
|
+ keyword: str = Field()
|
|
|
+ start: int = Field()
|
|
|
+ url: str = Field()
|
|
|
+ html_path: str = Field()
|
|
|
+ created_at: datetime = Field(default_factory=datetime.now())
|
|
|
|
|
|
-def insert_or_update_record(record):
|
|
|
- with Session(engine) as session:
|
|
|
- # 查找是否已存在相同key_word的记录
|
|
|
- statement = select(Keyword).where(Keyword.key_word == record["key_word"])
|
|
|
- existing = session.exec(statement).first()
|
|
|
-
|
|
|
- if existing:
|
|
|
- # 更新现有记录
|
|
|
- existing.totoal_pages = record["totoal_pages"]
|
|
|
- existing.done = record["done"]
|
|
|
- else:
|
|
|
- # 插入新记录
|
|
|
- new_record = Keyword(**record)
|
|
|
- session.add(new_record)
|
|
|
- session.commit()
|
|
|
-
|
|
|
-def add_or_update(file_path: str):
|
|
|
- """从Excel文件导入数据到数据库"""
|
|
|
- df = read_excel_file(file_path)
|
|
|
- records = process_excel_data(df)
|
|
|
- for record in records:
|
|
|
- insert_or_update_record(record)
|
|
|
def create_db_and_tables():
|
|
|
SQLModel.metadata.create_all(engine)
|
|
|
|
|
|
+class DatabaseManager:
|
|
|
+ def __init__(self):
|
|
|
+ self.engine = engine
|
|
|
+
|
|
|
+ def save_search_result(self, keyword: str, start: int, url: str, html_path: str) -> SearchResult:
|
|
|
+ """保存搜索结果到数据库"""
|
|
|
+ with Session(self.engine) as session:
|
|
|
+ result = SearchResult(
|
|
|
+ keyword=keyword,
|
|
|
+ start=start,
|
|
|
+ url=url,
|
|
|
+ html_path=html_path
|
|
|
+ )
|
|
|
+ session.add(result)
|
|
|
+ session.commit()
|
|
|
+ return result
|
|
|
+
|
|
|
+ def get_search_results(self, keyword: str):
|
|
|
+ """获取指定关键词的所有搜索结果"""
|
|
|
+ with Session(self.engine) as session:
|
|
|
+ results = session.exec(
|
|
|
+ select(SearchResult)
|
|
|
+ .where(SearchResult.keyword == keyword)
|
|
|
+ .order_by(SearchResult.start)
|
|
|
+ ).all()
|
|
|
+ return results
|
|
|
+
|
|
|
def main():
|
|
|
create_db_and_tables()
|
|
|
- add_or_update(r"G:\weixin\WeChat Files\wxid_1fmirgx3vudo21\FileStorage\File\2025-01\测试-精油-2000.xlsx")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
main()
|