zoukankan      html  css  js  c++  java
  • Mina、Netty、Twisted一起学(七):公布/订阅(Publish/Subscribe)

    消息传递有非常多种方式。请求/响应(Request/Reply)是最经常使用的。在前面的博文的样例中。非常多都是採用请求/响应的方式。当server接收到消息后,会马上write回写一条消息到client。

    HTTP协议也是基于请求/响应的方式。

    可是请求/响应并不能满足全部的消息传递的需求,有些需求可能须要服务端主动推送消息到client,而不是被动的等待请求后再给出响应。

    公布/订阅(Publish/Subscribe)是一种server主动发送消息到client的消息传递方式。订阅者Subscriber连接到serverclient后,相当于開始订阅公布者Publisher公布的消息,当公布者公布了一条消息后,全部订阅者都会接收到这条消息。

    网络聊天室一般就是基于公布/订阅模式来实现。

    比如增加一个QQ群。就相当于订阅了这个群的全部消息。当有新的消息,server会主动将消息发送给全部的client。仅仅只是聊天室里的全部人既是公布者又是订阅者。

    以下分别用MINA、Netty、Twisted分别实现简单的公布/订阅模式的server程序,连接到server的全部client都是订阅者,当公布者公布一条消息后,server会将消息转发给全部client。

    MINA:

    在MINA中。通过IoService的getManagedSessions()方法能够获取这个IoService当前管理的全部IoSession,即全部连接到server的client集合。当server接收到公布者公布的消息后,能够通过IoService的getManagedSessions()方法获取到全部client相应的IoSession并将消息发送到这些client。

    public class TcpServer {
    
    	public static void main(String[] args) throws IOException {
    		IoAcceptor acceptor = new NioSocketAcceptor();
    
    		acceptor.getFilterChain().addLast("codec",
    				new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"), "
    ", "
    ")));
    
    		acceptor.setHandler(new TcpServerHandle());
    		acceptor.bind(new InetSocketAddress(8080));
    	}
    
    }
    
    class TcpServerHandle extends IoHandlerAdapter {
    
    	@Override
    	public void exceptionCaught(IoSession session, Throwable cause)
    			throws Exception {
    		cause.printStackTrace();
    	}
    
    	@Override
    	public void messageReceived(IoSession session, Object message)
    			throws Exception {
    		
    		// 获取全部正在连接的IoSession
    		Collection<IoSession> sessions = session.getService().getManagedSessions().values();
    
    		// 将消息写到全部IoSession
    		IoUtil.broadcast(message, sessions);
    	}
    }

    Netty:

    Netty提供了ChannelGroup来用于保存Channel组,ChannelGroup是一个线程安全的Channel集合,它提供了一些列Channel批量操作。

    当一个TCP连接关闭后,相应的Channel会自己主动从ChannelGroup移除。所以不用手动去移除关闭的Channel。

    Netty文档关于ChannelGroup的解释:

    A thread-safe Set that contains open Channels and provides various bulk operations on them. Using ChannelGroup, you can categorize Channels into a meaningful group (e.g. on a per-service or per-state basis.) A closed Channel is automatically removed from the collection, so that you don't need to worry about the life cycle of the added Channel. A Channel can belong to more than one ChannelGroup.

    当有新的client连接到server,将相应的Channel增加到一个ChannelGroup中。当公布者公布消息时,server能够将消息通过ChannelGroup写入到全部client。

    public class TcpServer {
    
    	public static void main(String[] args) throws InterruptedException {
    		EventLoopGroup bossGroup = new NioEventLoopGroup();
    		EventLoopGroup workerGroup = new NioEventLoopGroup();
    		try {
    			ServerBootstrap b = new ServerBootstrap();
    			b.group(bossGroup, workerGroup)
    					.channel(NioServerSocketChannel.class)
    					.childHandler(new ChannelInitializer<SocketChannel>() {
    						@Override
    						public void initChannel(SocketChannel ch) throws Exception {
    							ChannelPipeline pipeline = ch.pipeline();
    							pipeline.addLast(new LineBasedFrameDecoder(80));
    							pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
    							pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
    							pipeline.addLast(new TcpServerHandler());
    						}
    					});
    			ChannelFuture f = b.bind(8080).sync();
    			f.channel().closeFuture().sync();
    		} finally {
    			workerGroup.shutdownGracefully();
    			bossGroup.shutdownGracefully();
    		}
    	}
    }
    
    class TcpServerHandler extends ChannelInboundHandlerAdapter {
    
    	// ChannelGroup用于保存全部连接的client。注意要用static来保证仅仅有一个ChannelGroup实例。否则每new一个TcpServerHandler都会创建一个ChannelGroup
    	private static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    
    	@Override
    	public void channelActive(ChannelHandlerContext ctx) {
    		channels.add(ctx.channel()); // 将新的连接增加到ChannelGroup。当连接断开ChannelGroup会自己主动移除相应的Channel
    	}
    	
    	@Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg) {
    		channels.writeAndFlush(msg + "
    "); // 接收到消息后。将消息发送到ChannelGroup中的全部client
    	}
    
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    		// cause.printStackTrace();  临时把异常打印凝视掉,由于PublishClient公布一条消息后会马上断开连接。而server也会向PublishClient发送消息,所以会抛出异常
    		ctx.close();
    	}
    }

    Twisted:

    在Twisted中,全局的数据通常会放在Factory,而每一个连接相关的数据会放在Protocol中。所以这里能够在Factory中增加一个属性,来存放Protocol集合,表示全部连接server的client。当有新的client连接到server时。将相应的Protocol实例放入集合。当连接断开,将相应的Protocol从集合中移除。当server接收到公布者公布的消息后,遍历全部client并发送消息。

    # -*- coding:utf-8 –*-
    
    from twisted.protocols.basic import LineOnlyReceiver
    from twisted.internet.protocol import Factory
    from twisted.internet import reactor
    
    class TcpServerHandle(LineOnlyReceiver): 
    
        def __init__(self, factory):
            self.factory = factory
    
        def connectionMade(self):
            self.factory.clients.add(self) # 新连接增加连接相应的Protocol实例到clients
    
        def connectionLost(self, reason):
            self.factory.clients.remove(self) # 连接断开移除连接相应的Protocol实例
    
        def lineReceived(self, line):
            # 遍历全部的连接。发送数据
            for c in self.factory.clients:
                c.sendLine(line)
    
    class TcpServerFactory(Factory):
        def __init__(self):
            self.clients = set() # set集合用于保存全部连接到server的client
    
        def buildProtocol(self, addr):
            return TcpServerHandle(self)
    
    reactor.listenTCP(8080, TcpServerFactory())
    reactor.run()

    以下各自是两个client程序,一个是用于公布消息的client。一个是订阅消息的client。

    公布消息的client非常easy,就是向serverwrite一条消息就可以:

    public class PublishClient {
    
    	public static void main(String[] args) throws IOException {
    
    		Socket socket = null;
    		OutputStream out = null;
    
    		try {
    
    			socket = new Socket("localhost", 8080);
    			out = socket.getOutputStream();
    			out.write("Hello
    ".getBytes()); // 公布信息到server
    			out.flush();
    
    		} finally {
    			// 关闭连接
    			out.close();
    			socket.close();
    		}
    	}
    }

    订阅消息的client连接到server后,会堵塞等待接收server发送的公布消息:

    public class SubscribeClient {
    
    	public static void main(String[] args) throws IOException {
    
    		Socket socket = null;
    		BufferedReader in = null;
    
    		try {
    
    			socket = new Socket("localhost", 8080);
    			in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    
    			while (true) {
    				String line = in.readLine(); // 堵塞等待server公布的消息
    				System.out.println(line);
    			}
    
    		} finally {
    			// 关闭连接
    			in.close();
    			socket.close();
    		}
    	}
    }

    分别针对MINA、Netty、Twistedserver进行測试:

    1、測试时首先开启server;
    2、然后再执行订阅消息的clientSubscribeClient,SubscribeClient能够开启多个;
    3、最后执行公布消息的clientPublishClient。能够多次执行查看全部SubscribeClient的输出结果。

    执行结果能够发现,当执行公布消息的clientPublishClient公布一条消息到server时。server会主动将这条消息转发给全部的TCP连接,全部的订阅消息的clientSubscribeClient都会接收到这条消息并打印出来。


    作者:叉叉哥   转载请注明出处:http://blog.csdn.net/xiao__gui/article/details/39396789


    MINA、Netty、Twisted一起学系列

    MINA、Netty、Twisted一起学(一):实现简单的TCPserver

    MINA、Netty、Twisted一起学(二):TCP消息边界问题及按行切割消息

    MINA、Netty、Twisted一起学(三):TCP消息固定大小的前缀(Header)

    MINA、Netty、Twisted一起学(四):定制自己的协议

    MINA、Netty、Twisted一起学(五):整合protobuf

    MINA、Netty、Twisted一起学(六):session

    MINA、Netty、Twisted一起学(七):公布/订阅(Publish/Subscribe)

    MINA、Netty、Twisted一起学(八):HTTPserver

    MINA、Netty、Twisted一起学(九):异步IO和回调函数

    MINA、Netty、Twisted一起学(十):线程模型

    MINA、Netty、Twisted一起学(十一):SSL/TLS

    MINA、Netty、Twisted一起学(十二):HTTPS

    源代码

    https://github.com/wucao/mina-netty-twisted


  • 相关阅读:
    NGINX -- 详解Nginx几种常见实现301重定向方法上的区别
    数据库外键的使用以及优缺点
    phpok -- 域名问题
    Sql Server系列:多表连接查询
    Go -- cron定时任务的用法
    JavaScript -- 清除缓存
    sql CAST用法
    Mock -- 数据模拟
    EsLint入门
    citus real-time 分析demo( 来自官方文档)
  • 原文地址:https://www.cnblogs.com/yxysuanfa/p/6857316.html
Copyright © 2011-2022 走看看