t_odm_beanie.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. from datetime import datetime
  2. from pydantic import BaseModel
  3. from typing import List, Dict, Optional,TypedDict
  4. from pymongo import MongoClient
  5. import asyncio
  6. from typing import Optional
  7. from motor.motor_asyncio import AsyncIOMotorClient,AsyncIOMotorDatabase
  8. from pydantic import BaseModel
  9. from beanie import Document, Indexed, init_beanie
  10. from beanie.odm.operators.update.general import Set
  11. class Variant(BaseModel):
  12. name: Optional[str] = None
  13. price: Optional[float] = None
  14. description: Optional[str] = None
  15. category: Optional[str] = None
  16. class Product(Document):
  17. name: Optional[str] = None
  18. price: Optional[float]
  19. # description: str
  20. description: Optional[str] = None
  21. image: Optional[str] = None
  22. brand: Optional[str] = None
  23. category: Optional[str] = None
  24. variant: Optional[List[Variant]] = None
  25. def test_get_product():
  26. product = Product(
  27. name="数据线",
  28. price=100,
  29. description="产品描述",
  30. image='123123',
  31. variant=[
  32. Variant(
  33. name="数据线",
  34. price=100,
  35. description="产品描述",
  36. category="电子产品"
  37. ),
  38. ]
  39. )
  40. return product
  41. async def insert_object():
  42. product = test_get_product()
  43. await product.insert()
  44. async def find_document():
  45. '''
  46. # 按条件查询
  47. product:Product = await Product.find(Product.name == '数据线').to_list()
  48. # 查找一个
  49. product:Product = await Product.find_one(Product.name == '数据线')
  50. '''
  51. product:Product = await Product.all().to_list()
  52. print(product)
  53. async def update_one():
  54. '''
  55. # https://beanie-odm.dev/api-documentation/operators/update/#set
  56. from beanie.odm.operators.update.general import Set
  57. # 按条件更新
  58. bar = await Product.find(Product.name == '数据线').update(Set({Product.price: 1}))
  59. # 用字典的方式更新也可以
  60. bar = await Product.find(Product.name == '数据线').update({"$set": {Product.price: 2}})
  61. # 所有文档,只要符合条件 Product.price > .5 都更新
  62. bar = await Product.find(Product.price > .5).update(Set({Product.price: 1}))
  63. # 更新一个
  64. bar = await Product.find_one(Product.name == '数据线').update(Set({Product.price: 1}))
  65. '''
  66. product:Product = await Product.find_one(Product.name == '数据线')
  67. bar = await product.update(Set({Product.price: 300}))
  68. bar.variant.append(Variant(category='电子', description='3M 红色 包裹', name='数据线', price=20))
  69. # variant = bar.variant[0] if len(bar.variant) > 0 else None
  70. # print(variant)
  71. await product.update({'$set': {Product.variant: bar.variant}})
  72. async def backup_data(
  73. source_model: Document, # 源模型(如 Product)
  74. target_db: AsyncIOMotorDatabase, # 目标数据库(如 backup_db)
  75. target_collection_name: Optional[str] = None # 可自定义目标集合名
  76. ):
  77. # 动态获取集合名称(默认使用模型的集合名)
  78. collection_name = target_collection_name or source_model._document_settings.name
  79. # 查询源数据
  80. docs = await source_model.all().to_list()
  81. if docs:
  82. # 转换为字典列表(保留 _id)
  83. data = [{"_id": doc.id, **doc.model_dump()} for doc in docs]
  84. # 插入到目标集合
  85. await target_db[collection_name].insert_many(data)
  86. async def example():
  87. # Beanie uses Motor async client under the hood
  88. client = AsyncIOMotorClient("mongodb://sv-v2:27017")
  89. # 不需要传参数据库名,直接通过 client.test 属性名的方式就能连接或创建 test 数据库
  90. # 或者通过 client["test"] 的方式连接或创建 test 数据库
  91. await init_beanie(database=client["test"], document_models=[Product])
  92. # await insert_object()
  93. # await update_one()
  94. await backup_data(Product, client["backup_test"])
  95. if __name__ == "__main__":
  96. asyncio.run(example())