zoukankan      html  css  js  c++  java
  • Mina、Netty、Twisted一起学(七):发布/订阅(Publish/Subscribe)

    消息传递有很多种方式,请求/响应(Request/Reply)是最常用的。在前面的博文的例子中,很多都是采用请求/响应的方式,当服务器接收到消息后,会立即write回写一条消息到客户端。HTTP协议也是基于请求/响应的方式。

    但是请求/响应并不能满足所有的消息传递的需求,有些需求可能需要服务端主动推送消息到客户端,而不是被动的等待请求后再给出响应。

    发布/订阅(Publish/Subscribe)是一种服务器主动发送消息到客户端的消息传递方式。订阅者Subscriber连接到服务器客户端后,相当于开始订阅发布者Publisher发布的消息,当发布者发布了一条消息后,所有订阅者都会接收到这条消息。

    网络聊天室一般就是基于发布/订阅模式来实现。例如加入一个QQ群,就相当于订阅了这个群的所有消息,当有新的消息,服务器会主动将消息发送给所有的客户端。只不过聊天室里的所有人既是发布者又是订阅者。

    下面分别用MINA、Netty、Twisted分别实现简单的发布/订阅模式的服务器程序,连接到服务器的所有客户端都是订阅者,当发布者发布一条消息后,服务器会将消息转发给所有客户端。

    MINA:

    在MINA中,通过IoService的getManagedSessions()方法可以获取这个IoService当前管理的所有IoSession,即所有连接到服务器的客户端集合。当服务器接收到发布者发布的消息后,可以通过IoService的getManagedSessions()方法获取到所有客户端对应的IoSession并将消息发送到这些客户端。

    public class TcpServer {
    
        public static void main(String[] args) throws IOException {
            IoAcceptor acceptor = new NioSocketAcceptor();
    
            acceptor.getFilterChain().addLast("codec",
                    new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"), "
    ", "
    ")));
    
            acceptor.setHandler(new TcpServerHandle());
            acceptor.bind(new InetSocketAddress(8080));
        }
    
    }
    
    class TcpServerHandle extends IoHandlerAdapter {
    
        @Override
        public void exceptionCaught(IoSession session, Throwable cause)
                throws Exception {
            cause.printStackTrace();
        }
    
        @Override
        public void messageReceived(IoSession session, Object message)
                throws Exception {
            
            // 获取所有正在连接的IoSession
            Collection<IoSession> sessions = session.getService().getManagedSessions().values();
    
            // 将消息写到所有IoSession
            IoUtil.broadcast(message, sessions);
        }
    }

    Netty:

    Netty提供了ChannelGroup来用于保存Channel组,ChannelGroup是一个线程安全的Channel集合,它提供了一些列Channel批量操作。当一个TCP连接关闭后,对应的Channel会自动从ChannelGroup移除,所以不用手动去移除关闭的Channel。

    Netty文档关于ChannelGroup的解释:

    A thread-safe Set that contains open Channels and provides various bulk operations on them. Using ChannelGroup, you can categorize Channels into a meaningful group (e.g. on a per-service or per-state basis.) A closed Channel is automatically removed from the collection, so that you don't need to worry about the life cycle of the added Channel. A Channel can belong to more than one ChannelGroup.

    当有新的客户端连接到服务器,将对应的Channel加入到一个ChannelGroup中,当发布者发布消息时,服务器可以将消息通过ChannelGroup写入到所有客户端。

    public class TcpServer {
    
        public static void main(String[] args) throws InterruptedException {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast(new LineBasedFrameDecoder(80));
                                pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
                                pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
                                pipeline.addLast(new TcpServerHandler());
                            }
                        });
                ChannelFuture f = b.bind(8080).sync();
                f.channel().closeFuture().sync();
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        }
    }
    
    class TcpServerHandler extends ChannelInboundHandlerAdapter {
    
        // ChannelGroup用于保存所有连接的客户端,注意要用static来保证只有一个ChannelGroup实例,否则每new一个TcpServerHandler都会创建一个ChannelGroup
        private static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            channels.add(ctx.channel()); // 将新的连接加入到ChannelGroup,当连接断开ChannelGroup会自动移除对应的Channel
        }
        
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            channels.writeAndFlush(msg + "
    "); // 接收到消息后,将消息发送到ChannelGroup中的所有客户端
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            // cause.printStackTrace();  暂时把异常打印注释掉,因为PublishClient发布一条消息后会立即断开连接,而服务器也会向PublishClient发送消息,所以会抛出异常
            ctx.close();
        }
    }

    Twisted:

    在Twisted中,全局的数据一般会放在Factory,而每个连接相关的数据会放在Protocol中。所以这里可以在Factory中加入一个属性,来存放Protocol集合,表示所有连接服务器的客户端。当有新的客户端连接到服务器时,将对应的Protocol实例放入集合,当连接断开,将对应的Protocol从集合中移除。当服务器接收到发布者发布的消息后,遍历所有客户端并发送消息。

    # -*- coding:utf-8 –*-
    
    from twisted.protocols.basic import LineOnlyReceiver
    from twisted.internet.protocol import Factory
    from twisted.internet import reactor
    
    class TcpServerHandle(LineOnlyReceiver): 
    
        def __init__(self, factory):
            self.factory = factory
    
        def connectionMade(self):
            self.factory.clients.add(self) # 新连接添加连接对应的Protocol实例到clients
    
        def connectionLost(self, reason):
            self.factory.clients.remove(self) # 连接断开移除连接对应的Protocol实例
    
        def lineReceived(self, line):
            # 遍历所有的连接,发送数据
            for c in self.factory.clients:
                c.sendLine(line)
    
    class TcpServerFactory(Factory):
        def __init__(self):
            self.clients = set() # set集合用于保存所有连接到服务器的客户端
    
        def buildProtocol(self, addr):
            return TcpServerHandle(self)
    
    reactor.listenTCP(8080, TcpServerFactory())
    reactor.run()

    下面分别是两个客户端程序,一个是用于发布消息的客户端,一个是订阅消息的客户端。

    发布消息的客户端很简单,就是向服务器write一条消息即可:

    public class PublishClient {
    
        public static void main(String[] args) throws IOException {
    
            Socket socket = null;
            OutputStream out = null;
    
            try {
    
                socket = new Socket("localhost", 8080);
                out = socket.getOutputStream();
                out.write("Hello
    ".getBytes()); // 发布信息到服务器
                out.flush();
    
            } finally {
                // 关闭连接
                out.close();
                socket.close();
            }
        }
    }

    订阅消息的客户端连接到服务器后,会阻塞等待接收服务器发送的发布消息:

    public class SubscribeClient {
    
        public static void main(String[] args) throws IOException {
    
            Socket socket = null;
            BufferedReader in = null;
    
            try {
    
                socket = new Socket("localhost", 8080);
                in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    
                while (true) {
                    String line = in.readLine(); // 阻塞等待服务器发布的消息
                    System.out.println(line);
                }
    
            } finally {
                // 关闭连接
                in.close();
                socket.close();
            }
        }
    }

    分别针对MINA、Netty、Twisted服务器进行测试:

    1、测试时首先开启服务器;
    2、然后再运行订阅消息的客户端SubscribeClient,SubscribeClient可以开启多个;
    3、最后运行发布消息的客户端PublishClient,可以多次运行查看所有SubscribeClient的输出结果。

    运行结果可以发现,当运行发布消息的客户端PublishClient发布一条消息到服务器时,服务器会主动将这条消息转发给所有的TCP连接,所有的订阅消息的客户端SubscribeClient都会接收到这条消息并打印出来。

    MINA、Netty、Twisted一起学系列

    MINA、Netty、Twisted一起学(一):实现简单的TCP服务器

    MINA、Netty、Twisted一起学(二):TCP消息边界问题及按行分割消息

    MINA、Netty、Twisted一起学(三):TCP消息固定大小的前缀(Header)

    MINA、Netty、Twisted一起学(四):定制自己的协议

    MINA、Netty、Twisted一起学(五):整合protobuf

    MINA、Netty、Twisted一起学(六):session

    MINA、Netty、Twisted一起学(七):发布/订阅(Publish/Subscribe)

    MINA、Netty、Twisted一起学(八):HTTP服务器

    MINA、Netty、Twisted一起学(九):异步IO和回调函数

    MINA、Netty、Twisted一起学(十):线程模型

    MINA、Netty、Twisted一起学(十一):SSL/TLS

    MINA、Netty、Twisted一起学(十二):HTTPS

    源码

    https://github.com/wucao/mina-netty-twisted

  • 相关阅读:
    June. 26th 2018, Week 26th. Tuesday
    June. 25th 2018, Week 26th. Monday
    June. 24th 2018, Week 26th. Sunday
    June. 23rd 2018, Week 25th. Saturday
    June. 22 2018, Week 25th. Friday
    June. 21 2018, Week 25th. Thursday
    June. 20 2018, Week 25th. Wednesday
    【2018.10.11 C与C++基础】C Preprocessor的功能及缺陷(草稿)
    June.19 2018, Week 25th Tuesday
    June 18. 2018, Week 25th. Monday
  • 原文地址:https://www.cnblogs.com/wucao/p/3985407.html
Copyright © 2011-2022 走看看