# docs.py - document, category and document/category link models plus their repositories.

  1. import asyncio
  2. from datetime import datetime
  3. import re
  4. from typing import Optional
  5. from enum import Enum
  6. from typing import List, Any
  7. import os
  8. import sys
  9. sys.path.append(os.path.dirname(os.path.dirname(__file__)))
  10. from sqlmodel import Field, SQLModel,Column, Integer, Sequence, UniqueConstraint
  11. from config import DB_URL,logger
  12. # from db.common import engine
  13. from sqlalchemy.dialects.postgresql import insert
  14. from sqlalchemy.sql.sqltypes import Integer, String, DateTime
  15. from sqlalchemy.sql.schema import Column
  16. from sqlalchemy import UniqueConstraint
  17. from db.base import BaseRepository,DouyinBaseRepository
  18. from db.engine import engine,create_all
  19. class Categories(SQLModel,DouyinBaseRepository, table=True):
  20. id: int = Field(primary_key=True) # 分类的唯一标识符
  21. open_id: str = Field(foreign_key="useroauthtoken.open_id",index=True) # 关联到用户表的外键
  22. name: str = Field(default="default", index=True) # 分类的名称,添加索引以优化查询性能
  23. update_time: datetime = Field(default_factory=datetime.now) # 创建时间、更新时间
  24. # 添加联合唯一约束
  25. __table_args__ = (UniqueConstraint('open_id', 'name', name='uq_open_id_name'),)
  26. class DocumentCategories(SQLModel, table=True):
  27. document_id: int = Field(foreign_key="documents.id", primary_key=True) # 关联到文档表的外键
  28. category_id: int = Field(foreign_key="categories.id", primary_key=True) # 关联到分类表的外键
  29. class DocStatus:
  30. UNPROCESSED = 0 # 未处理
  31. COMPLETED = 100 # 已完成
  32. DISABLED = -1 # 禁用
  33. class Documents(SQLModel, table=True):
  34. id: Optional[int] = Field(primary_key=True)
  35. open_id: str = Field(foreign_key="useroauthtoken.open_id",index=True) # 关联到用户表的外键
  36. path: str = Field(nullable=False, index=True) # 相对路径
  37. status: int = Field(nullable=False) # 文档状态
  38. update_time: datetime = Field(default_factory=datetime.now) # 创建时间、更新时间
  39. __table_args__ = (UniqueConstraint('path'),)
  40. class CategoriesRepository(DouyinBaseRepository):
  41. def __init__(self, engine=engine):
  42. super().__init__(Categories, engine)
  43. class DocumentCategoriesRepository(DouyinBaseRepository):
  44. def __init__(self, engine=engine):
  45. super().__init__(DocumentCategories, engine)
  46. class DocumentsRepository(DouyinBaseRepository):
  47. def __init__(self, open_id, file_path, category_name="default", engine=engine):
  48. # file_path = {DATA_DIR}/{open_id}/docs/xxx/example_file.pdf
  49. relative_path = DocumentsRepository.get_relative_path(file_path)
  50. if relative_path == None:
  51. return
  52. self.doc_model = Documents(
  53. open_id=open_id,
  54. path=relative_path,
  55. status=DocStatus.UNPROCESSED,
  56. )
  57. self.category_model = Categories(
  58. open_id=open_id,
  59. name=category_name
  60. )
  61. super().__init__(Documents, engine)
  62. def get_relative_path(full_path):
  63. pattern = r'docs(/.*?)$'
  64. match = re.search(pattern, full_path)
  65. if match:
  66. return match.group(1)
  67. else:
  68. logger.error(f"Can not get rel path:{full_path}")
  69. async def add_document_with_categories(self):
  70. # document_id = await self.aadd_or_update(self.doc_model.model_dump(), constraint_name="path")
  71. # logger.debug(f"document_id:{document_id}")
  72. c = CategoriesRepository()
  73. category_id = await c.aon_conflict_do_nothing(self.category_model, index_elements=["open_id", "name"])
  74. logger.debug(f"category_id:{category_id}")
  75. return
  76. # 添加或更新文档
  77. await self.add_or_update_document(new_document.model_dump(), "document_id")
  78. # 获取已存在的分类
  79. categories_repo = CategoriesRepository()
  80. existing_categories = await categories_repo.get_all_by_ids(category_ids)
  81. existing_category_ids = {category.category_id for category in existing_categories}
  82. # 添加不存在的分类
  83. for category_id in set(category_ids) - existing_category_ids:
  84. new_category = Categories(open_id=new_document.open_id, category_id=category_id, category_name=f"Category_{category_id}") # 假设名称由 ID 生成
  85. await categories_repo.add([new_category])
  86. # 创建并添加文档分类关联关系
  87. document_categories_to_add = []
  88. for category_id in category_ids:
  89. doc_cat = DocumentCategories(document_id=new_document.document_id, category_id=category_id)
  90. document_categories_to_add.append(doc_cat)
  91. # 添加文档分类关联关系到数据库
  92. document_categories_repo = DocumentCategoriesRepository()
  93. await document_categories_repo.add(document_categories_to_add)
  94. # 示例使用
  95. async def main():
  96. from db.user import test_add
  97. open_id = await test_add()
  98. # 创建实例
  99. categories_repo = CategoriesRepository()
  100. documents_repo = DocumentsRepository(open_id,"/home/user/code/open-douyin/open_id/docs/readme2.md")
  101. document_categories_repo = DocumentCategoriesRepository()
  102. await documents_repo.add_document_with_categories()
  103. # 添加分类
  104. # doc1 = Documents(open_id=open_id, document_name="docs_fn", status="ready", file_path="/path")
  105. # doc2 = Documents(open_id=open_id, document_name="docs_jj", status="ready", file_path="/path")
  106. # 实现有关代码
  107. if __name__ == "__main__":
  108. import asyncio
  109. asyncio.run(main())