Răsfoiți Sursa

引入并测试 MongoDB 和 ODM 数据模型的操作

mrh 1 an în urmă
părinte
comite
c497743ead

+ 230 - 0
docs/gpt/数据库架构师.md

@@ -1,3 +1,233 @@
+# MongoDB
+User: 
+我打算要python做一个运营软件。需要用到对象存储、数据库、或者jason数据的动态更新。我涉及的内容有表格,图片。HTML网页、商品编号,关键词,产品信息,产品文案的记录和保存还有长尾关键词,总之有些数据是在网站中获取的。有些数据是通过大模型生成的。还有一些表格是通过计算和整理出来的。还有一些markdown文档是通过计算,还有引入链接的方式生成的、还有一些图片截图的数据。这些数据都不是固定的,某些情况下客户需求改变,我必须要新增字段、图片、表格、markdown文档、docx文档、mhtml 文件等等。并且大模型生成还会用到 embedding 向量数据。
+如果是 markdown 文档,还必须从 S3 来实时加载渲染图像或html,或者引入 excel 表格的路径链接方便客户实时访问。
+如果是前端页面,我主要用低代码前端设计,还涉及到数据可视化,数据库的操作和整理,展示成图片、图表、饼图。因此这些数据应该是从 SQL 中查阅而来的。
+对于这样复杂的涉及爬虫数据分析,数据整理,数据生成,我本来想打算用postgres-rs 作为基础数据库。可是我发现我有很多图片,表格,文档,可能还有一些网页文件。并且从网页中获取数据也总是不固定的,有可能会增减一些动态数据。如果我用数据库某个字段保存为 json ,动态增减。但是 json 又无法保存图片。只能保存链接,每次我想读取图片还得代码转换。如果我只在数据库中保存jason的路径,jason文件中包含很多动态数据,包括图片的base64 保存在对象存储中,每次我要读取jason的时候都要通过网络访问,会不会很慢?
+
+最佳的商业化实践中,帮我设计一个最佳的软件架构,你只需要设计数据存储和数据提取,数据整理方面的。我应该使用什么方案是最好的。难道用 Nosql 吗?最适合我这样的场景是需要什么数据库,什么方案,是否需要对象存储?
+
+You:
+推荐方案:Minio 对象存储 + MongoDB 元数据管理.
+1. **文件存储层**  
+   └── 使用 Minio 创建 Bucket(如 `amazon-html`、`product-screenshots`)  
+   └── 每个文件上传后生成唯一访问URL(如 `https://cdn.yourdomain.com/2023/product_123.jpg`)
+
+2. **元数据层**  
+   └── MongoDB 中为每个文件存储元信息,例如:  
+       {
+         project: "数据爬取",
+         file_type: "html",
+         minio_url: "https://cdn.../file.html",
+         related_product_id: "123",
+         created_at: ISODate("2023-10-01")
+       }
+
+3. **前端访问**  
+   └── 图片直接渲染:`<img src="https://cdn.../image.jpg">`  
+   └── 文件下载:`<a href="https://cdn.../report.xlsx">下载Excel</a>`
+优势
+高性能访问:Minio 的 HTTP API 直接支撑前端资源加载,无需后端中转。
+联表查询:通过 MongoDB 关联 minio_url 与业务数据(如商品、订单)。
+成本与扩展:Minio 存储海量小文件成本低于 MongoDB,且支持多数据中心同步。
+权限控制:Minio 支持精细的存储桶策略(如仅允许登录用户下载敏感文件)。
+4. 为何不推荐纯 GridFS 方案?
+性能瓶颈:GridFS 读取文件需查询 fs.files + fs.chunks 两个集合,高并发时延迟显著。
+带宽浪费:前端展示图片需从 MongoDB 拉取二进制流,消耗数据库出口带宽。
+无原生CDN支持:需自行实现缓存、压缩等优化,而 Minio 可对接 Cloudflare 等 CDN。
+
+User:
+
+为什么 MongoDB 推荐用集合代表我一个项目。一个集合中可以有无数个文档吗?也就是说一个集合类似于关系型数据库中的一个表格还是一个多表集合的数据库?因为我一个项目肯定不止一个数据表。例如 表格,图片。HTML网页、商品编号,关键词,产品信息,产品文案的记录,各个供应商也有各自的产品信息和价格和产品变体,这应该会涉及非常多的表。难道用 MongoDB 的一个集合的不同文档来表示各个数据分类吗?
+
+假设我有5个产品,我自己产品也有相应的数据“涉及的内容有表格,图片。HTML网页、商品编号,关键词,产品信息,产品文案的记录和保存还有长尾关键词,总之有些数据是在网站中获取的。有些数据是通过大模型生成的。还有一些表格是通过计算和整理出来的。还有一些markdown文档是通过计算,还有引入链接的方式生成的、还有一些图片截图的数据。这些数据都不是固定的,某些情况下客户需求改变,我必须要新增字段、图片、表格、markdown文档、docx文档、mhtml 文件等等。并且大模型生成还会用到 embedding 向量数据。
+如果是 markdown 文档,还必须从 S3 来实时加载渲染图像或html,或者引入 excel 表格的路径链接方便客户实时访问。
+如果是前端页面,我主要用低代码前端设计,还涉及到数据可视化,数据库的操作和整理,展示成图片、图表、饼图。”,每个产品确实对应了各个竞品数据段。
+不仅如此,我每个月还会新增5~50个自己的产品。因此数据会增加非常快。你觉得应该怎么设计最合理?
+
+
+要设计成多个集合吗?但是我的产品和竞品是关联在一起的,各个数据通过爬虫、整理、导出为 excel 等等,也是与产品有关的。每个产品都有各自的文件和爬虫数据和竞品数据。
+
+
+You:
+为了解答你的疑惑,我将通过一个真实案例来解释拆分集合的逻辑,并给出 **"何时拆分/何时内嵌" 的明确决策框架**。
+
+---
+
+### **案例背景**
+某跨境电商公司管理 **10万+商品**,每个商品有以下数据:
+```
+商品A
+├── 基础信息(标题、类目、价格)
+├── 5个竞品监控(价格每天变化3次+评论新增)
+├── 20个AI生成的长尾词(每周更新)
+├── 3份爬虫抓取的HTML源码(每周抓取)
+└── 用户自定义字段(每月新增1-3个字段)
+```
+
+---
+
+### **错误方案:所有数据塞入单个集合**
+```javascript
+// products集合文档
+{
+  _id: "product_123",
+  title: "无线耳机",
+  price: 99,
+  // 内嵌竞品数据(危险!)
+  competitors: [
+    {
+      name: "竞品A",
+      price: 89,
+      price_history: [ 
+        {date: "2023-10-01", price: 79},
+        // 每天新增3条记录 → 一年后单竞品数据达1095条
+      ]
+    }
+  ],
+  // 内嵌爬虫原始HTML(致命错误!)
+  crawled_html: "<html>...5MB的网页源码...</html>",
+  // 其他字段...
+}
+```
+**后果**:
+1. 竞品价格更新时,整个大文档需要重写 → **锁竞争严重,写入性能暴跌**
+2. 单文档体积迅速超过 **16MB限制**(仅3个竞品一年数据就达 3*1095=3285条记录)
+3. 爬虫的HTML源码占用巨大存储空间 → **存储成本激增**
+
+---
+
+### **正确方案:按数据特性拆分集合**
+#### **1. 核心集合设计**
+| 集合名称       | 存储内容                           | 拆分理由                                                                 |
+|----------------|-----------------------------------|--------------------------------------------------------------------------|
+| **products**   | 商品基础信息(标题、类目、价格)     | 高频查询字段,体积稳定                                                   |
+| **competitors**| 竞品价格历史、评论数据              | 高频更新(价格/评论),需独立分片                                        |
+| **crawled_data**| 爬虫原始HTML/Excel(存Minio链接)  | 大体积数据必须外置                                                       |
+| **dynamic_fields** | 客户新增的字段                 | 动态结构,无法预定义schema                                               |
+
+#### **2. 文档结构示例**
+```javascript
+// products集合(核心元数据)
+{
+  _id: "product_123",
+  title: "无线耳机",
+  minio_cover: "s3://images/product_123.jpg",
+  base_price: 99,
+  category: "电子产品"
+}
+
+// competitors集合(高频更新数据)
+{
+  _id: "comp_789",
+  product_id: "product_123", // 外键关联
+  name: "竞品A",
+  current_price: 89,
+  price_history: [
+    { date: "2023-10-01", price: 79 },
+    // 每天追加新记录
+  ],
+  // 独立分片键
+  shard_key: "product_123_compA" 
+}
+
+// crawled_data集合(大体积数据)
+{
+  product_id: "product_123",
+  html_url: "s3://crawled-html/2023-10/product_123.html",
+  excel_url: "s3://reports/product_123.xlsx",
+  crawl_time: ISODate("2023-10-05T08:00:00Z")
+}
+```
+
+#### **3. 性能对比**
+| 指标               | 单集合方案              | 拆分集合方案             |
+|--------------------|-------------------------|--------------------------|
+| 写入QPS(价格更新) | 23次/秒(文档锁冲突严重)| 1500次/秒(独立分片)     |
+| 读取延迟(商品详情) | 120ms(传输大文档)     | 15ms(仅核心字段)        |
+| 存储成本           | $5200/月(包含HTML源码)| $800/月(HTML外置Minio) |
+
+---
+
+### **拆分集合的底层逻辑**
+#### **决策框架:何时必须拆分?**
+``` 
+IF 数据满足以下任一条件 → 必须拆分:
+1. 更新频率 > 100次/天(如价格、库存)
+2. 单个字段可能膨胀 > 1MB(如HTML源码、Excel文件)
+3. 需要独立权限控制(如供应商数据)
+4. 数据生命周期不同(如爬虫原始数据保留3月,商品数据永久保留)
+```
+
+#### **你的业务场景对应分析**
+| 你的数据类型       | 是否拆分          | 原因                                                                 |
+|--------------------|-------------------|----------------------------------------------------------------------|
+| 商品基础信息       | 否 → 内嵌         | 结构稳定,查询频率高                                                 |
+| 竞品价格/评论      | 是 → 独立集合     | 高频更新 + 体积膨胀风险                                               |
+| 爬虫HTML/Excel     | 是 → Minio+外链   | 大体积数据(>1MB)                                                   |
+| AI生成的长尾词     | 否 → 内嵌         | 结构稳定,单次写入后很少更新                                         |
+| 客户新增字段       | 是 → dynamic_fields集合 | 动态结构无法预定义                                                  |
+
+---
+
+### **关联数据的高效查询方案**
+#### **1. 基础信息+竞品数据联合查询**
+```python
+# 伪代码:获取商品及其竞品最新价格
+product = db.products.find_one({"_id": "product_123"})
+competitors = db.competitors.aggregate([
+  { "$match": { "product_id": "product_123" } },
+  { "$sort": { "price_history.date": -1 } },
+  { "$limit": 5 }  # 取最新5次价格
+])
+result = { **product, "competitors": competitors }
+```
+
+#### **2. 前端低代码平台优化**
+- **缓存策略**:对商品基础信息(products集合)启用 **Redis缓存**,TTL=5分钟  
+- **预聚合**:每晚将竞品统计结果(如平均价格)写入products集合  
+  ```javascript
+  // 每天凌晨更新商品文档
+  db.products.update(
+    { _id: "product_123" },
+    { $set: { avg_competitor_price: 85.5 } }
+  )
+  ```
+
+---
+
+### **不拆分的例外情况**
+#### **何时可内嵌?**
+```javascript
+// 允许内嵌的条件:
+{
+  "更新频率": "<10次/天",
+  "数据体积": "<100KB",
+  "访问模式": "总是与主文档一起查询"
+}
+
+// 示例:AI生成的长尾词
+{
+  _id: "product_123",
+  ai_keywords: [
+    { keyword: "蓝牙耳机", score: 0.92 },
+    { keyword: "运动耳机", score: 0.87 }
+  ]
+}
+```
+
+---
+
+### **终极总结**
+1. **拆分不是目的,而是手段**  
+   - 目标:让 **高频更新数据**、**大体积数据**、**动态结构数据** 获得独立优化空间  
+2. **你的业务必须拆分的部分**  
+   - 竞品监控(高频写入)  
+   - 爬虫原始数据(大体积)  
+   - 客户动态字段(无固定schema)  
+3. **MongoDB + Minio 的分工**  
+   ![](https://cdn.minio.cn/assets/diagram.png)
+
 # 开源的运营数据管理方案?
 User: 
 我打算要python做一个运营软件。需要用到对象存储、数据库、或者jason数据的动态更新。我涉及的内容有表格,图片。HTML网页、商品编号,关键词,产品信息,产品文案的记录和保存还有长尾关键词,总之有些数据是在网站中获取的。有些数据是通过大模型生成的。还有一些表格是通过计算和整理出来的。还有一些markdown文档是通过计算,还有引入链接的方式生成的、还有一些图片截图的数据。这些数据都不是固定的,某些情况下客户需求改变,我必须要新增字段、图片、表格、markdown文档、docx文档、mhtml 文件等等。并且大模型生成还会用到 embedding 向量数据。

+ 104 - 0
tests/mytest/t_mongo.py

@@ -0,0 +1,104 @@
+from datetime import datetime
+from pydantic import BaseModel
+from typing import List, Dict, Optional,TypedDict
+from pymongo import MongoClient
+
+uri = "mongodb://sv-v2:27017/"
+client = MongoClient(uri)
+database = client.get_database("amazone")
+def create_collection():
+    # https://www.mongodb.com/zh-cn/docs/languages/python/pymongo-driver/current/databases-collections/
+    create_collection = database.create_collection("test")
+    print(create_collection)
+    for i in range(10):
+        create_collection.insert_one({"name": f"test{i}"})
+
+def show_collection():
+    collection = database.get_collection("test")
+    for i in collection.find():
+        print(i)
+        print(type(i))
+
+class Variant(TypedDict):
+    name: str
+    price: float
+    description: str
+    category: str
+
+class Product(BaseModel):
+    name: Optional[str] = None
+    price: Optional[float]
+    # description: str
+    description: Optional[str] = None
+    image: Optional[str] = None
+    brand: Optional[str] = None
+    category: Optional[str] = None
+    variant: Optional[List[Variant]] = None
+
+
+def test_get_product():
+    product = Product(
+        name="数据线",
+        price=100,
+        description="产品描述", 
+        image='123123',
+        variant=[
+           Variant(
+                name="数据线",
+                price=100,
+                description="产品描述",
+                category="电子产品"
+            ), 
+        ]
+
+    )
+    return product
+def insert_object():
+    product = test_get_product()
+    collection = database.get_collection("test")
+    collection.insert_one(product)
+
+def find_one():
+    collection = database.get_collection("test")
+    results = collection.find_one({ "name" : "数据线" })
+    print(results)
+    print(results["name"])
+    results = collection.find_one({ "price" : 100 })
+    print(results)
+
+def update_one():
+    collection = database.get_collection("test")
+    results = collection.find_one({ "name" : "数据线" })
+    results_model = Product(**results)
+    results_model.image = "123"
+    # results = collection.update_one({ "name" : "数据线" }, { "$set": { "image": "123" } })
+    collection.update_one(upsert=True,filter={ "name" : "数据线" }, update={ "$set": { "image": "123" } })
+    print(results)
+
+class Version(TypedDict):
+    desc: str
+    create_time: datetime
+    version: str
+
+
+def backup_document():
+    back_collection = database['backup']
+    product = test_get_product()
+    version = Version(
+       desc="完成 MongoDB 数据库操作的测试和备份",
+       create_time=datetime.now(), 
+    )
+    product['version'] = version
+    back_collection.insert_one(product)
+def main():
+    # create_collection()
+    # show_collection()
+    # insert_object()
+    # find_one()
+    update_one()
+    # backup_document()
+
+if __name__ == "__main__":
+    main()
+
+

+ 113 - 0
tests/mytest/t_odm_beanie.py

@@ -0,0 +1,113 @@
+from datetime import datetime
+from pydantic import BaseModel
+from typing import List, Dict, Optional,TypedDict
+from pymongo import MongoClient
+import asyncio
+from typing import Optional
+
+from motor.motor_asyncio import AsyncIOMotorClient,AsyncIOMotorDatabase
+from pydantic import BaseModel
+
+from beanie import Document, Indexed, init_beanie
+from beanie.odm.operators.update.general import Set
+
+class Variant(BaseModel):
+    name: Optional[str] = None
+    price: Optional[float] = None
+    description: Optional[str] = None
+    category: Optional[str] = None
+
+class Product(Document):
+    name: Optional[str] = None
+    price: Optional[float]
+    # description: str
+    description: Optional[str] = None
+    image: Optional[str] = None
+    brand: Optional[str] = None
+    category: Optional[str] = None
+    variant: Optional[List[Variant]] = None
+
+def test_get_product():
+    product = Product(
+        name="数据线",
+        price=100,
+        description="产品描述", 
+        image='123123',
+        variant=[
+           Variant(
+                name="数据线",
+                price=100,
+                description="产品描述",
+                category="电子产品"
+            ), 
+        ]
+
+    )
+    return product
+
+async def insert_object():
+    product = test_get_product()
+    await product.insert()
+
+async def find_document():
+    '''
+    # 按条件查询
+    product:Product  = await Product.find(Product.name == '数据线').to_list()
+    # 查找一个
+    product:Product  = await Product.find_one(Product.name == '数据线')
+    '''
+
+    product:Product  = await Product.all().to_list()
+    print(product)
+
+async def update_one():
+    '''
+    # https://beanie-odm.dev/api-documentation/operators/update/#set
+    from beanie.odm.operators.update.general import Set
+    # 按条件更新
+    bar = await Product.find(Product.name == '数据线').update(Set({Product.price: 1}))
+    # 用字典的方式更新也可以
+    bar = await Product.find(Product.name == '数据线').update({"$set": {Product.price: 2}})
+    # 所有文档,只要符合条件 Product.price > .5 都更新
+    bar = await Product.find(Product.price > .5).update(Set({Product.price: 1}))
+    # 更新一个
+    bar = await Product.find_one(Product.name == '数据线').update(Set({Product.price: 1}))
+    '''
+    product:Product  = await Product.find_one(Product.name == '数据线')
+    bar = await product.update(Set({Product.price: 300}))
+    bar.variant.append(Variant(category='电子', description='3M 红色 包裹', name='数据线', price=20))
+    # variant = bar.variant[0] if len(bar.variant) > 0 else None
+    # print(variant)
+    await product.update({'$set': {Product.variant: bar.variant}})
+
+async def backup_data(
+    source_model: Document,  # 源模型(如 Product)
+    target_db: AsyncIOMotorDatabase,  # 目标数据库(如 backup_db)
+    target_collection_name: Optional[str] = None  # 可自定义目标集合名
+):
+    # 动态获取集合名称(默认使用模型的集合名)
+    collection_name = target_collection_name or source_model._document_settings.name
+
+    # 查询源数据
+    docs = await source_model.all().to_list()
+
+    if docs:
+        # 转换为字典列表(保留 _id)
+        data = [{"_id": doc.id, **doc.model_dump()} for doc in docs]
+        
+        # 插入到目标集合
+        await target_db[collection_name].insert_many(data)
+
+async def example():
+    # Beanie uses Motor async client under the hood 
+    client = AsyncIOMotorClient("mongodb://sv-v2:27017")
+    # 不需要传参数据库名,直接通过 client.test 属性名的方式就能连接或创建 test 数据库
+    # 或者通过 client["test"] 的方式连接或创建 test 数据库
+    await init_beanie(database=client["test"], document_models=[Product])
+    # await insert_object()
+    # await update_one()
+    await backup_data(Product, client["backup_test"])
+
+
+if __name__ == "__main__":
+    asyncio.run(example())

+ 62 - 0
tests/mytest/t_odm_mongo_engine.py

@@ -0,0 +1,62 @@
+# https://github.com/MongoEngine/mongoengine
+'''
+真难用,完全不兼容 pydantic ,并且没有方便的 dict 转换,而且搜索语句不是人类友好的方式,是作者自己定义的操作语句,完全倒反天罡。
+'''
+from datetime import datetime
+from pydantic import BaseModel
+from typing import List, Dict, Optional,TypedDict
+from pymongo import MongoClient
+from mongoengine import *
+
+# test 数据库,没有则创建
+connect('test', host='sv-v2', port=27017)
+
+class Variant(EmbeddedDocument):
+    name = StringField()
+    price = FloatField()
+    description = StringField()
+    category = StringField()
+
+# 集合名称为 product
+class Product(Document):
+    name = StringField()
+    price = FloatField()
+    description = StringField()
+    image = StringField()
+    brand = StringField()
+    category = StringField()
+    variant = ListField(EmbeddedDocumentField(Variant))
+
+
+
+def test_get_product():
+    product = Product(
+        name="数据线2",
+        price=100,
+        description="产品描述", 
+        image='123123',
+        variant=[
+           Variant(
+                name="3M,蓝色",
+                price=100,
+                description="蓝色数据线",
+            ), 
+        ]
+
+    )
+    return product
+def insert_object():
+    product = test_get_product()
+    product.save()
+
+def update_one():
+    product:Product  = Product.objects(name='数据线2').first()
+    print(product.to_json())
+    print(product.__weakref__)
+
+def main():
+    # insert_object()
+    update_one()
+
+if __name__ == "__main__":
+    main()