博主工作过程中接触到物联网,涉及modbus,mqtt等协议,想着python可以用来读取解析消息内容,实施过程中现场环境存在配置问题,那就开发一个客户端来帮助定位问题
客户端模块是 wxpython,协议对接用了 pymodbus 和 paho-mqtt,打包则使用pyinstaller
代码如下,实现了modbus的读取寄存器功能,后续待补充
1 import wx 2 import json 3 import time 4 import paho.mqtt.client as mqtt 5 from datetime import datetime 6 from pymodbus.client.sync import ModbusTcpClient 7 from pymodbus.exceptions import ConnectionException 8 9 10 class RunClient(wx.Frame): 11 def __init__(self, *args, **kw): 12 super(RunClient, self).__init__(*args, **kw) 13 self.app = wx.App() 14 self.title = "检测工具" 15 16 self.mqtt_data_format = "({time})-({topic})-({data})" 17 self.mqtt_data = [] 18 19 self.message_caption = "Info" 20 21 self.ID_MENU_CLOSE = 99 22 self.ID_MENU_RUN = 1 23 24 self.ID_PANEL_MESSAGE_TYPE = 101 25 self.ID_PANEL_MODBUS_TYPE = 102 26 self.ID_PANEL_MODBUS_SLAVE = 103 27 self.ID_PANEL_MODBUS_FUNCTION = 104 28 self.ID_PANEL_MODBUS_HOST = 105 29 self.ID_PANEL_MODBUS_ADDRESS = 106 30 self.ID_PANEL_MODBUS_PORT = 107 31 self.ID_PANEL_MODBUS_QUANTITY = 108 32 self.ID_PANEL_MODBUS_INFO = 109 33 34 self.check_msg_format = "{} 必须数字" 35 36 self.init_ui() 37 38 def init_ui(self): 39 """初始化ui""" 40 self.init_menu_bar() 41 self.init_panel() 42 43 self.CreateStatusBar() 44 45 self.SetSize((500, 600)) 46 self.SetTitle(self.title) 47 self.Centre() 48 49 @property 50 def message_type(self): 51 """定义消息类型""" 52 return [ 53 "Modbus Client", 54 "MQTT Client" 55 ] 56 57 @property 58 def modbus_functions(self): 59 """定义modbus方法""" 60 return [ 61 "03 Read Holding Registers" 62 ] 63 64 @property 65 def modbus_type(self): 66 """定义modbus类型""" 67 return [ 68 "Modbus Tcp" 69 ] 70 71 def init_menu_bar(self): 72 """初始化菜单按钮""" 73 menu_bar = wx.MenuBar() 74 75 # 定义菜单 76 menu = wx.Menu() 77 menu.Append(self.ID_MENU_RUN, "运行") 78 menu.Append(self.ID_MENU_CLOSE, "关闭") 79 80 menu_bar.Append(menu, "&开始") 81 self.SetMenuBar(menu_bar) 82 83 # 函数绑定菜单ID 84 self.Bind(wx.EVT_MENU, self.run_info, id=self.ID_MENU_RUN) 85 self.Bind(wx.EVT_MENU, self.button_close, id=self.ID_MENU_CLOSE) 86 87 def init_panel(self): 88 """初始panel""" 89 panel = wx.Panel(self) 90 91 # 10+60=70 92 wx.StaticBox(panel, label="协议类型", pos=(10, 10), size=(450, 60)) 93 wx.ComboBox(panel, pos=(20, 30), choices=self.message_type, size=(200, -1), id=self.ID_PANEL_MESSAGE_TYPE, value=self.message_type[0]) 94 95 # 75+60=135 96 wx.StaticBox(panel, label="Modbus来源类型", pos=(10, 75), size=(450, 60)) 97 wx.ComboBox(panel, pos=(20, 95), choices=self.modbus_type, size=(200, -1), id=self.ID_PANEL_MODBUS_TYPE, value=self.modbus_type[0]) 98 99 # 140+240=380 100 wx.StaticBox(panel, label="Modbus读取参数", pos=(10, 140), size=(450, 240)) 101 wx.StaticText(panel, label="slave:", pos=(20, 160)) 102 wx.SpinCtrl(panel, pos=(242, 160), size=(200, -1), min=1, max=255, id=self.ID_PANEL_MODBUS_SLAVE, value="1") 103 104 wx.StaticText(panel, label="function:", pos=(20, 190)) 105 wx.ComboBox(panel, pos=(240, 190), choices=self.modbus_functions, size=(200, -1), id=self.ID_PANEL_MODBUS_FUNCTION, value=self.modbus_functions[0]) 106 107 wx.StaticText(panel, label="address:", pos=(20, 220)) 108 wx.TextCtrl(panel, pos=(240, 220), size=(200, -1), id=self.ID_PANEL_MODBUS_ADDRESS, value="0") 109 110 wx.StaticText(panel, label="quantity:", pos=(20, 250)) 111 wx.TextCtrl(panel, pos=(240, 250), size=(200, -1), id=self.ID_PANEL_MODBUS_QUANTITY, value="10") 112 113 wx.StaticText(panel, label="host:", pos=(20, 280)) 114 wx.TextCtrl(panel, pos=(240, 280), size=(200, -1), id=self.ID_PANEL_MODBUS_HOST, value="127.0.0.1") 115 116 wx.StaticText(panel, label="port:", pos=(20, 310)) 117 wx.TextCtrl(panel, pos=(240, 310), size=(200, -1), id=self.ID_PANEL_MODBUS_PORT, value="512") 118 119 # 385+120=505 120 wx.StaticBox(panel, label="Modbus返回信息", pos=(10, 385), size=(450, 120)) 121 wx.TextCtrl(panel, pos=(20, 405), size=(430, 90), id=self.ID_PANEL_MODBUS_INFO, style=wx.TE_MULTILINE, value="") 122 123 def check_field_int(self, name, value): 124 """检查字段INT""" 125 try: 126 value = int(value) 127 return { 128 "status": True, 129 "value": value 130 } 131 except ValueError: 132 return { 133 "status": False, 134 "msg": self.check_msg_format.format(name) 135 } 136 137 def mqtt_on_message(self, client, userdata, msg): 138 """mqtt消息订阅""" 139 data = json.loads(msg.payload) 140 self.mqtt_data.append(self.mqtt_data_format.format( 141 time=datetime.strftime(datetime.now(), "%d-%m-%Y %H:%M:%S"), 142 topic=msg.topic, 143 data=data 144 )) 145 146 def run_modbus_tcp(self, slave, address, quantity, host, port, function, timeout=3): 147 """运行modbus tcp""" 148 master = ModbusTcpClient(host=host, port=port, timeout=timeout) 149 150 # 校验参数是否int 151 resp = self.check_field_int(name="slave", value=slave) 152 if not resp["status"]: 153 return resp["msg"] 154 resp = self.check_field_int(name="address", value=address) 155 if not resp["status"]: 156 return resp["msg"] 157 resp = self.check_field_int(name="quantity", value=quantity) 158 if not resp["status"]: 159 return resp["msg"] 160 161 results = [] 162 try: 163 # 只写了一种情况 164 if function == self.modbus_functions[0]: 165 result = master.read_holding_registers(address=int(quantity), count=int(quantity), unit=int(slave)) 166 results = result.registers 167 return results 168 return results 169 except AttributeError as e: 170 return e 171 except ConnectionException as e: 172 return e 173 174 def run_mqtt(self, topic, port): 175 """运行mqtt""" 176 client = mqtt.Client() 177 client.connect(host=topic, port=port, keepalive=60) 178 client.subscribe(topic) 179 client.on_message = self.mqtt_on_message 180 181 client.loop_start() 182 time.sleep(20) 183 client.loop_stop() 184 185 def run_info(self, e): 186 """运行""" 187 modbus_type = self.FindWindowById(id=self.ID_PANEL_MODBUS_TYPE).GetValue() 188 modbus_slave = self.FindWindowById(id=self.ID_PANEL_MODBUS_SLAVE).GetValue() 189 modbus_function = self.FindWindowById(id=self.ID_PANEL_MODBUS_FUNCTION).GetValue() 190 modbus_address = self.FindWindowById(id=self.ID_PANEL_MODBUS_ADDRESS).GetValue() 191 modbus_quantity = self.FindWindowById(id=self.ID_PANEL_MODBUS_QUANTITY).GetValue() 192 modbus_host = self.FindWindowById(id=self.ID_PANEL_MODBUS_HOST).GetValue() 193 modbus_port = self.FindWindowById(id=self.ID_PANEL_MODBUS_PORT).GetValue() 194 modbus_info = self.FindWindowById(id=self.ID_PANEL_MODBUS_INFO) 195 196 results = self.run_modbus_tcp( 197 slave=modbus_slave, address=modbus_address, quantity=modbus_quantity, 198 host=modbus_host, port=modbus_port, function=modbus_function 199 ) 200 201 modbus_info.SetValue(str(results)) 202 203 self.show_message() 204 205 def show_message(self): 206 """显示message""" 207 wx.MessageBox(message="运行结束", caption=self.message_caption, style=wx.OK | wx.ICON_INFORMATION) 208 209 def button_close(self, e): 210 """按钮-关闭""" 211 self.Close(True) 212 213 214 # 主函数 215 def main(): 216 217 app = wx.App() 218 run = RunClient(None) 219 run.Show() 220 app.MainLoop() 221 222 223 if __name__ == '__main__': 224 main()
运行效果:
打开的效果
运行后
如果需要打包则
1 pyinstall -F -w xx.py
具体参数可以自行查阅相应文档
打包后文件目录如下
在dist下有相应的 exe文件