| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 |
- from datetime import datetime
- from pydantic import BaseModel
- from typing import List, Dict, Optional,TypedDict
- from pymongo import MongoClient
- import asyncio
- from typing import Optional
- from motor.motor_asyncio import AsyncIOMotorClient,AsyncIOMotorDatabase
- from pydantic import BaseModel
- from beanie import Document, Indexed, init_beanie
- from beanie.odm.operators.update.general import Set
- class Variant(BaseModel):
- name: Optional[str] = None
- price: Optional[float] = None
- description: Optional[str] = None
- category: Optional[str] = None
- class Product(Document):
- name: Optional[str] = None
- price: Optional[float]
- # description: str
- description: Optional[str] = None
- image: Optional[str] = None
- brand: Optional[str] = None
- category: Optional[str] = None
- variant: Optional[List[Variant]] = None
- def test_get_product():
- product = Product(
- name="数据线",
- price=100,
- description="产品描述",
- image='123123',
- variant=[
- Variant(
- name="数据线",
- price=100,
- description="产品描述",
- category="电子产品"
- ),
- ]
- )
- return product
- async def insert_object():
- product = test_get_product()
- await product.insert()
- async def find_document():
- '''
- # 按条件查询
- product:Product = await Product.find(Product.name == '数据线').to_list()
- # 查找一个
- product:Product = await Product.find_one(Product.name == '数据线')
- '''
- product:Product = await Product.all().to_list()
- print(product)
- async def update_one():
- '''
- # https://beanie-odm.dev/api-documentation/operators/update/#set
- from beanie.odm.operators.update.general import Set
- # 按条件更新
- bar = await Product.find(Product.name == '数据线').update(Set({Product.price: 1}))
- # 用字典的方式更新也可以
- bar = await Product.find(Product.name == '数据线').update({"$set": {Product.price: 2}})
- # 所有文档,只要符合条件 Product.price > .5 都更新
- bar = await Product.find(Product.price > .5).update(Set({Product.price: 1}))
- # 更新一个
- bar = await Product.find_one(Product.name == '数据线').update(Set({Product.price: 1}))
- '''
- product:Product = await Product.find_one(Product.name == '数据线')
- bar = await product.update(Set({Product.price: 300}))
- bar.variant.append(Variant(category='电子', description='3M 红色 包裹', name='数据线', price=20))
- # variant = bar.variant[0] if len(bar.variant) > 0 else None
- # print(variant)
- await product.update({'$set': {Product.variant: bar.variant}})
- async def backup_data(
- source_model: Document, # 源模型(如 Product)
- target_db: AsyncIOMotorDatabase, # 目标数据库(如 backup_db)
- target_collection_name: Optional[str] = None # 可自定义目标集合名
- ):
- # 动态获取集合名称(默认使用模型的集合名)
- collection_name = target_collection_name or source_model._document_settings.name
- # 查询源数据
- docs = await source_model.all().to_list()
- if docs:
- # 转换为字典列表(保留 _id)
- data = [{"_id": doc.id, **doc.model_dump()} for doc in docs]
-
- # 插入到目标集合
- await target_db[collection_name].insert_many(data)
- async def example():
- # Beanie uses Motor async client under the hood
- client = AsyncIOMotorClient("mongodb://sv-v2:27017")
- # 不需要传参数据库名,直接通过 client.test 属性名的方式就能连接或创建 test 数据库
- # 或者通过 client["test"] 的方式连接或创建 test 数据库
- await init_beanie(database=client["test"], document_models=[Product])
- # await insert_object()
- # await update_one()
- await backup_data(Product, client["backup_test"])
- if __name__ == "__main__":
- asyncio.run(example())
|