zoukankan      html  css  js  c++  java
  • mqtt异步publish方法

    Python基于mqtt异步编程主要用到asyncio及第三方库hbmqtt,这里主要介绍mqtt的异步发布及遇到的一些问题。

    hbmqtt安装很简单,pip hbmqtt install.

    mqtt服务器我使用的是mosquitto.

    1主进程

    主进程执行下面语句就实现了协程

    1 loop = asyncio.get_event_loop()
    2 loop.run_until_complete(run())

    首先是connect,然后publish,整个过程是一个协程

    1 run():               #协程主函数
    2     await connect()
    3     while True:
    4         try:
    5             await publish()
    6         except Exception as ce:
    7             logger.error("Sender Error: %s" % ce)

    2Connect

    这里connect没有使用自动重连机制connect(),而是单独开一个协程执行掉线后自动重连_auto_reconnect

    重连函数,每隔1s执行一次:

    1 async def _auto_reconnect(client):    
    2     while True:
    3         if not client.session.transitions.is_connected():   #如果已经连接上则不执行重连
    4             try:
    5                 await client.reconnect()
    6             except ConnectException:
    7                 pass
    8         await asyncio.sleep(1)

    连接函数,仅执行一次,并启动重连函数协程

    1 async def connect():    
    2     client = MQTTClient(config={"auto_reconnect": False})       #False时关闭自动重连
    3     try:
    4         await client.connect(url)
    5     except ConnectException:
    6         pass
    7     asyncio.ensure_future(_auto_reconnect(client))

    3Publish:

    发布函数比较简单,主要就是

    1  await client.publish(topic, msg)

    4、总结

    这里重点讲为什么协程时不要自动重连,因为如果publish过程中出现断线,需要等待连接成功的event,如果允许自动重连"auto_reconnect": True,程序在publish程序等待信号不退出,无法进入connect程序执行reconnect,这样就永远等不到信号,造成程序死等,类似死机。

    如果不允许自动重连,单开一个协程执行重连操作,即使publish协程等待事件,重连协程会使这个事件响应,这样就可以继续发布。

    在hbmqtt库自带例子中多是先connect,然后publish,然后disconnect,以此循环,但主要考虑到连接后不主动断开一提高程序效率,故没有断开操作。

    这个问题的解决方式可能有点牵强,园友们有没有遇到过类似的问题,在publish过程中关闭mqtt服务器,再重新打开服务器,发布任务能够继续正常执行,如果有好的解决方式,还望不吝赐教。

     

  • 相关阅读:
    分布式跑批的实现流程
    Java1.8新特性Stream流处理集合元素
    RocketMQ消息存储和查询原理(引用清幽之地 清幽之地的博客)
    Markdown使用持续更新
    一次百万请求mq堆积的生产排查
    Spring Boot打包部署修改jar文件名
    Spring Boot的应用启动器
    Spring Boot配置文件application.properties
    Eclipse中出现.classPath(拒绝访问)
    PHP文件上传的相关配置
  • 原文地址:https://www.cnblogs.com/xiating/p/8203615.html
Copyright © 2011-2022 走看看