zoukankan      html  css  js  c++  java
  • pyzmq学习笔记

    前言

    使用过简单的python的ZMQ:server开启3个线程,client发送心跳包,如果服务端超过n秒没应答,则重新连接。

    网上找的案例,server使用的zmq.device,但是一直不明白什么含义。

    案例链接:http://nphard.me/2016/03/05/pyzmq-demo/

    client使用了超时重连,传输数据时,如果server没有响应则重新连接并重新发送数据,这样会导致,客户端重复发送多条数据,被服务端接收处理,但是服务端并没有回复。(以下准备解释,但是没成功)

    以下为参考网站,很多英文的看不懂。

    http://api.zeromq.org/

    这里是翻译的目录

    http://pyzmq.readthedocs.io/en/latest/

    http://zguide.zeromq.org/py:all

    这里搜到的中文教材

    https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/index.html

    https://wizardforcel.gitbooks.io/zmq-guide/content/chapter1.html

    具体细节参考的中文资料在对应目录内给出。

    zmq的router和dealer是什么

    网上有很多资料介绍zmq的几种模式,以下介绍req/rep应答模式。

    首先确认一个概念:XREQ/XREP are aliases for ROUTER/DEALER. XREQ/XREP were used in ZeroMQ 2.x.

    那么router和dealer是什么?直接上链接:

    http://www.cnblogs.com/fengbohello/p/4743868.html

    官网介绍了应答模式的几种区别:http://zguide.zeromq.org/py:chapter3

    以下是自己的理解,如果有问题请留言,谢谢。

     参考画图:https://stackoverflow.com/questions/23581172/what-is-zmq-router-and-zmq-dealer-in-python-zeromq

     zmq的device

     参考官方api:http://pyzmq.readthedocs.io/en/latest/api/zmq.devices.html

     参考device详细介绍:http://pjwqq.iteye.com/blog/2260254

    根据上图给出代码,运行device_func,frontend面向客户端(发起请求方),绑定router接收请求,device内部通过backend与服务端连接,dealer向服务端发起请求,并将数据传达给server。

    所以流程就是这样:client (req)  ----->  router/dealer (queue)  ----->  server (rep)

    device 和 server 端:通过多线程写在一起的。运行先后顺序是  1、device  ---  2、server / client (随便哪一个先启动都会等待后启动连接,如果中途server异常,client需要超时重试, !!!!!^~^)

    def device_func():
        thread_num = 3
        context = zmq.Context()
        url_router = 'inproc://ping-server'
        url_dealer = 'tcp://*:5559'
    
        # Socket do cliente
        frontend = context.socket(zmq.ROUTER)  # 或者是 zmq.ROUTER zmq.XREP
        frontend.bind(url_dealer)
    
        # Socket do servidor
        backend = context.socket(zmq.DEALER)  # 或者是 zmq.DEALER zmq.XREQ
        backend.bind(url_router)
    
        for i in range(1, thread_num + 1):
            thread = threading.Thread(target=server_func, args=(i, url_router, context))
            thread.start()
    
        # property找不到 Device()报错
        zmq.device(zmq.QUEUE, frontend, backend)
    
        frontend.close()
        backend.close()
        context.term()
    
    
    def server_func(name, url_router, context):
        print(">>> start %d %s" % (name, '.' * 50))
        socket = context.socket(zmq.REP)
        socket.connect(url_router)
        while True:
            try:
                message = socket.recv()
                r_data = msgpack.unpackb(message, encoding='utf-8')
                print('server %d received:' % name, r_data)
    
                data = 'server %d send: %s' % (name, utils.time_now())
                print(data)
                s_data = msgpack.packb(data)
                socket.send(s_data)
    
            except:
                traceback.print_exc()
                socket.close()
                break

    client端,连接device的frontend端(dealer):

    def client2():
        url_dealer = 'tcp://localhost:5559'
        context = zmq.Context()
        socket = context.socket(zmq.REQ)
    
        print("Collecting data from server…")
        socket.connect(url_dealer)
        pid = os.getpid()
        i = 0
        while 1:
            i += 1
            # data = input('
    >>:').strip()
            data = ('send - msg%d' % i)
            # print(data)
            s_data = msgpack.packb(data.encode())
            socket.send(s_data)
    
            ret = socket.recv()
            r_data = msgpack.unpackb(ret, encoding='utf-8')
            print('>>>received msg%d server info: %s 
    ' % (i, r_data))

    以上一起理解了zmq的device、router、dealer。

    官网指出:function  zmq.device(device_typefrontendbackendDeprecated since version libzmq-3.2: Use zmq.proxy 取而代之的 zmq.proxy(frontendbackendcapture)

    同时又有 class zmq.devices.Device(device_type=3in_type=Noneout_type=None) 测试了跑不起来

    备注:

    inproc 是zmq本地 进程/线程 的传输方式。可以自定义。

     zmq的Polling

     主要功能应该是 一个程序种需要建立多个socket连接时,需要接收数据,又要转发数据等。参考以下链接:

    官方文档

    http://pjwqq.iteye.com/blog/2260791

    官方API翻译的

    我这里主要是使用到client的超时重试功能。重现客户端不断超时重试发送数据,服务端不断接收回复。数据大量冗余重复。

    server:同上,不过加了一句 sleep:

    time.sleep(6)
    message = socket.recv()

    client:

    def conn_socket(url_dealer):
        """ 建立socket连接"""
        try:
    
            context = zmq.Context()
            socket = context.socket(zmq.REQ)
            socket.connect(url_dealer)
    
            poller = zmq.Poller()
            poller.register(socket, zmq.POLLIN)
    
            print('[%d] socket conn success' % os.getpid())
            return socket, poller
    
        except:
            traceback.print_exc()
    
    
    def close_socket(socket, poller):
        """ 关闭socket连接"""
        try:
            socket.close()
            poller.unregister(socket)
        except:
            traceback.print_exc()
    
    
    def client1():
        """ """
        url_dealer = 'tcp://localhost:5559'
        socket, poller = conn_socket(url_dealer)
        try:
    
            i = 0
            while 1:
                i += 1
                # data = input('
    >>:').strip()
                data = 'msg%d' % i
                print('send:%s' % data)
                # print(data)
                s_data = msgpack.packb(data.encode())
                socket.send(s_data)
    
                while True:
                    # 超时后重新连接,参数是毫秒
                    if poller.poll(5 * 1000):
                        ret = socket.recv()
                        r_data = msgpack.unpackb(ret, encoding='utf-8')
                        print('>>>received:%s' % r_data)
                        break
                    else:
                        close_socket(socket, poller)
                        socket, poller = conn_socket(url_dealer)
    
                        print('*resend:%s' % data)
                        socket.send(s_data)
    
        except Exception as e:
            traceback.print_exc()
        finally:
            close_socket(socket, poller)
            pass

    注意看:我故意将server的sleep值 > client 的超时重试的值,那么问题就重现了:

    客户端不停的连接重试重发数据:

    服务端不停的接收回复:

     看了官方文档的后续的更多的高级功能介绍,发现并没有找到解决方案,暂时智能把超时时间设置的足够大一些,并在返回数据中添加一个状态,重试1次加一个1,方便反查和监控报警。

  • 相关阅读:
    教你修改Linux下高并发socket最大连接数所受的各种限制
    nginx浏览pdf
    Spring3 M2 quartz-2.1.7 解决bean不能注入问题
    DCSync
    Linux下python2.7安装pip
    ._cache_问题
    php-mvc概念
    php第十天-面向对象命名空间
    php第九天-session/cookice会话控制
    fastadmin V1.0.0.20200506_beta 漏洞复现
  • 原文地址:https://www.cnblogs.com/haoxr/p/9083439.html
Copyright © 2011-2022 走看看