  • Using the Python message queue snakemq: a summary

    Python message queue snakemq: a summary

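    I have recently been studying the message bus ZeroMQ. While searching online for message bus modules implemented in Python, I stumbled on a message queue called snakemq and decided to try it out. It turned out to be quite good: easy to pick up and easy to use (much easier than Ice). The only drawback is that documentation is scarce, so I had to work things out myself.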

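    I. The official description of snakemq

    1. Pure Python implementation, cross-platform

    2. Automatic reconnect

    3. Reliable delivery: configurable message mode and message timeout

    4. Two kinds of queues: persistent and transient

    5. Asynchronous operation via poll()

    6. Symmetrical: a single TCP connection can be used for duplex communication

    7. Multiple storage backends: SQLite, MongoDB, ...

    8. Brokerless: a design philosophy similar to ZeroMQ

    9. Extension modules: RPC, bandwidth throttling

    That is the official pitch and needs to be verified in practice. I wrote a small wrapper around it and it worked nicely.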

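    II. Notes on a few key points

    1. Automatic reconnect is supported, so you do not need to write your own heartbeat logic; you only need to care about sending and receiving.

    2. Data persistence is supported. If persistence is enabled, queued data is sent automatically after a reconnect.

    3. Receiving data: snakemq delivers data through callbacks, so you only need to write a receive function and add it to the callback list.

    4. Sending data: everything sent is of type bytes (binary), so a conversion is needed. My tests all used text strings, converted to bytes with str.encode('utf-8') and converted back on receipt.

    5. Terminology: a Connector is similar to a socket TcpClient, and a Listener is similar to a socket TcpServer. Every connector or listener has an ident, so when sending and receiving you know whose data it is.

    6. When using SQLite persistence, the snakemq source has to be modified to call sqlite3.connect(filename, check_same_thread=False), which works around the problem of accessing SQLite from multiple threads. (Could this cause a deadlock?)

    7. With persistence enabled, pending messages are sent automatically once the connection is re-established, which makes delivery reliable.

    8. For the sake of encapsulation, received data is passed on to the caller through a callback.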
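On the SQLite question in note 6: `check_same_thread=False` only switches off sqlite3's own thread-ownership check; it does not serialize access by itself. Below is a minimal sketch of one common way to share a connection safely, assuming a `threading.Lock` guards every access. The `SharedStore` class and its `items` table are hypothetical and are not part of snakemq.

```python
import sqlite3
import threading


class SharedStore:
    """Hypothetical helper: one SQLite connection shared by several threads."""

    def __init__(self, filename=":memory:"):
        # check_same_thread=False lets other threads use this connection;
        # the lock below is what actually serializes the access.
        self._conn = sqlite3.connect(filename, check_same_thread=False)
        self._lock = threading.Lock()
        with self._lock:
            self._conn.execute("CREATE TABLE IF NOT EXISTS items (payload BLOB)")
            self._conn.commit()

    def put(self, payload):
        with self._lock:
            self._conn.execute("INSERT INTO items (payload) VALUES (?)", (payload,))
            self._conn.commit()

    def count(self):
        with self._lock:
            return self._conn.execute("SELECT COUNT(*) FROM items").fetchone()[0]


store = SharedStore()
workers = [
    threading.Thread(target=lambda: [store.put(b"x") for _ in range(50)])
    for _ in range(4)
]
for w in workers:
    w.start()
for w in workers:
    w.join()
print(store.count())
```

Each method takes the lock once and never calls another locked method while holding it, so this particular pattern cannot deadlock on its own.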

     

    III. Code

    Note: the code uses a custom logging module:

    from common import nxlogger

    import snakemqlogger as logger

    It can be replaced with the standard logging module.
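If you do not have the custom logging module, a minimal stand-in built on the standard `logging` module could look like the sketch below. The logger name `snakemq_demo` and the format string are arbitrary choices made for this example.

```python
import logging

# Module-level logger exposing the info/debug/error calls used by the classes in this article.
logger = logging.getLogger("snakemq_demo")
logger.setLevel(logging.DEBUG)

if not logger.handlers:  # avoid adding duplicate handlers on re-import
    handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s %(threadName)s %(message)s")
    )
    logger.addHandler(handler)

logger.info("connector start...")
logger.error("connector:{0}".format("example error"))
```

Saving this as a module and importing it under the name `logger` keeps the rest of the code unchanged.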

     

    Callback class (callbacks.py):

     

    # -*- coding:utf-8 -*-

    #*********************************************************#

    # @@FileName: callbacks.py

    # @@Author: Shelwinnee<Shelwinnee@163.com>

    # @@Create Date: 2016-04-14 22:24:14

    # @@Modify Date: 2016-04-14 22:24:34

    # @@Description: simple synchronized callbacks helper

    #*********************************************************#

     

    class Callback(object):

        '''synchronized callback'''

        def __init__(self):

            self.callbacks = []

     

        def add(self, func):

            self.callbacks.append(func)

     

        def remove(self, func):

            self.callbacks.remove(func)

     

        def __call__(self, *args, **kwargs):

            for callback in self.callbacks:

                callback(*args, **kwargs)
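A quick usage sketch of the class above, repeated here in condensed form so the snippet runs on its own. The handler names `store_message` and `print_message` are made up for illustration.

```python
class Callback(object):
    """Condensed copy of the Callback class above."""

    def __init__(self):
        self.callbacks = []

    def add(self, func):
        self.callbacks.append(func)

    def remove(self, func):
        self.callbacks.remove(func)

    def __call__(self, *args, **kwargs):
        for callback in self.callbacks:
            callback(*args, **kwargs)


received = []


def store_message(ident, data):
    received.append((ident, data))


def print_message(ident, data):
    print("from {0}: {1}".format(ident, data))


on_recv = Callback()
on_recv.add(store_message)
on_recv.add(print_message)
on_recv("bob", "hello")        # both handlers run, in registration order

on_recv.remove(print_message)
on_recv("bob", "second")       # only store_message runs now
```

Handlers are called synchronously in the calling thread, so a slow handler delays whatever code triggered the callback.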

     

    Connector class (snakemqConnector.py):

    # -*- coding:utf-8 -*-

    #*********************************************************#

    # @@FileName: snakemqConnector.py

    # @@Author: Shelwinnee<Shelwinnee@163.com>

    # @@Create Date: 2016-04-14 22:24:14

    # @@Modify Date: 2016-04-14 22:24:34

    # @@Description: 

    # note: if message persistence is needed, make sure you understand the following sqlite3 restriction:

    # 'SQLite objects created in a thread can only be used in that same thread'

    # workaround: open the snakemq SQLite storage module (snakemq/storage/sqlite.py) and change the connect call to:

    # self.conn = sqlite3.connect(filename, check_same_thread=False)

    #*********************************************************#

    import threading

    import snakemq

    import snakemq.link

    import snakemq.packeter

    import snakemq.messaging

    import snakemq.message

    from snakemq.storage.sqlite import SqliteQueuesStorage

    from snakemq.message import FLAG_PERSISTENT

    from common.callbacks import Callback

     

    from common import nxlogger

    import snakemqlogger as logger

     

    class SnakemqConnector(threading.Thread):

             def __init__(self, snakemqident = None, remoteIp = "localhost", remotePort = 9090, persistent = False):

                      super(SnakemqConnector,self).__init__()

                      self.messaging = None

                      self.link = None

                      self.snakemqident = snakemqident

                      self.pktr = None

                      self.remoteIp = remoteIp

                      self.remotePort = remotePort

                      self.persistent = persistent

                      self.on_recv = Callback()

                      self._initConnector()

     

             def run(self):

                      logger.info("connector start...")

                     

                      if self.link is not None:

                               self.link.loop()

     

                      logger.info("connector end...")

            

             def terminate(self):

                      logger.info("connector terminating...")

                      if self.link is not None:

                               self.link.stop()

                               self.link.cleanup()

                      logger.info("connector terminated")

     

             def on_recv_message(self, conn, ident, message):

                      try:

                               self.on_recv(ident, message.data.decode('utf-8'))#dispatch received data

                      except Exception as e:

                               logger.error("connector recv:{0}".format(e))

                               print(e)

     

             def sendMsg(self, destIdent, byteseq):

                      '''send message (bytes) to the peer whose ident is destIdent'''

                      if self.messaging is None:

                               logger.error("connector:messaging is not initialized, send message failed")

                               return

                      # ttl is in seconds; FLAG_PERSISTENT asks snakemq to keep the queued message in persistent storage

                      flags = FLAG_PERSISTENT if self.persistent else 0

                      msg = snakemq.message.Message(byteseq, ttl=60, flags=flags)

                      self.messaging.send_message(destIdent, msg)

     

             def _initConnector(self):

                      '''build the link/packeter/messaging stack; the loop itself is started in run()'''

                      try:

                               self.link = snakemq.link.Link()

                               self.link.add_connector((self.remoteIp, self.remotePort))

     

                               self.pktr = snakemq.packeter.Packeter(self.link)

     

                               if self.persistent:

                                        storage = SqliteQueuesStorage("SnakemqStorage.db")

                                        self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr, storage)

                               else:

                                        self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr)

                              

                               self.messaging.on_message_recv.add(self.on_recv_message)

                              

                      except Exception as e:

                               logger.error("connector:{0}".format(e))

                      finally:

                               logger.info("connector[{0}] init finished".format(self.snakemqident))
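Since `sendMsg` expects bytes and `on_recv_message` decodes with UTF-8, both ends have to agree on the encoding. Here is a small sketch of that round trip; the helper names `to_wire` and `from_wire` are hypothetical.

```python
def to_wire(text, encoding="utf-8"):
    """Encode a text string into the bytes payload passed to sendMsg."""
    return text.encode(encoding)


def from_wire(payload, encoding="utf-8"):
    """Decode a received bytes payload back into text."""
    return payload.decode(encoding)


original = "hello, 消息队列"
payload = to_wire(original)

print(type(payload).__name__, len(payload))
print(from_wire(payload) == original)
```

The byte length differs from the character count for non-ASCII text, which matters if you ever reason about message sizes.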

     

    Listener class (snakemqListener.py):

    # -*- coding:utf-8 -*-

    #*********************************************************#

    # @@FileName: snakemqListener.py

    # @@Author: Shelwinnee<Shelwinnee@163.com>

    # @@Create Date: 2016-04-14 22:24:14

    # @@Modify Date: 2016-04-14 22:24:34

    # @@Description: 

    # note: if message persistence is needed, make sure you understand the following sqlite3 restriction:

    # 'SQLite objects created in a thread can only be used in that same thread'

    # workaround: open the snakemq SQLite storage module (snakemq/storage/sqlite.py) and change the connect call to:

    # self.conn = sqlite3.connect(filename, check_same_thread=False)

    #*********************************************************#

    import threading

    import snakemq

    import snakemq.link

    import snakemq.packeter

    import snakemq.messaging

    import snakemq.message

    from snakemq.storage.sqlite import SqliteQueuesStorage

    from snakemq.message import FLAG_PERSISTENT

     

    from common import nxlogger

    import snakemqlogger as logger

     

    from common.callbacks import Callback

     

    class SnakemqListener(threading.Thread):

             def __init__(self, snakemqident = None, ip = "localhost", port = 9090, persistent = False):

                      super(SnakemqListener,self).__init__()

                      self.messaging = None

                      self.link = None

                      self.pktr = None

                      self.snakemqident = snakemqident

                      self.ip = ip

                      self.port = port

                      self.connectors = {}

                      self.on_recv = Callback()

                      self.persistent = persistent

                      self._initlistener()

     

             def run(self):

                      '''thread run'''

                      logger.info("listener start...")

                     

                      if self.link is not None:

                               self.link.loop()

     

                      logger.info("listener end...")

     

             def terminate(self):

                      '''terminate snakemq listener thread'''

                      logger.info("listener terminating...")

                      if self.link is not None:

                               self.link.stop()

                               self.link.cleanup()

                      logger.info("listener terminated")

     

             def on_recv_message(self, conn, ident, message):

                      '''receive message from the peer named ident'''

                      try:

                               self.on_recv(ident, message.data.decode('utf-8'))#dispatch received data

                               self.sendMsg(ident, 'hello,{0}'.format(ident).encode('utf-8'))#reply to the sender instead of a hard-coded ident

                      except Exception as e:

                               logger.error("listener recv:{0}".format(e))

                               print(e)

     

             def on_drop_message(self, ident, message):

                      print("message dropped", ident, message)

                      logger.debug("listener:message dropped,ident:{0},message:{1}".format(ident, message))

     

             '''client connect'''

             def on_connect(self, ident):

                      logger.debug("listener:{0} connected".format(ident))

                      self.connectors[ident] = ident

                      self.sendMsg(ident, "hello".encode('utf-8'))

     

             '''client disconnect'''

             def on_disconnect(self, ident):

                      logger.debug("listener:{0} disconnected".format(ident))

                      if ident in self.connectors:

                               self.connectors.pop(ident)

     

             def _initlistener(self):

                      '''build the link/packeter/messaging stack; the loop itself is started in run()'''

                      try:

                               self.link = snakemq.link.Link()

                               self.link.add_listener((self.ip, self.port))

     

                               self.pktr = snakemq.packeter.Packeter(self.link)

     

                               if self.persistent:

                                        storage = SqliteQueuesStorage("SnakemqStorage.db")

                                        self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr, storage)

                               else:

                                        self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr)

                              

                               # Packeter callbacks only receive a connection id, not the peer ident.

                               # Messaging callbacks are assumed here to be called as func(conn_id, ident),

                               # so the ident is forwarded to the handlers above.

                               self.messaging.on_connect.add(lambda conn_id, ident: self.on_connect(ident))

                               self.messaging.on_disconnect.add(lambda conn_id, ident: self.on_disconnect(ident))

                               self.messaging.on_message_recv.add(self.on_recv_message)

                               self.messaging.on_message_drop.add(self.on_drop_message)

     

                      except Exception as e:

                               logger.error("listener:{0}".format(e))

                      finally:

                               logger.info("listener:init finished")

     

             def sendMsg(self, destIdent, byteseq):

                      '''send message (bytes) to the peer whose ident is destIdent'''

                      if self.messaging is None:

                               logger.error("listener:messaging is not initialized, send message failed")

                               return

                      # ttl is in seconds; FLAG_PERSISTENT asks snakemq to keep the queued message in persistent storage

                      flags = snakemq.message.FLAG_PERSISTENT if self.persistent else 0

                      msg = snakemq.message.Message(byteseq, ttl=60, flags=flags)

                      self.messaging.send_message(destIdent, msg)
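Both classes stop the link from a different thread than the one running `loop()`. The generic stop-then-join pattern is sketched below, using a hypothetical `FakeLoop` in place of snakemq's link so the snippet runs without snakemq installed. Whether snakemq itself needs this ordering is not something this article verifies; treat it as a general threading illustration.

```python
import threading
import time


class FakeLoop:
    """Hypothetical stand-in for an event loop with loop()/stop()/cleanup()."""

    def __init__(self):
        self._stop = threading.Event()
        self.cleaned_up = False

    def loop(self):
        while not self._stop.is_set():
            self._stop.wait(0.05)   # wake up periodically to check the flag

    def stop(self):
        self._stop.set()

    def cleanup(self):
        self.cleaned_up = True


class Worker(threading.Thread):
    def __init__(self):
        super().__init__()
        self.loop_obj = FakeLoop()

    def run(self):
        try:
            self.loop_obj.loop()
        finally:
            # Release resources in the thread that ran the loop,
            # after the loop has actually returned.
            self.loop_obj.cleanup()

    def terminate(self, timeout=2.0):
        self.loop_obj.stop()
        self.join(timeout)          # wait until run() has finished


worker = Worker()
worker.start()
time.sleep(0.1)
worker.terminate()
print(worker.is_alive(), worker.loop_obj.cleaned_up)
```

Joining in `terminate()` means the caller knows the thread has really finished before the program moves on or exits.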

     

    Test code for the connector (testSnakeConnector.py):

    It reads a local 1 MB file and sends it to the listener; the listener then replies with a hello message.

    from netComm.snakemq import snakemqConnector

    import time

     

    def received(ident, data):

             print(data)

     

    if __name__ == "__main__":

             bob = snakemqConnector.SnakemqConnector('bob',"10.16.5.45",4002,True)

             bob.on_recv.add(received)

             bob.start()

             try:

                      with open("testfile.txt",encoding='utf-8') as f:

                               txt = f.read()

                               for i in range(100):

                                        bob.sendMsg("niess",txt.encode('utf-8'))

                                        time.sleep(0.1)

             except Exception as e:

                      print(e)

             time.sleep(5)

             bob.terminate()     

     

    Test code for the listener (testSnakeListener.py):

    from netComm.snakemq import snakemqListener

    import time

     

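    def received(ident, data):

             # note: the file name only has one-second resolution, so messages arriving within the same second overwrite each other; the log/ directory must already exist

             filename = "log/recFile{0}.txt".format(time.strftime('%S', time.localtime()))

             with open(filename, 'w', encoding='utf-8') as f:

                      f.write(data)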

     

    if __name__ == "__main__":

             niess = snakemqListener.SnakemqListener("niess","10.16.5.45",4002)

             niess.on_recv.add(received)

             niess.start()

             print("niess start...")

             time.sleep(60)

             niess.terminate()   

             print("niess end...")
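Naming output files by the current second means messages that arrive within the same second land in the same file. One possible alternative is sketched below, assuming a per-process counter is acceptable. The `save_message` helper and the file name pattern are illustrative only.

```python
import itertools
import os
import tempfile
import time

_counter = itertools.count(1)


def save_message(data, directory="log"):
    """Write one received message to its own file and return the path."""
    os.makedirs(directory, exist_ok=True)   # create the directory on first use
    name = "recFile_{0}_{1:05d}.txt".format(time.strftime("%H%M%S"), next(_counter))
    path = os.path.join(directory, name)
    with open(path, "w", encoding="utf-8") as f:
        f.write(data)
    return path


demo_dir = tempfile.mkdtemp()
paths = [save_message("payload {0}".format(i), demo_dir) for i in range(3)]
print(len(set(paths)))
```

A function like this could be registered with `niess.on_recv.add(...)` through a thin wrapper that accepts `(ident, data)`.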

     

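    IV. Test results

     A real conclusion should only be drawn after comparing snakemq with other messaging components. For a small project, though, it is more than adequate.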

  • Original article: https://www.cnblogs.com/shelwinnee/p/5418009.html