前言
使用过简单的python的ZMQ:server开启3个线程,client发送心跳包,如果服务端超过n秒没应答,则重新连接。
网上找的案例,server使用的zmq.device,但是一直不明白什么含义。
案例链接:http://nphard.me/2016/03/05/pyzmq-demo/
client使用了超时重连,传输数据时,如果server没有响应则重新连接并重新发送数据,这样会导致,客户端重复发送多条数据,被服务端接收处理,但是服务端并没有回复。(以下准备解释,但是没成功)
以下为参考网站,很多英文的看不懂。
http://pyzmq.readthedocs.io/en/latest/
http://zguide.zeromq.org/py:all
https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/index.html
https://wizardforcel.gitbooks.io/zmq-guide/content/chapter1.html
具体细节参考的中文资料在对应目录内给出。
zmq的router和dealer是什么
网上有很多资料介绍zmq的几种模式,以下介绍req/rep应答模式。
首先确认一个概念:XREQ/XREP are aliases for ROUTER/DEALER. XREQ/XREP were used in ZeroMQ 2.x.
那么router和dealer是什么?直接上链接:
http://www.cnblogs.com/fengbohello/p/4743868.html
官网介绍了应答模式的几种区别:http://zguide.zeromq.org/py:chapter3
以下是自己的理解,如果有问题请留言,谢谢。
参考画图:https://stackoverflow.com/questions/23581172/what-is-zmq-router-and-zmq-dealer-in-python-zeromq
zmq的device
参考官方api:http://pyzmq.readthedocs.io/en/latest/api/zmq.devices.html
参考device详细介绍:http://pjwqq.iteye.com/blog/2260254
根据上图给出代码,运行device_func,frontend面向客户端(发起请求方),绑定router接收请求,device内部通过backend与服务端连接,dealer向服务端发起请求,并将数据传达给server。
所以流程就是这样:client (req) -----> router/dealer (queue) -----> server (rep)
device 和 server 端:通过多线程写在一起的。运行先后顺序是 1、device --- 2、server / client (随便哪一个先启动都会等待后启动连接,如果中途server异常,client需要超时重试, !!!!!^~^)
def device_func(): thread_num = 3 context = zmq.Context() url_router = 'inproc://ping-server' url_dealer = 'tcp://*:5559' # Socket do cliente frontend = context.socket(zmq.ROUTER) # 或者是 zmq.ROUTER zmq.XREP frontend.bind(url_dealer) # Socket do servidor backend = context.socket(zmq.DEALER) # 或者是 zmq.DEALER zmq.XREQ backend.bind(url_router) for i in range(1, thread_num + 1): thread = threading.Thread(target=server_func, args=(i, url_router, context)) thread.start() # property找不到 Device()报错 zmq.device(zmq.QUEUE, frontend, backend) frontend.close() backend.close() context.term() def server_func(name, url_router, context): print(">>> start %d %s" % (name, '.' * 50)) socket = context.socket(zmq.REP) socket.connect(url_router) while True: try: message = socket.recv() r_data = msgpack.unpackb(message, encoding='utf-8') print('server %d received:' % name, r_data) data = 'server %d send: %s' % (name, utils.time_now()) print(data) s_data = msgpack.packb(data) socket.send(s_data) except: traceback.print_exc() socket.close() break
client端,连接device的frontend端(dealer):
def client2(): url_dealer = 'tcp://localhost:5559' context = zmq.Context() socket = context.socket(zmq.REQ) print("Collecting data from server…") socket.connect(url_dealer) pid = os.getpid() i = 0 while 1: i += 1 # data = input(' >>:').strip() data = ('send - msg%d' % i) # print(data) s_data = msgpack.packb(data.encode()) socket.send(s_data) ret = socket.recv() r_data = msgpack.unpackb(ret, encoding='utf-8') print('>>>received msg%d server info: %s ' % (i, r_data))
以上一起理解了zmq的device、router、dealer。
官网指出:function zmq.
device
(device_type, frontend, backend) Deprecated since version libzmq-3.2: Use zmq.proxy 取而代之的 zmq.
proxy
(frontend, backend, capture)
同时又有 class zmq.devices.
Device
(device_type=3, in_type=None, out_type=None) 测试了跑不起来
备注:
inproc 是zmq本地 进程/线程 的传输方式。可以自定义。
zmq的Polling
主要功能应该是 一个程序种需要建立多个socket连接时,需要接收数据,又要转发数据等。参考以下链接:
http://pjwqq.iteye.com/blog/2260791
我这里主要是使用到client的超时重试功能。重现客户端不断超时重试发送数据,服务端不断接收回复。数据大量冗余重复。
server:同上,不过加了一句 sleep:
time.sleep(6) message = socket.recv()
client:
def conn_socket(url_dealer): """ 建立socket连接""" try: context = zmq.Context() socket = context.socket(zmq.REQ) socket.connect(url_dealer) poller = zmq.Poller() poller.register(socket, zmq.POLLIN) print('[%d] socket conn success' % os.getpid()) return socket, poller except: traceback.print_exc() def close_socket(socket, poller): """ 关闭socket连接""" try: socket.close() poller.unregister(socket) except: traceback.print_exc() def client1(): """ """ url_dealer = 'tcp://localhost:5559' socket, poller = conn_socket(url_dealer) try: i = 0 while 1: i += 1 # data = input(' >>:').strip() data = 'msg%d' % i print('send:%s' % data) # print(data) s_data = msgpack.packb(data.encode()) socket.send(s_data) while True: # 超时后重新连接,参数是毫秒 if poller.poll(5 * 1000): ret = socket.recv() r_data = msgpack.unpackb(ret, encoding='utf-8') print('>>>received:%s' % r_data) break else: close_socket(socket, poller) socket, poller = conn_socket(url_dealer) print('*resend:%s' % data) socket.send(s_data) except Exception as e: traceback.print_exc() finally: close_socket(socket, poller) pass
注意看:我故意将server的sleep值 > client 的超时重试的值,那么问题就重现了:
客户端不停的连接重试重发数据:
服务端不停的接收回复:
看了官方文档的后续的更多的高级功能介绍,发现并没有找到解决方案,暂时智能把超时时间设置的足够大一些,并在返回数据中添加一个状态,重试1次加一个1,方便反查和监控报警。