python examples https://github.com/imatix/zguide/tree/master/examples/Python
hwserver.py
- #
- # Hello World server in Python
- # Binds REP socket to tcp://*:5555
- # Expects "Hello" from client, replies with "World"
- #
- import zmq
- import time
- context = zmq.Context()
- socket = context.socket(zmq.REP)
- socket.bind("tcp://*:5555")
- while True:
- # Wait for next request from client
- message = socket.recv()
- print "Received request: ", message
- # Do some 'work'
- time.sleep (1) # Do some 'work'
- # Send reply back to client
- socket.send("World")
hwclient.py
- #
- # Hello World client in Python
- # Connects REQ socket to tcp://localhost:5555
- # Sends "Hello" to server, expects "World" back
- #
- import zmq
- context = zmq.Context()
- # Socket to talk to server
- print "Connecting to hello world server..."
- socket = context.socket(zmq.REQ)
- socket.connect ("tcp://localhost:5555")
- # Do 10 requests, waiting each time for a response
- for request in range (1,10):
- print "Sending request ", request,"..."
- socket.send ("Hello")
- # Get the reply.
- message = socket.recv()
- print "Received reply ", request, "[", message, "]"
问题3:zeroMQ实现一个消息层?
答:
实现一个ZeroMQ消息层需要三个步骤:
1.选择传输协议
0MQ提供了4种不同的传输协议
INPROC an In-Process communication model
IPC an Inter-Process communication model
MULTICAST multicast via PGM, possibly encapsulated in UDP
TCP a network based transport
2.建立基础
由于在网络中两个端点是相对动态的,很难有一个稳定的单一连接点。
如果是这种情况,可以使用由0MQ提供的转发设备。
转发设备可以绑定2个不同端口,并且转发消息从一个端点到另一个端点。
这样做的话,在网络中转发设备能够变成一个稳定的点,其它组件都可以去连接。
0MQ提供了3种类型的设备
QUEUE, a forwarder for the request/response messaging pattern
FORWARDER, a forwarder for the publish/subscribe messaging pattern
STREAMER, a forwarder for the pipelined messaging pattern
3.选择通讯模式
0MQ支持4种模式
REQUEST/REPLY, bidirectional, load balanced and state based
PUBLISH/SUBSCRIBE, publish to multiple recipients at once
UPSTREAM / DOWNSTREAM, distribute data to nodes arranged in a pipeline
PAIR, communication exclusively between peers
Req/Rep
均衡负载请求:
server 1
- import zmq
- context = zmq.Context()
- socket = context.socket(zmq.REP)
- socket.bind("tcp://127.0.0.1:5000")
- while True:
- msg = socket.recv()
- print "Got", msg
- socket.send(msg)
server 2
- import zmq
- context = zmq.Context()
- socket = context.socket(zmq.REP)
- socket.bind("tcp://127.0.0.1:6000")
- while True:
- msg = socket.recv()
- print "Got", msg
- socket.send(msg)
client
- import zmq
- context = zmq.Context()
- socket = context.socket(zmq.REQ)
- socket.connect("tcp://127.0.0.1:5000")
- socket.connect("tcp://127.0.0.1:6000")
- for i in range(10):
- msg = "msg %s" % i
- socket.send(msg)
- print "Sending", msg
- msg_in = socket.recv()
会发现client的请求会被均衡的分配给两个server
Example client output:
Sending msg 0
Sending msg 1
Sending msg 2
Sending msg 3
Sending msg 4
Sending msg 5
Sending msg 6
Sending msg 7
Sending msg 8
Sending msg 9
Example output server 1 at port 5000:
Got msg 0
Got msg 2
Got msg 4
Got msg 6
Got msg 8
Example output server 2 at port 6000:
Got msg 1
Got msg 3
Got msg 5
Got msg 7
Got msg 9
现在,如果我们要加入一个额外的server去管理我们的请求,我们将不得不修改我们的代码。
这是非常麻烦的,我们需要让每个client都知道有一个额外的server可以均衡请求。
为了解决这个问题,替代client直接去连接多个server的方式,client去连接转发设备,再由转发设备路由全部的消息给server。
Pub/Sub
在pub/sub模式下组件是松耦合的。类似于广播电台。
一个广播server为现场足球赛
- import zmq
- from random import choice
- context = zmq.Context()
- socket = context.socket(zmq.PUB)
- socket.bind("tcp://127.0.0.1:5000")
- countries = ['netherlands','brazil','germany','portugal']
- events = ['yellow card', 'red card', 'goal', 'corner', 'foul']
- while True:
- msg = choice( countries ) +" "+ choice( events )
- print "->",msg
- socket.send( msg )<span style="white-space: normal;"> </span>
输出
-> portugal corner
-> portugal yellow card
-> portugal goal
-> netherlands yellow card
-> germany yellow card
-> brazil yellow card
-> portugal goal
-> germany corner
…
一个客户端去收听特定的消息
- import zmq
- context = zmq.Context()
- socket = context.socket(zmq.SUB)
- socket.connect("tcp://127.0.0.1:5000")
- socket.setsockopt(zmq.SUBSCRIBE, "netherlands")
- socket.setsockopt(zmq.SUBSCRIBE, "germany")
- while True:
- print socket.recv()
输出
netherlands red card
netherlands goal
netherlands red card
germany foul
netherlands yellow card
germany foul
netherlands goal
netherlands corner
germany foul
netherlands corner
…
Pipelining
并发处理数据,其工作模式
一个工作者得到来自上游socket的消息,一旦处理完成后发送消息到下游。
Paired socket
服务器监听某个端口,客户端连接到这个端口,消息可以双向流动。
server
- import zmq
- context = zmq.Context()
- socket = context.socket(zmq.PAIR)
- socket.bind("tcp://127.0.0.1:5555")
client
- import zmq
- context = zmq.Context()
- socket = context.socket(zmq.PAIR)
- socket.connect("tcp://127.0.0.1:5555")
ps:
推荐