zoukankan      html  css  js  c++  java
  • 用python实现与小米网关通讯

    用python实现与小米网关通讯

     

    python 与小米网关通讯的三块内容:

    以下内容的理解需要配合《绿米网关局域网通讯协议》使用

    1、监听网关发出的组播信息:(有网关及连接设备的生命信号,事件信息)

    2、读取需要获得的信息

    3、控制连接设备(涉及了token加密部分)

    1、upd广播监听小米网关的组播信息

    复制代码
     1 #!/usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3 
     4 import socket
     5 
     6 def get_gateway_heart():
     7     SENDERIP = "0.0.0.0"
     8     MYPORT = 9898
     9     MYGROUP = '224.0.0.50'
    10 
    11     sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    12     #allow multiple sockets to use the same PORT number
    13     sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
    14     #Bind to the port that we know will receive multicast data
    15     sock.bind((SENDERIP,MYPORT))
    16     #tell the kernel that we are a multicast socket
    17     #sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 255)
    18     #Tell the kernel that we want to add ourselves to a multicast group
    19     #The address for the multicast group is the third param
    20     status = sock.setsockopt(socket.IPPROTO_IP,
    21         socket.IP_ADD_MEMBERSHIP,
    22         socket.inet_aton(MYGROUP) + socket.inet_aton(SENDERIP));
    23 
    24     #sock.setblocking(0)
    25     #ts = time.time()
    26     data, addr = sock.recvfrom(1024)
    27     data_str=str(data,encoding='utf-8')
    28 #    sock.close()
    29     return data_str
    30 
    31 
    32 if __name__=='__main__':
    33     while True:
    34         tmp=get_gateway_heart()
    35         print(tmp)
    复制代码

    2、小米网关的初始密码向量 转换为字符串 的计算方法

    复制代码
    1 from binascii import b2a_hex, a2b_hex
    2 import sys
    3 
    4 s='17996d093d28ddb3ba695a2e6f58562e'    #初始向量
    5 m=a2b_hex(s)    
    6 print(m)
    7 
    8 #转换后的初始向量
    9 #b'x17x99m	=(xddxb3xbaiZ.oXV.'
    复制代码

    3、加密token的方法

    复制代码
     1 from Crypto.Cipher import AES
     2 from binascii import b2a_hex, a2b_hex
     3 
     4 class prpcrypt():
     5     def __init__(self, key='cy5zmrpqws05vsqj'):
     6         self.key = key              #
     7         self.mode = AES.MODE_CBC
     8 
     9     # 加密函数,如果text不足16位就用空格补足为16位,
    10     # 如果大于16当时不是16的倍数,那就补足为16的倍数。
    11     def encrypt(self, text):        #text是要加密的内容
    12         cryptor = AES.new(self.key, self.mode,b'x17x99m	=(xddxb3xbaiZ.oXV.')
    13         # 这里密钥key 长度必须为16(AES-128),
    14         # 24(AES-192),或者32 (AES-256)Bytes 长度
    15         # 目前AES-128 足够目前使用
    16         length = 16
    17         count = len(text)
    18         if count < length:
    19             add = (length - count)
    20             #  backspace
    21             text = text + ('' * add)
    22         elif count > length:
    23             add = (length - (count % length))
    24             text = text + ('' * add)
    25         self.ciphertext = cryptor.encrypt(text)
    26         # 因为AES加密时候得到的字符串不一定是ascii字符集的,输出到终端或者保存时候可能存在问题
    27         # 所以这里统一把加密后的字符串转化为16进制字符串
    28         return str(b2a_hex(self.ciphertext),encoding='utf-8')
    29         #return self.ciphertext
    30 
    31     # 解密后,去掉补足的空格用strip() 去掉 b'0000000000000000'
    32     def decrypt(self, text):
    33         cryptor = AES.new(self.key, self.mode, b'x17x99m	=(xddxb3xbaiZ.oXV.')
    34         plain_text = cryptor.decrypt(a2b_hex(text))
    35         return plain_text.rstrip('')
    36 
    37 
    38 if __name__ == '__main__':
    39     pc = prpcrypt('0987654321qwerty')  # 初始化密钥
    40     e = pc.encrypt('1234567890abcdef')  # 加密
    41    # d = pc.decrypt(e)  # 解密
    42     print("加密:", e)
    43     #print("解密:", d)
    复制代码

    4、获取token,并对获取的token进行加密

    复制代码
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import socket
    import json
    from xm_gw.encrypty import prpcrypt
    
    def get_token():    #通过get_id_list 获得token
        ip_port_single = ("192.168.31.150", 9898)
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
        comd = {'cmd': 'get_id_list'}
        order = json.dumps(comd)
        s.sendto(bytes(order, encoding="utf-8"), ip_port_single)
        data,addr=s.recvfrom(1024)
        data_str=str(data,encoding='utf-8')
        token=json.loads(data_str).get('token')
        s.close()
        return token
    
    def get_token_encrypty():
        tok = get_token()  # 拿到当前token,要进行加密的内容
        k = prpcrypt()
        key_encrypt = k.encrypt(tok)
        return key_encrypt
    
    if __name__=='__main__':
        tok=get_token()
        tok_encry=get_token_encrypty()
        print(tok)
        print(tok_encry)
    复制代码

    5、建立网关通讯并执行控制命令

    复制代码
     1 #!/usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3 import socket
     4 import json
     5 from xm_gw import udp_token_key
     6 
     7 
     8 class udp_gw():
     9     def __init__(self, ip_gateway='192.168.8.100'):
    10         self.ip_port_zu43 = ('224.0.0.50', 4321)
    11         self.ip_port_single = (ip_gateway, 9898)
    12         self.ip_port_zu9898=('224.0.0.50', 9898)
    13 
    14     def whois(self):
    15         s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
    16         comd = {'cmd': 'whois'}
    17         order = json.dumps(comd)
    18         s.sendto(bytes(order, encoding="utf-8"), self.ip_port_zu43)
    19         data_bytes, addr = s.recvfrom(1024)
    20         data_dic = json.loads(str(data_bytes, encoding='utf-8'))
    21         s.close()
    22         return data_dic
    23 
    24     def get_id_list(self):
    25         s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
    26         comd = {'cmd': 'get_id_list'}
    27         order = json.dumps(comd)
    28         s.sendto(bytes(order, encoding="utf-8"), self.ip_port_single)
    29         data_bytes, addr = s.recvfrom(1024)
    30         data_dic = json.loads(str(data_bytes, encoding='utf-8'))
    31         s.close()
    32         return data_dic
    33 
    34     def read_sid(self, sid):
    35         s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
    36         comd = {'cmd': 'read', 'sid': sid}
    37         order = json.dumps(comd)
    38         s.sendto(bytes(order, encoding="utf-8"), self.ip_port_single)
    39         data_bytes, addr = s.recvfrom(1024)
    40         data_dic = json.loads(str(data_bytes, encoding='utf-8'))
    41         s.close()
    42         return data_dic
    43 
    44     def write_plug(self, status):
    45         s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
    46         key_encrypt = udp_token_key.get_token_encrypty()
    47         comd = {"cmd": "write", "model": "plug", "sid": "158d0001b84d9a", "short_id": 46384,
    48                 "data": {"status": status, 'key': key_encrypt}}
    49         order = json.dumps(comd)
    50         s.sendto(bytes(order, encoding="utf-8"), self.ip_port_single)
    51         data_bytes, addr = s.recvfrom(1024)
    52         data_dic = json.loads(str(data_bytes, encoding='utf-8'))
    53         s.close()
    54         return data_dic
    55 
    56     def read_all_sid(self):
    57         s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
    58         ls = json.loads(self.get_id_list().get('data'))
    59         ls_sensor_state = []
    60         for sid in ls:
    61             comd = {'cmd': 'read', 'sid': sid}
    62             order = json.dumps(comd)
    63             s.sendto(bytes(order, encoding="utf-8"), self.ip_port_single)
    64             data_bytes, addr = s.recvfrom(1024)
    65             data_dic = json.loads(str(data_bytes, encoding='utf-8'))
    66             #            print(data_dic)
    67             ls_sensor_state.append(data_dic)
    68         s.close()
    69         return ls_sensor_state
    70 
    71     def get_dict_model_sid(self):
    72         dic_gw=self.whois()
    73         ls=self.read_all_sid()
    74         dic_model_sid = {}
    75         for dic in ls:
    76             model = dic.get('model')
    77             sid = dic.get('sid')
    78             dic_model_sid[model] = sid
    79         dic_model_sid['gateway'] = dic_gw.get('sid')
    80         return dic_model_sid
    81 
    82 
    83 if __name__=='__main__':
    84     import time
    85     #{'plug': '158d0001b84d9a', 'switch': '158d0001c10bd7', 'sensor_ht': '158d0001e87bd9',
    86     # 'magnet': '158d0001bb3daf', 'motion': '158d0001c2f110', 'gateway': '7811dcb38599'}
    87     gw=udp_gw('192.168.31.150')
    88     tmp = gw.read_sid('158d0001b84d9a')
    89 #    print(tmp1)
    90 #    time.sleep(5)
    91 #    gw.write_plug('off')
    92 #    time.sleep(5)
    93 #    tmp=gw.read_sid('158d0001b84d9a')
    94 
    95     print(tmp)
    复制代码
  • 相关阅读:
    常用的 JavaScript 简写方法
    user-select详解
    惊人的CSS和JavaScript动画logos例子
    debounce与throttle区别
    Vue.js中data,props和computed数据
    字体图标出现乱码的兼容方案
    ios调用Html内JS alert 不能点击关闭为甚?
    统一诊断服务 (Unified diagnostic services , UDS) (二)
    统一诊断服务 (Unified diagnostic services , UDS) (一)
    CAN总线同步
  • 原文地址:https://www.cnblogs.com/syq666/p/8743076.html
Copyright © 2011-2022 走看看