# t_mongo.py (2.8 KB)
from datetime import datetime, timezone
from typing import Dict, List, Optional, TypedDict

from pydantic import BaseModel
from pymongo import MongoClient
  5. uri = "mongodb://sv-v2:27017/"
  6. client = MongoClient(uri)
  7. database = client.get_database("amazone")
  8. def create_collection():
  9. # https://www.mongodb.com/zh-cn/docs/languages/python/pymongo-driver/current/databases-collections/
  10. create_collection = database.create_collection("test")
  11. print(create_collection)
  12. for i in range(10):
  13. create_collection.insert_one({"name": f"test{i}"})
  14. def show_collection():
  15. collection = database.get_collection("test")
  16. for i in collection.find():
  17. print(i)
  18. print(type(i))
  19. class Variant(TypedDict):
  20. name: str
  21. price: float
  22. description: str
  23. category: str
  24. class Product(BaseModel):
  25. name: Optional[str] = None
  26. price: Optional[float]
  27. # description: str
  28. description: Optional[str] = None
  29. image: Optional[str] = None
  30. brand: Optional[str] = None
  31. category: Optional[str] = None
  32. variant: Optional[List[Variant]] = None
  33. def test_get_product():
  34. product = Product(
  35. name="数据线",
  36. price=100,
  37. description="产品描述",
  38. image='123123',
  39. variant=[
  40. Variant(
  41. name="数据线",
  42. price=100,
  43. description="产品描述",
  44. category="电子产品"
  45. ),
  46. ]
  47. )
  48. return product
  49. def insert_object():
  50. product = test_get_product()
  51. collection = database.get_collection("test")
  52. collection.insert_one(product)
  53. def find_one():
  54. collection = database.get_collection("test")
  55. results = collection.find_one({ "name" : "数据线" })
  56. print(results)
  57. print(results["name"])
  58. results = collection.find_one({ "price" : 100 })
  59. print(results)
  60. def update_one():
  61. collection = database.get_collection("test")
  62. results = collection.find_one({ "name" : "数据线" })
  63. results_model = Product(**results)
  64. results_model.image = "123"
  65. # results = collection.update_one({ "name" : "数据线" }, { "$set": { "image": "123" } })
  66. collection.update_one(upsert=True,filter={ "name" : "数据线" }, update={ "$set": { "image": "123" } })
  67. print(results)
  68. class Version(TypedDict):
  69. desc: str
  70. create_time: datetime
  71. version: str
  72. def backup_document():
  73. back_collection = database['backup']
  74. product = test_get_product()
  75. version = Version(
  76. desc="完成 MongoDB 数据库操作的测试和备份",
  77. create_time=datetime.now(),
  78. )
  79. product['version'] = version
  80. back_collection.insert_one(product)
  81. def main():
  82. # create_collection()
  83. # show_collection()
  84. # insert_object()
  85. # find_one()
  86. update_one()
  87. # backup_document()
  88. if __name__ == "__main__":
  89. main()