# t_pymongo_test.py (4.0 KB)

import asyncio
import json
import os
from collections.abc import Generator
from datetime import datetime
from typing import Any

from bson.objectid import ObjectId
from pymongo import MongoClient
  8. class FindTool():
  9. def _invoke(self, tool_parameters: dict[str, Any]):
  10. client = MongoClient(self.runtime.credentials["uri"],
  11. username=self.runtime.credentials["username"],
  12. password=self.runtime.credentials["password"])
  13. db = client[tool_parameters["database_name"]]
  14. collection = db[tool_parameters["collection_name"]]
  15. print((tool_parameters["query"]))
  16. items = collection.find(eval(tool_parameters["query"]))
  17. items = list(items)
  18. df_list = []
  19. for item in items:
  20. df_dict = {}
  21. for key,value in item.items():
  22. if isinstance(value, ObjectId):
  23. df_dict[key] = value.binary
  24. elif isinstance(value, datetime):
  25. df_dict[key] = str(value)
  26. elif isinstance(value, (str,int,bool)):
  27. df_dict[key] = value
  28. elif isinstance(value, list):
  29. dl = []
  30. for v in value:
  31. if isinstance(v, ObjectId):
  32. dl.append(v.binary)
  33. else:
  34. dl.append(v)
  35. df_dict[key] = dl
  36. df_list.append(df_dict)
  37. mongo_json = {"mongodb": df_list}
  38. yield self.create_text_message(str(mongo_json))
  39. async def main():
  40. url = 'mongodb://sv-v2.lan:27017/amazone?authSource=amazone'
  41. user_name = 'gpt'
  42. password = 'gpt123'
  43. client = MongoClient(url, username=user_name, password=password)
  44. db = client['amazone']
  45. collection = db['agent.product']
  46. query = '[{"$match": {"basic_info.name": "电线保护套"}}, {"$project": {"competitor_crawl_data": 1}}, {"$addFields": {"competitors": {"$objectToArray": "$competitor_crawl_data"}}}, {"$unwind": "$competitors"}, {"$project": {"_id": 0, "asin": "$competitors.k", "product_info": {"main_text": "$competitors.v.extra_result.product_info.main_text"}, "result_table": {"$map": {"input": "$competitors.v.extra_result.result_table", "as": "item", "in": {"traffic_keyword": "$$item.traffic_keyword", "monthly_searches": "$$item.monthly_searches"}}}}}]'
  47. pipeline = [
  48. {
  49. '$project': {
  50. 'combined': {
  51. '$concatArrays': [
  52. {
  53. '$map': {
  54. 'input': '$competitor.results',
  55. 'as': 'result',
  56. 'in': {
  57. 'asin': '$$result.asin',
  58. 'main_key': '$$result.main_key',
  59. 'monthly_searches': '$$result.monthly_searches',
  60. 'type': 'result'
  61. }
  62. }
  63. }, {
  64. '$map': {
  65. 'input': '$competitor.tail_keys',
  66. 'as': 'tail',
  67. 'in': {
  68. 'tail_key': '$$tail.tail_key',
  69. 'monthly_searches': '$$tail.monthly_searches',
  70. 'type': 'tail_key'
  71. }
  72. }
  73. }
  74. ]
  75. }
  76. }
  77. }, {
  78. '$unwind': '$combined'
  79. }, {
  80. '$replaceRoot': {
  81. 'newRoot': '$combined'
  82. }
  83. }
  84. ]
  85. # query = '[]'
  86. # query = json.loads(query)
  87. # res = db.aggregate(json.loads(query))
  88. res = collection.aggregate(pipeline).to_list()
  89. print(f"{res}")
  90. return
  91. tool_parameters = {
  92. "query": query,
  93. }
  94. find_qurey = '{"basic_info.name": "电线保护套"}'
  95. items = collection.find(eval(find_qurey))
  96. items = list(items)
  97. print(items)
  98. df_list = []
  99. if __name__ == "__main__":
  100. asyncio.run(main())