zoukankan      html  css  js  c++  java
  • python实现读取modbus485协议并打包exe客户端(wxpython+pyinstaller模块)

    博主工作过程中接触到物联网,涉及modbus,mqtt等协议,想着python可以用来读取解析消息内容,实施过程中现场环境存在配置问题,那就开发一个客户端来帮助定位问题

    客户端模块是 wxpython,协议对接用了 pymodbus 和 paho-mqtt,打包则使用pyinstaller

    代码如下,目前实现了 Modbus TCP 读取保持寄存器(功能码 03)的功能,其余功能后续待补充

      1 import wx
      2 import json
      3 import time
      4 import paho.mqtt.client as mqtt
      5 from datetime import datetime
      6 from pymodbus.client.sync import ModbusTcpClient
      7 from pymodbus.exceptions import ConnectionException
      8 
      9 
     class RunClient(wx.Frame):
         """Main window of the diagnostic tool.

         The frame shows a menu with a run and a close entry plus a panel of
         input widgets describing a Modbus TCP read request.  Running the tool
         reads holding registers from the configured server and writes the
         outcome into the multi-line text box at the bottom of the panel.  The
         MQTT helpers exist but nothing in this class calls them yet.
         """

         def __init__(self, *args, **kw):
             """Create the frame, define the menu/widget ids and build the UI.

             All positional and keyword arguments are forwarded to ``wx.Frame``.
             """
             super(RunClient, self).__init__(*args, **kw)
             # The frame above can only be constructed while an application
             # object exists, so reuse that one instead of creating a second.
             self.app = wx.GetApp()
             self.title = "检测工具"

             # Template and storage for the formatted MQTT messages.
             self.mqtt_data_format = "({time})-({topic})-({data})"
             self.mqtt_data = []

             self.message_caption = "Info"

             # Menu item ids.
             self.ID_MENU_CLOSE = 99
             self.ID_MENU_RUN = 1

             # Widget ids; run_info() uses them to find the controls again.
             self.ID_PANEL_MESSAGE_TYPE = 101
             self.ID_PANEL_MODBUS_TYPE = 102
             self.ID_PANEL_MODBUS_SLAVE = 103
             self.ID_PANEL_MODBUS_FUNCTION = 104
             self.ID_PANEL_MODBUS_HOST = 105
             self.ID_PANEL_MODBUS_ADDRESS = 106
             self.ID_PANEL_MODBUS_PORT = 107
             self.ID_PANEL_MODBUS_QUANTITY = 108
             self.ID_PANEL_MODBUS_INFO = 109

             # Message template returned when a numeric field fails validation.
             self.check_msg_format = "{} 必须数字"

             self.init_ui()

         def init_ui(self):
             """Build the menu bar, the panel and the status bar, then size the frame."""
             self.init_menu_bar()
             self.init_panel()

             self.CreateStatusBar()

             self.SetSize((500, 600))
             self.SetTitle(self.title)
             self.Centre()

         @property
         def message_type(self):
             """Return the selectable protocol types."""
             return [
                 "Modbus Client",
                 "MQTT Client",
             ]

         @property
         def modbus_functions(self):
             """Return the selectable Modbus functions."""
             return [
                 "03 Read Holding Registers",
             ]

         @property
         def modbus_type(self):
             """Return the selectable Modbus transport types."""
             return [
                 "Modbus Tcp",
             ]

         def init_menu_bar(self):
             """Create the menu bar and bind its two entries to their handlers."""
             menu_bar = wx.MenuBar()

             # Single menu with a "run" and a "close" entry.
             menu = wx.Menu()
             menu.Append(self.ID_MENU_RUN, "运行")
             menu.Append(self.ID_MENU_CLOSE, "关闭")

             menu_bar.Append(menu, "&开始")
             self.SetMenuBar(menu_bar)

             # Route each menu id to its handler.
             self.Bind(wx.EVT_MENU, self.run_info, id=self.ID_MENU_RUN)
             self.Bind(wx.EVT_MENU, self.button_close, id=self.ID_MENU_CLOSE)

         def init_panel(self):
             """Create the panel and all input/output widgets at fixed positions."""
             panel = wx.Panel(self)

             # Protocol type box: y 10..70
             wx.StaticBox(panel, label="协议类型", pos=(10, 10), size=(450, 60))
             wx.ComboBox(
                 panel, pos=(20, 30), choices=self.message_type, size=(200, -1),
                 id=self.ID_PANEL_MESSAGE_TYPE, value=self.message_type[0],
             )

             # Modbus source type box: y 75..135
             wx.StaticBox(panel, label="Modbus来源类型", pos=(10, 75), size=(450, 60))
             wx.ComboBox(
                 panel, pos=(20, 95), choices=self.modbus_type, size=(200, -1),
                 id=self.ID_PANEL_MODBUS_TYPE, value=self.modbus_type[0],
             )

             # Modbus read parameter box: y 140..380
             wx.StaticBox(panel, label="Modbus读取参数", pos=(10, 140), size=(450, 240))
             wx.StaticText(panel, label="slave:", pos=(20, 160))
             wx.SpinCtrl(
                 panel, pos=(242, 160), size=(200, -1), min=1, max=255,
                 id=self.ID_PANEL_MODBUS_SLAVE, value="1",
             )

             wx.StaticText(panel, label="function:", pos=(20, 190))
             wx.ComboBox(
                 panel, pos=(240, 190), choices=self.modbus_functions, size=(200, -1),
                 id=self.ID_PANEL_MODBUS_FUNCTION, value=self.modbus_functions[0],
             )

             wx.StaticText(panel, label="address:", pos=(20, 220))
             wx.TextCtrl(panel, pos=(240, 220), size=(200, -1), id=self.ID_PANEL_MODBUS_ADDRESS, value="0")

             wx.StaticText(panel, label="quantity:", pos=(20, 250))
             wx.TextCtrl(panel, pos=(240, 250), size=(200, -1), id=self.ID_PANEL_MODBUS_QUANTITY, value="10")

             wx.StaticText(panel, label="host:", pos=(20, 280))
             wx.TextCtrl(panel, pos=(240, 280), size=(200, -1), id=self.ID_PANEL_MODBUS_HOST, value="127.0.0.1")

             wx.StaticText(panel, label="port:", pos=(20, 310))
             wx.TextCtrl(panel, pos=(240, 310), size=(200, -1), id=self.ID_PANEL_MODBUS_PORT, value="512")

             # Result box: y 385..505
             wx.StaticBox(panel, label="Modbus返回信息", pos=(10, 385), size=(450, 120))
             wx.TextCtrl(
                 panel, pos=(20, 405), size=(430, 90), id=self.ID_PANEL_MODBUS_INFO,
                 style=wx.TE_MULTILINE, value="",
             )

         def check_field_int(self, name, value):
             """Check that ``value`` can be converted to ``int``.

             Args:
                 name: field name inserted into the failure message.
                 value: the raw value to convert.

             Returns:
                 ``{"status": True, "value": <int>}`` on success, otherwise
                 ``{"status": False, "msg": <message>}``.
             """
             try:
                 number = int(value)
             except (ValueError, TypeError):
                 # TypeError covers inputs such as None that int() rejects outright.
                 return {
                     "status": False,
                     "msg": self.check_msg_format.format(name),
                 }
             return {
                 "status": True,
                 "value": number,
             }

         def mqtt_on_message(self, client, userdata, msg):
             """Append one received MQTT message to ``self.mqtt_data``.

             The payload is decoded as JSON when possible; a payload that is
             not valid JSON is stored as text instead of raising inside the
             callback.
             """
             payload = msg.payload
             try:
                 data = json.loads(payload)
             except (ValueError, TypeError):
                 if isinstance(payload, (bytes, bytearray)):
                     data = payload.decode("utf-8", errors="replace")
                 else:
                     data = payload
             self.mqtt_data.append(self.mqtt_data_format.format(
                 time=datetime.now().strftime("%d-%m-%Y %H:%M:%S"),
                 topic=msg.topic,
                 data=data,
             ))

         def run_modbus_tcp(self, slave, address, quantity, host, port, function, timeout=3):
             """Read registers from a Modbus TCP server.

             Args:
                 slave: unit id of the target device (int or numeric string).
                 address: address of the first register to read.
                 quantity: number of registers to read.
                 host: host name or IP address of the server.
                 port: TCP port of the server (int or numeric string).
                 function: entry of ``modbus_functions`` selecting the request.
                 timeout: timeout handed to ``ModbusTcpClient``.

             Returns:
                 The list of register values on success, an empty list when
                 ``function`` is not supported, the validation message when a
                 numeric field is invalid, or the caught exception object when
                 the request fails.
             """
             # Validate every numeric field before any client is created.
             numbers = {}
             fields = (("slave", slave), ("address", address), ("quantity", quantity), ("port", port))
             for field_name, raw in fields:
                 checked = self.check_field_int(name=field_name, value=raw)
                 if not checked["status"]:
                     return checked["msg"]
                 numbers[field_name] = checked["value"]

             master = ModbusTcpClient(host=host, port=numbers["port"], timeout=timeout)
             try:
                 # Only one function is implemented so far.
                 if function != self.modbus_functions[0]:
                     return []
                 result = master.read_holding_registers(
                     address=numbers["address"],
                     count=numbers["quantity"],
                     unit=numbers["slave"],
                 )
                 # NOTE(review): presumably an error reply has no ``registers``
                 # attribute, which is why AttributeError is handled below.
                 return result.registers
             except (AttributeError, ConnectionException) as e:
                 return e
             finally:
                 # Always release the socket, whatever the outcome was.
                 master.close()

         def run_mqtt(self, topic, port, host=None, duration=20):
             """Subscribe to an MQTT topic and collect messages for a while.

             Received messages are stored through ``mqtt_on_message``.  The
             call blocks the calling thread for ``duration`` seconds.

             Args:
                 topic: topic to subscribe to.
                 port: TCP port of the broker (int or numeric string).
                 host: broker address.  When omitted, ``topic`` is used as the
                     address, which is what this method did before the
                     parameter existed.
                 duration: seconds to keep the network loop running.
             """
             broker = topic if host is None else host
             client = mqtt.Client()
             client.on_message = self.mqtt_on_message
             client.connect(host=broker, port=int(port), keepalive=60)
             try:
                 client.subscribe(topic)
                 client.loop_start()
                 time.sleep(duration)
             finally:
                 client.loop_stop()
                 client.disconnect()

         def run_info(self, e):
             """Menu handler: run the Modbus read with the values typed into the panel."""

             def field(widget_id):
                 # Current value of the control registered under ``widget_id``.
                 return self.FindWindowById(id=widget_id).GetValue()

             results = self.run_modbus_tcp(
                 slave=field(self.ID_PANEL_MODBUS_SLAVE),
                 address=field(self.ID_PANEL_MODBUS_ADDRESS),
                 quantity=field(self.ID_PANEL_MODBUS_QUANTITY),
                 host=field(self.ID_PANEL_MODBUS_HOST),
                 port=field(self.ID_PANEL_MODBUS_PORT),
                 function=field(self.ID_PANEL_MODBUS_FUNCTION),
             )

             # The result may be a list, a message or an exception; show its text form.
             modbus_info = self.FindWindowById(id=self.ID_PANEL_MODBUS_INFO)
             modbus_info.SetValue(str(results))

             self.show_message()

         def show_message(self):
             """Show the information dialog announcing that the run has finished."""
             wx.MessageBox(message="运行结束", caption=self.message_caption, style=wx.OK | wx.ICON_INFORMATION)

         def button_close(self, e):
             """Menu handler: close the frame."""
             self.Close(True)
    212 
    213 
    # Entry point
    def main():
        """Create the wx application, show the client window and run the event loop."""
        application = wx.App()
        window = RunClient(None)
        window.Show()
        application.MainLoop()
    221 
    222 
    # Start the GUI only when this file is executed as a script.
    if __name__ == "__main__":
        main()

    运行效果:

    打开的效果

    运行后

     

    如果需要打包则

    1 pyinstaller -F -w xx.py

    具体参数可以自行查阅相应文档

    打包后文件目录如下

     在dist下有相应的 exe文件

  • 相关阅读:
    使用logstash迁移ES1.x数据到ES6.x
    Kafka版本升级
    linux配置Mariadb双主互备
    OS7误删yum
    python中运行js代码 js2py
    python获取js里window对象
    python使用execjs执行js
    .Net Core AddTransient、AddScoped和AddSingleton的使用
    查询SQL Server数据库应用程序访问等待执行的SQL
    Spring Boot 2.4.0 发布说明
  • 原文地址:https://www.cnblogs.com/cllovewxq/p/15379051.html
Copyright © 2011-2022 走看看