| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- import asyncio
- from datetime import datetime
- import re
- from typing import Optional
- from enum import Enum
- from typing import List, Any
- import os
- import sys
- sys.path.append(os.path.dirname(os.path.dirname(__file__)))
- from sqlmodel import Field, SQLModel,Column, Integer, Sequence, UniqueConstraint
- from config import DB_URL,logger
- # from db.common import engine
- from sqlalchemy.dialects.postgresql import insert
- from sqlalchemy.sql.sqltypes import Integer, String, DateTime
- from sqlalchemy.sql.schema import Column
- from sqlalchemy import UniqueConstraint
- from db.base import BaseRepository,DouyinBaseRepository
- from db.engine import engine,create_all
-
class Categories(SQLModel,DouyinBaseRepository, table=True):
    """Table model for a user's document categories.

    Each row is one named category owned by the user identified by
    ``open_id``. The pair ``(open_id, name)`` is unique.

    NOTE(review): this model also inherits from ``DouyinBaseRepository``;
    confirm that mixing repository behaviour into the table model is intended.
    """
    id: int = Field(primary_key=True) # Unique identifier of the category
    open_id: str = Field(foreign_key="useroauthtoken.open_id",index=True) # Foreign key to the user table
    name: str = Field(default="default", index=True) # Category name; indexed to speed up lookups
    # Defaults to datetime.now() when the object is created; nothing in this
    # class refreshes it on later updates.
    update_time: datetime = Field(default_factory=datetime.now) # Creation / update time
    # Composite unique constraint: one category name per user
    __table_args__ = (UniqueConstraint('open_id', 'name', name='uq_open_id_name'),)
-
-
-
class DocumentCategories(SQLModel, table=True):
    """Association table linking documents to categories.

    Both columns are flagged as primary key, so together they form a
    composite primary key.
    """
    document_id: int = Field(foreign_key="documents.id", primary_key=True) # Foreign key to the documents table
    category_id: int = Field(foreign_key="categories.id", primary_key=True) # Foreign key to the categories table
class DocStatus:
    """Integer status codes used for ``Documents.status``."""
    UNPROCESSED = 0 # Not processed yet
    COMPLETED = 100 # Processing finished
    DISABLED = -1 # Disabled
-
class Documents(SQLModel, table=True):
    """Table model for a stored document owned by a user.

    ``path`` carries a unique constraint, so each path can appear only once.
    """
    id: Optional[int] = Field(primary_key=True)
    open_id: str = Field(foreign_key="useroauthtoken.open_id",index=True) # Foreign key to the user table
    path: str = Field(nullable=False, index=True) # Relative path (DocumentsRepository stores the part after "docs")
    status: int = Field(nullable=False) # Document status; DocumentsRepository sets it from DocStatus
    # Defaults to datetime.now() when the object is created; nothing in this
    # class refreshes it on later updates.
    update_time: datetime = Field(default_factory=datetime.now) # Creation / update time
    # ``path`` must be unique across all documents
    __table_args__ = (UniqueConstraint('path'),)
-
class CategoriesRepository(DouyinBaseRepository):
    """Repository bound to the ``Categories`` model."""

    def __init__(self, engine=engine):
        """Bind the base repository to ``Categories``.

        Args:
            engine: Database engine handed to the base class; defaults to the
                module-level ``engine`` imported from ``db.engine``.
        """
        super().__init__(Categories, engine)
-
-
class DocumentCategoriesRepository(DouyinBaseRepository):
    """Repository bound to the ``DocumentCategories`` association model."""

    def __init__(self, engine=engine):
        """Bind the base repository to ``DocumentCategories``.

        Args:
            engine: Database engine handed to the base class; defaults to the
                module-level ``engine`` imported from ``db.engine``.
        """
        super().__init__(DocumentCategories, engine)
-
-
class DocumentsRepository(DouyinBaseRepository):
    """Repository for ``Documents`` that stages one document and its category.

    On construction the repository derives the document's relative path from
    ``file_path`` and prepares two unsaved model objects: ``doc_model``
    (a ``Documents`` row in ``DocStatus.UNPROCESSED`` state) and
    ``category_model`` (a ``Categories`` row). If the path cannot be parsed,
    both attributes are ``None``.
    """

    def __init__(self, open_id, file_path, category_name="default", engine=engine):
        """Stage the document and category models for ``file_path``.

        Args:
            open_id: Owner's open_id, stored on both staged models.
            file_path: Full path of the file. It must contain ``docs/``; the
                part after ``docs`` becomes ``Documents.path``.
            category_name: Name of the category to stage.
            engine: Database engine handed to the base class; defaults to the
                module-level ``engine`` imported from ``db.engine``.
        """
        # file_path = {DATA_DIR}/{open_id}/docs/xxx/example_file.pdf
        relative_path = DocumentsRepository.get_relative_path(file_path)
        if relative_path is None:
            # Unparseable path (already logged by get_relative_path). Keep the
            # constructor non-raising, but leave the instance in a consistent
            # state so later calls can detect the problem explicitly.
            self.doc_model = None
            self.category_model = None
        else:
            self.doc_model = Documents(
                open_id=open_id,
                path=relative_path,
                status=DocStatus.UNPROCESSED,
            )
            self.category_model = Categories(
                open_id=open_id,
                name=category_name,
            )
        # Always initialise the base repository, even for an unparseable path.
        super().__init__(Documents, engine)

    @staticmethod
    def get_relative_path(full_path):
        """Return the part of ``full_path`` that follows the first ``docs``.

        Args:
            full_path: Path expected to contain a ``docs/`` segment.

        Returns:
            The remainder starting with ``/`` (e.g. ``"/readme2.md"``), or
            ``None`` when ``full_path`` contains no ``docs/``; in that case an
            error is logged.
        """
        match = re.search(r'docs(/.*?)$', full_path)
        if match:
            return match.group(1)
        logger.error(f"Can not get rel path:{full_path}")
        return None

    async def add_document_with_categories(self):
        """Insert the staged category and log the id reported by the repository.

        Raises:
            ValueError: If the repository was built from a path without a
                ``docs/`` segment, so no models were staged.
        """
        if self.category_model is None:
            raise ValueError("no document staged: file_path has no 'docs/' segment")
        categories_repo = CategoriesRepository()
        # Presumably an INSERT ... ON CONFLICT DO NOTHING keyed on
        # (open_id, name); confirm against DouyinBaseRepository.
        category_id = await categories_repo.aon_conflict_do_nothing(
            self.category_model, index_elements=["open_id", "name"]
        )
        logger.debug(f"category_id:{category_id}")
        # TODO: persist self.doc_model and link it to the category through
        # DocumentCategories. The earlier draft of that logic was unreachable
        # and referenced undefined names, so it has been removed.
-
# Example usage
async def main():
    """Demo entry point: stage a sample document and insert its category.

    Obtains an ``open_id`` from ``db.user.test_add`` (presumably by creating a
    test user; confirm in db.user), builds a ``DocumentsRepository`` for a
    sample path and runs ``add_document_with_categories`` on it.
    """
    # Imported here so the demo-only helper is loaded only when main() runs.
    from db.user import test_add

    open_id = await test_add()
    documents_repo = DocumentsRepository(
        open_id, "/home/user/code/open-douyin/open_id/docs/readme2.md"
    )
    await documents_repo.add_document_with_categories()
-
if __name__ == "__main__":
    # asyncio is already imported at the top of the module.
    asyncio.run(main())
|