本篇简单介绍ZMQ
的使用。广泛应用于本公司的Android 主板
板间通讯、局域网通讯,学习一哈。
ZMQ简介
ZMQ被称为史上最快的消息队列,它处于会话层之上,应用层之下,使用后台异步线程完成消息的接受和发送,完美的封装了Socket API,大大简化了编程人员的复杂度。
- ZMQ发送和接受的是具有固定长度的二进制对象,ZMQ的消息包最大254个字节,前6个字节是协议,然后是数据包。
如果超过255个字节(有一个字节表示包属性),则ZMQ会自动分包传输;而对于TCP Socket,是面向字节流的连接。
-
传统的TCP Socket的连接是1对1的,ZMQ的Socket可以很轻松的实现1对N,N对1和N对N的连接模式。
-
ZMQ使用异步后台线程处理接受和发送请求,这意味着发送完消息,不可以立即释放资源,消息什么时候发送用户是无法控制的,同时,ZMQ自动重连,
这意味着用户可以以任意顺序加入到网络中,服务器也可以随时加入或者退出网络;
ZMQ几种模式
- 请求-回复模式
这种模式主要用于从客户端向一个或多个服务器发送请求,客户端首先使用zmq_send 发送消息,再用zmq_recv来接收消息。
服务端先用zmq_recv接收消息如果收到了客户端的消息,则使用zmq_send向客户端发送消息。如此循环。形成请求-回复模式。
- 发布-订阅模式(PUB-SUB)
这种模式主要用于一个服务器对应一个或多个客户端,该模式相对来说是异步的。客户端在一个循环体中执行zmq_recv来接收消息。
如果尝试向SUB socket发送消息会导致错误。
TIPS: 你无法知道订阅者从什么时候开始获取消息。即使是启动订阅者,过一段时间启动发布者。订阅者总是会错过发布者的第一条信息。
因为订阅者连接到发布者需要一点时间(虽然可能很小)。
3)管道模式
这种模式会将数据发布(PUSH)到由管道排列的节点上面,数据总是沿着管道流动。每个管道至少连接了一个节点(Worker),节点不会主动从管道中获取(PULL)数据,数据会负载均衡的分配在节点上。。
如果平均的批次时间为5S。
-
1个节点,总时长 5034 ms
-
2个节点,总时长 2421 ms
-
4个节点,总时长 1018 ms
请求-回复实例 (JAVA)
Client:
import org.zeromq.ZMQ;
public class hwclient {
public static void main(String[] args) {
ZMQ.Context context = ZMQ.context(1);
// Socket to talk to server
System.out.println("Connecting to hello world server…");
ZMQ.Socket requester = context.socket(ZMQ.REQ);
requester.connect("tcp://localhost:5555");
for (int requestNbr = 0; requestNbr != 10; requestNbr++) {
String request = "Hello";
System.out.println("Sending Hello " + requestNbr);
requester.send(request.getBytes(), 0);
byte[] reply = requester.recv(0);
System.out.println("Received " + new String(reply) + " " + requestNbr);
}
requester.close();
context.term();
}
}
Server:
import org.zeromq.ZMQ;
public class hwserver {
public static void main(String[] args) throws Exception {
ZMQ.Context context = ZMQ.context(1);
// Socket to talk to clients
ZMQ.Socket responder = context.socket(ZMQ.REP);
responder.bind("tcp://*:5555");
while (!Thread.currentThread().isInterrupted()) {
// Wait for next request from the client
byte[] request = responder.recv(0);
System.out.println("Received Hello");
// Do some 'work'
Thread.sleep(1000);
// Send reply back to client
String reply = "World";
responder.send(reply.getBytes(), 0);
}
responder.close();
context.term();
}
}
Client:
Server:
发布-订阅模式实例 (JAVA)
Client:
import org.zeromq.ZMQ;
public class hwclient {
public static void client() {
new Thread(new Runnable() {
@Override
public void run() {
ZMQ.Context zContext = ZMQ.context(1);
ZMQ.Socket socket = zContext.socket(ZMQ.SUB);
socket.connect("tcp://" + "127.0.0.1" + ":5553");
socket.subscribe("Hello World!".getBytes());
while(true){
System.out.println("client:"+new String(socket.recv(0)));
}
//socket.close();
}
}).start();
}
public static void main(String args[]){
client();
}
}
Server:
import org.zeromq.ZMQ;
public class hwserver {
private static void server() {
new Thread(new Runnable() {
@Override
public void run() {
ZMQ.Context zContext = ZMQ.context(1);
ZMQ.Socket publisher = zContext.socket(ZMQ.PUB);
publisher.bind("tcp://*:5553");
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.currentThread().sleep(1000);
String reply = "Hello World!";
System.out.println("server:"+reply);
publisher.send(reply.getBytes(), 0);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/*publisher.close ();
zContext.term ();*/
}
}).start();
}
public static void main(String args[]){
server();
}
}
Client:
more in ZMQ
more in other blog 云风
2017/11/22 update
org.zeromq.ZMQException: Operation cannot be accomplished in current state
ZMQ不可以线程之间共享Socket
2018/01/18 update
UnsatisfiedLinkError
网上下载的zmq.jar中,缺少了arm64-v8a
的libjzmq.so
文件,在使用arm64-v8a
架构手机的时候会出现此问题。
解决方案:
1.可以使用网上的源码替代之。
缺少jnacl.jar自行下载。
2.也可以自己编译 arm64-v8a
libjzmq.so
文件。