| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104 |
import asyncio
import json
import os
from collections.abc import Generator
from datetime import datetime
from typing import Any

from bson.objectid import ObjectId
from pymongo import MongoClient
class FindTool:
    """Tool that runs a MongoDB ``find`` and returns the matching documents as text.

    Reads ``uri``, ``username`` and ``password`` from ``self.runtime.credentials``
    and wraps its output with ``self.create_text_message``; both attributes are
    presumably provided by the host plugin framework (not visible here).
    """

    @classmethod
    def _to_plain(cls, value: Any) -> Any:
        """Convert a BSON value into something ``json.dumps`` can serialize.

        - str / int / float / bool / None pass through unchanged.
        - ``datetime`` becomes ``str(value)`` (e.g. ``"2024-01-02 03:04:05"``).
        - Embedded documents (dicts) and arrays (lists/tuples) are converted
          recursively, so nested ObjectIds and datetimes are handled too.
        - Any other type (ObjectId, Decimal128, ...) falls back to
          ``str(value)``; for an ObjectId that is its 24-character hex id.
        """
        if value is None or isinstance(value, (str, int, float)):
            # bool is a subclass of int, so booleans are covered here as well.
            return value
        if isinstance(value, datetime):
            return str(value)
        if isinstance(value, dict):
            return {str(key): cls._to_plain(item) for key, item in value.items()}
        if isinstance(value, (list, tuple)):
            return [cls._to_plain(item) for item in value]
        return str(value)

    def _invoke(self, tool_parameters: dict[str, Any]) -> Generator[Any, None, None]:
        """Run ``collection.find(query)`` and yield one text message with the results.

        Args:
            tool_parameters: must contain ``database_name``, ``collection_name``
                and ``query``, a filter-document literal such as
                ``'{"basic_info.name": "..."}'``.

        Yields:
            A single text message whose content is the JSON string
            ``{"mongodb": [<document>, ...]}``, with every document passed
            through ``_to_plain``.
        """
        credentials = self.runtime.credentials
        # SECURITY(review): eval() executes arbitrary Python taken from tool input,
        # which is remote code execution if the query is user- or LLM-controlled.
        # It is kept because callers may rely on writing ObjectId(...) /
        # datetime(...) inside queries. Consider bson.json_util.loads (MongoDB
        # Extended JSON) or json.loads instead.
        query = eval(tool_parameters["query"])
        # The context manager closes the client and its connection pool on exit,
        # so repeated invocations do not accumulate open connections.
        with MongoClient(
            credentials["uri"],
            username=credentials["username"],
            password=credentials["password"],
        ) as client:
            database = client[tool_parameters["database_name"]]
            collection = database[tool_parameters["collection_name"]]
            documents = [self._to_plain(doc) for doc in collection.find(query)]
        # ensure_ascii=False keeps non-ASCII text (e.g. Chinese product names) readable.
        yield self.create_text_message(
            json.dumps({"mongodb": documents}, ensure_ascii=False)
        )
async def main():
    """Ad-hoc smoke test: run a sample aggregation against a dev MongoDB and print it.

    Connection settings come from the ``MONGO_URI``, ``MONGO_USERNAME`` and
    ``MONGO_PASSWORD`` environment variables, falling back to the dev defaults
    below. The function is ``async`` only so it can be driven by
    ``asyncio.run``; the pymongo calls it makes are synchronous.
    """
    # SECURITY(review): these fallback credentials are committed to source.
    # Rotate them and drop the defaults once every caller sets the env vars.
    url = os.environ.get(
        "MONGO_URI", "mongodb://sv-v2.lan:27017/amazone?authSource=amazone"
    )
    user_name = os.environ.get("MONGO_USERNAME", "gpt")
    password = os.environ.get("MONGO_PASSWORD", "gpt123")

    # For each document, concatenate competitor.results and competitor.tail_keys
    # into one array of small records tagged "type": "result" / "tail_key".
    # Then unwind that array and promote each record to the top level.
    pipeline = [
        {
            "$project": {
                "combined": {
                    "$concatArrays": [
                        {
                            "$map": {
                                "input": "$competitor.results",
                                "as": "result",
                                "in": {
                                    "asin": "$$result.asin",
                                    "main_key": "$$result.main_key",
                                    "monthly_searches": "$$result.monthly_searches",
                                    "type": "result",
                                },
                            }
                        },
                        {
                            "$map": {
                                "input": "$competitor.tail_keys",
                                "as": "tail",
                                "in": {
                                    "tail_key": "$$tail.tail_key",
                                    "monthly_searches": "$$tail.monthly_searches",
                                    "type": "tail_key",
                                },
                            }
                        },
                    ]
                }
            }
        },
        {"$unwind": "$combined"},
        {"$replaceRoot": {"newRoot": "$combined"}},
    ]

    # Closing the client via the context manager releases its connection pool.
    with MongoClient(url, username=user_name, password=password) as client:
        collection = client["amazone"]["agent.product"]
        # list() works on every pymongo version; CommandCursor.to_list() needs 4.9+.
        res = list(collection.aggregate(pipeline))
    print(res)
if __name__ == "__main__":
    # Run the ad-hoc database check only when executed as a script,
    # never as a side effect of importing this module.
    asyncio.run(main())
|