| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758 |
import json
import os

import requests
# Device names are unique, and the token already carries the product ID, so a
# device can be looked up by its name alone.
DEVICE_NAME = "emq_online"

# API authorization token. It can be overridden through the ONENET_TOKEN
# environment variable so a fresh token can be supplied without editing this
# file; the literal below is kept as the default for backward compatibility.
# NOTE(review): this is a credential committed to source, and its `et=` field
# looks like an expiry timestamp -- confirm and consider rotating it.
TOKEN = os.environ.get(
    "ONENET_TOKEN",
    "version=2018-10-31&res=products%2F522417&et=1685551369&method=sha1&sign=9l9gkFQx7A%2B7B3fu8uU%2F089azps%3D",
)
class Api_OneNet():
    """Small HTTP client for the OneNET API, modelled around one product.

    Every request carries the configured token in the ``Authorization``
    header. Methods that return data hand back the JSON response re-serialized
    as a pretty-printed string, and also print it.
    """

    def __init__(self, api_addr="https://api.heclouds.com", token=TOKEN) -> None:
        """Store the API base address and the authorization token.

        Args:
            api_addr: Base URL of the API, without a trailing slash.
            token: Value sent verbatim in the ``Authorization`` header.
        """
        self.api_addr = api_addr
        self.token = token

    def _auth_headers(self):
        """Return the request headers shared by all API calls."""
        return {
            'Authorization': self.token,
        }

    def get_device_info(self, device_name, timeout=10):
        """Fetch a device's details by name.

        Args:
            device_name: Name of the device to look up.
            timeout: Client-side timeout in seconds for the HTTP request.

        Returns:
            The JSON response body as an indented string (non-ASCII
            characters are kept as-is).

        Raises:
            requests.exceptions.RequestException: On connection failure or
                timeout.
            ValueError: If the response body is not valid JSON.
        """
        url = self.api_addr + f"/mqtt/v1/devices/{device_name}"
        # Without an explicit timeout, requests may block indefinitely.
        response = requests.get(url, headers=self._auth_headers(), timeout=timeout)
        device_info = json.dumps(response.json(), indent=2, ensure_ascii=False)
        print(device_info)
        return device_info

    def register_device(self, device_name):
        """Register a new device (not implemented yet).

        Raises:
            NotImplementedError: Always. Raising makes the missing
                implementation explicit instead of silently doing nothing.
        """
        # TODO: implement against {api_addr}/mqtt/v1/devices/reg
        raise NotImplementedError("register_device is not implemented yet")

    def device_cmd(self, device_id, cmd="open", timeout=30):
        """Send a synchronous command to a device.

        Args:
            device_id: ID of the target device.
            cmd: Request body to send as the command. Defaults to ``"open"``.
            timeout: Value of the ``timeout`` query parameter. Presumably the
                number of seconds the platform waits for the device to
                answer -- confirm against the OneNET documentation.

        Returns:
            The JSON response body as an indented string (non-ASCII
            characters are kept as-is).

        Raises:
            requests.exceptions.RequestException: On connection failure or
                timeout.
            ValueError: If the response body is not valid JSON.
        """
        url = self.api_addr + '/v1/synccmds'
        # Passing the query through `params` lets requests URL-encode it.
        params = {'device_id': device_id, 'timeout': timeout}
        # Give the client a little longer than the `timeout` query value so a
        # reply arriving near that limit is not cut off locally.
        response = requests.post(
            url,
            headers=self._auth_headers(),
            params=params,
            data=cmd,
            timeout=timeout + 5,
        )
        res = json.dumps(response.json(), indent=2, ensure_ascii=False)
        print(res)
        return res
-
if __name__ == "__main__":
    # Script entry point: build a client with the default endpoint and token,
    # then send the synchronous command to device 952659214.
    client = Api_OneNet()
    client.device_cmd("952659214")
-
- '''
- <class 'requests.models.Response'>
- {
- "request_id": "7477d03e-815f-4973-9426-6e88d6699522",
- "code": "onenet_common_success",
- "code_no": "000000",
- "message": null,
- "data": {
- "device_id": "952659214",
- "name": "emq_online",
- "pid": 522417,
- "key": "bXgU8U0W1DAlyWcD22xrQNK/DXkwr6kFD7bLKQ2390w="
- }
- }
- '''
|