OneNet_API.py 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. import requests
  2. import json
  3. # 因为设备名是唯一的,加上 token 中有产品ID,因此很容易仅通过设备名就可以查询设备信息
  4. DEVICE_NAME = "emq_online"
  5. TOKEN = "version=2018-10-31&res=products%2F522417&et=1685551369&method=sha1&sign=9l9gkFQx7A%2B7B3fu8uU%2F089azps%3D"
  6. # 以产品为对象构建类
  7. class Api_OneNet():
  8. def __init__(self, api_addr="https://api.heclouds.com", token=TOKEN) -> None:
  9. self.api_addr = api_addr
  10. self.token = token
  11. def get_device_info(self, device_name):
  12. url = self.api_addr + f"/mqtt/v1/devices/{device_name}"
  13. headers = {
  14. 'Authorization': self.token,
  15. }
  16. response = requests.request("GET", url, headers=headers)
  17. print(type(response))
  18. device_info = json.dumps(response.json(), indent=2, ensure_ascii=False)
  19. print(device_info)
  20. return device_info
  21. def register_device(self, device_name):
  22. url = self.api_addr + f'/mqtt/v1/devices/reg'
  23. # TODO
  24. def device_cmd(self, device_id):
  25. url = self.api_addr + f'/v1/synccmds?device_id={device_id}&timeout=30'
  26. headers = {
  27. 'Authorization': self.token,
  28. }
  29. response = requests.request("POST", url, headers=headers, data="open")
  30. res = json.dumps(response.json(), indent=2, ensure_ascii=False)
  31. print(res)
  32. return res
  33. if __name__ == '__main__':
  34. api = Api_OneNet()
  35. api.device_cmd('952659214')
  36. '''
  37. <class 'requests.models.Response'>
  38. {
  39. "request_id": "7477d03e-815f-4973-9426-6e88d6699522",
  40. "code": "onenet_common_success",
  41. "code_no": "000000",
  42. "message": null,
  43. "data": {
  44. "device_id": "952659214",
  45. "name": "emq_online",
  46. "pid": 522417,
  47. "key": "bXgU8U0W1DAlyWcD22xrQNK/DXkwr6kFD7bLKQ2390w="
  48. }
  49. }
  50. '''