import asyncio
import json

from src.models.product_model import Product
from src.models.config_model import UserConfig
from src.models.ai_execution_record import (
    CompetitorKeywordAnalysis,
    MarketingContentGeneration,
    LLMConfig,
    AICompetitorAnalyzeMainKeywordsResult,
    MarketingInfo,
)
from src.manager.core.db_mongo import BaseMongoManager
from utils.logu import get_logger
from src.models.field_config import FieldConfig, get_field_descriptions

logger = get_logger('test')

# Product name queried by the demos in this module.
_DEMO_PRODUCT_NAME = "电线保护套"


async def _open_test_db():
    """Create a ``BaseMongoManager`` for the ``test`` database, initialize it,
    and return its ``db`` attribute."""
    db_manager = BaseMongoManager(db_name='test')
    await db_manager.initialize()
    return db_manager.db


def _competitor_stages() -> list:
    """Return the aggregation stages shared by the aggregate demos.

    The stages turn the ``competitor_crawl_data`` sub-document into an array
    of key/value pairs, unwind it, and project one document per pair with:

    * ``asin`` -- the key of the pair,
    * ``product_info.main_text`` -- taken from
      ``extra_result.product_info.main_text`` of the value,
    * ``result_table`` -- each item of ``extra_result.result_table`` reduced
      to ``traffic_keyword`` and ``monthly_searches``.

    A fresh list is built on every call so callers may prepend or append
    stages without affecting each other.
    """
    return [
        {'$project': {'competitor_crawl_data': 1}},
        {
            '$addFields': {
                'competitors': {'$objectToArray': '$competitor_crawl_data'},
            },
        },
        {'$unwind': '$competitors'},
        {
            '$project': {
                '_id': 0,
                'asin': '$competitors.k',
                'product_info': {
                    'main_text': '$competitors.v.extra_result.product_info.main_text',
                },
                'result_table': {
                    '$map': {
                        'input': '$competitors.v.extra_result.result_table',
                        'as': 'item',
                        'in': {
                            'traffic_keyword': '$$item.traffic_keyword',
                            'monthly_searches': '$$item.monthly_searches',
                        },
                    },
                },
            },
        },
    ]


async def example_usage() -> None:
    """Updated example: the same lookup through beanie and through the raw
    database handle.

    Prints the ``basic_info`` projection of the matching product twice, once
    via ``db['Product']`` and once via ``db.Product``.
    """
    db = await _open_test_db()

    # beanie syntax; the document is fetched only to demonstrate the call
    # and is not used afterwards.
    product = await Product.find_one(Product.basic_info.name == _DEMO_PRODUCT_NAME)

    name_filter = {"basic_info.name": _DEMO_PRODUCT_NAME}
    projection = {"basic_info": 1, "_id": 0}

    # pymongo syntax, collection accessed by subscript
    collection = db['Product']
    result = await collection.find_one(name_filter, projection)
    print(result)

    # pymongo syntax, collection accessed by attribute
    result = await db.Product.find_one(name_filter, projection)
    print(result)


async def query_demo() -> None:
    """pymongo example: pass ``filter`` and ``projection`` to ``find_one`` as
    keyword arguments unpacked from a dict, then print the result."""
    db = await _open_test_db()
    query_find_one = {
        "filter": {"basic_info.name": _DEMO_PRODUCT_NAME},
        "projection": {"basic_info": 1, "_id": 0},
    }
    result = await db.Product.find_one(**query_find_one)
    print(result)


async def aggregate_demo() -> None:
    """Run the competitor pipeline over the whole ``Product`` collection and
    print the result as indented JSON."""
    db = await _open_test_db()
    # length=None is passed explicitly: some driver releases declare
    # ``length`` as a required argument of ``to_list``.
    result = await db.Product.aggregate(_competitor_stages()).to_list(length=None)
    print(json.dumps(result, indent=4, ensure_ascii=False))


async def filter_aggregate_demo(product_name: str = _DEMO_PRODUCT_NAME) -> None:
    """Run the competitor pipeline for a single product and print the result
    as indented JSON.

    Args:
        product_name: Value matched against ``basic_info.name`` in the leading
            ``$match`` stage. Defaults to the product used by the other demos
            in this module.
    """
    db = await _open_test_db()
    pipeline = [
        {'$match': {'basic_info.name': product_name}},
        *_competitor_stages(),
    ]
    # length=None is passed explicitly: some driver releases declare
    # ``length`` as a required argument of ``to_list``.
    result = await db.Product.aggregate(pipeline).to_list(length=None)
    print(json.dumps(result, indent=4, ensure_ascii=False))


def main() -> None:
    """Script entry point: run ``filter_aggregate_demo`` on a new event loop."""
    asyncio.run(filter_aggregate_demo())


if __name__ == "__main__":
    main()