zoukankan      html  css  js  c++  java
  • netty学习

    1、参考地址

    https://waylau.gitbooks.io/netty-4-user-guide/content/Architectural-Overview/Universal-Asynchronous-IO-API.html

    https://github.com/waylau/netty-4-user-guide-demos

    2、服务端

    以上为关键步骤,服务端处理器,渠道设置,通道设置,也就是第4步。

    以下为建一个服务端为例

    2.1、初始化ServerBootstrap实例,此实例是netty应用开发的入口

    2.2、创建Reactor主线程池,用于接收客户端请求的线程池职责

    2.2.1、接收客户端TCP连接,初始化 Channel参数

    2.2.2、将链路状态变更事件通知给 Channelpipeline

    2.3、创建Reactor从线程池

    2.3.1、异步读取通信对端的数据报,发送读事件到 Channelpipeline;

    2.3.2、异步发送消息到通信对端,调用 Channelpipeline的消息发送接口;

    2.3.3、执行系统调用Task;

    2.3.4、执行定时任务Task,例如链路空闲状态监测定时任务.

    2.4、设置netty主从线程池

    2.5、设置Channel

    2.6、childGroup的处理器,用来管理通道

    2.7、设置选项 SO_BACKLOG 设置线程队列的链接个数

    2.8、SO_KEEPALIVE 设置保持链接活动连接状态

    2.9、绑定端口

    2.10、关闭线程池

    3、客户端

    对于服务端核心在于设置childHandler,对于客户端核心在于设置handler。

    服务端示例

    public class SimpleChatServer{
    
    	public static void main(String[] args) {
    		ServerBootstrap boot = new ServerBootstrap();
    		EventLoopGroup parentGroup = new NioEventLoopGroup();
    		EventLoopGroup childGroup = new NioEventLoopGroup();
    		
    		try {
    			boot.group(parentGroup, childGroup);
    			boot.channel(NioServerSocketChannel.class);
    			boot.childHandler(new SimpleChatServerInitializer());
    			boot.option(ChannelOption.SO_BACKLOG, 128);
    			boot.childOption(ChannelOption.SO_KEEPALIVE, true);
    			boot.bind(NettyConstant.PORT).sync().channel().closeFuture().sync();
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		} finally {
    			parentGroup.shutdownGracefully();
    			childGroup.shutdownGracefully();
    		}
    		
    		
    	}
    	
    }
    

      

    public class SimpleChatServerInitializer extends ChannelInitializer<SocketChannel> {
    
    	@Override
    	protected void initChannel(SocketChannel ch) throws Exception {
    		ChannelPipeline pipeline = ch.pipeline();
    		pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
            pipeline.addLast("decoder", new StringDecoder());
            pipeline.addLast("encoder", new StringEncoder());
            pipeline.addLast("handler", new SimpleChatServerHandler());
    	}
    
    }
    

      

    public class SimpleChatServerHandler extends SimpleChannelInboundHandler<String> { // (1)
    	
    	/**
    	 * A thread-safe Set  Using ChannelGroup, you can categorize Channels into a meaningful group.
    	 * A closed Channel is automatically removed from the collection,
    	 */
    	public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {  // (2)
            Channel incoming = ctx.channel();
            
            // Broadcast a message to multiple Channels
            channels.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 加入
    ");
            
            channels.add(ctx.channel());
        }
    
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {  // (3)
            Channel incoming = ctx.channel();
            
            // Broadcast a message to multiple Channels
            channels.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 离开
    ");
            
            // A closed Channel is automatically removed from ChannelGroup,
            // so there is no need to do "channels.remove(ctx.channel());"
        }
        @Override
    	protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { // (4)
    		Channel incoming = ctx.channel();
    		for (Channel channel : channels) {
                if (channel != incoming){
                    channel.writeAndFlush("[" + incoming.remoteAddress() + "]" + s + "
    ");
                } else {
                	channel.writeAndFlush("[you]" + s + "
    ");
                }
            }
    	}
      
    	@Override
    	public void channelActive(ChannelHandlerContext ctx) throws Exception { // (5)
            Channel incoming = ctx.channel();
    		System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"在线");
    	}
    	
    	@Override
    	public void channelInactive(ChannelHandlerContext ctx) throws Exception { // (6)
            Channel incoming = ctx.channel();
    		System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"掉线");
    	}
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 
        	Channel incoming = ctx.channel();
    		System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"异常");
            // 当出现异常就关闭连接
            cause.printStackTrace();
            ctx.close();
        }
    }
    

      客户端示例

    public class SimpleChatClient {
    
    	public static void main(String[] args) {
    		Bootstrap boot = new Bootstrap();
    		EventLoopGroup parentGroup = new NioEventLoopGroup();
    		
    		try {
    			boot.group(parentGroup);
    			boot.channel(NioSocketChannel.class);
    			boot.handler(new SimpleChatClientInitializer());
    			boot.option(ChannelOption.SO_BACKLOG, 128);
    			
    			Channel channel = boot.connect(NettyConstant.HOST, NettyConstant.PORT).sync().channel();
    			
    			BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
                while(true){
                    channel.writeAndFlush(in.readLine() + "
    ");
                }
                
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		} catch (IOException e) {
    			e.printStackTrace();
    		} finally {
    			parentGroup.shutdownGracefully();
    		}
    		
    		
    	}
    	
    }
    

      

    public class SimpleChatClientInitializer extends ChannelInitializer<SocketChannel> {
    
    	@Override
        public void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            
            pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
            pipeline.addLast("decoder", new StringDecoder());
            pipeline.addLast("encoder", new StringEncoder());
            pipeline.addLast("handler", new SimpleChatClientHandler());
        }
    	
    }
    

      

    public class SimpleChatClientHandler extends  SimpleChannelInboundHandler<String> {
    
    	@Override
    	protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
    		System.out.println(s);
    	}
    	
    }
    

      

  • 相关阅读:
    java socket解析和发送二进制报文工具(附java和C++转化问题)
    hibernate缓存机制(二级缓存)
    Spring MVC中前后台数据传输小结
    NUC972 MDK NON-OS
    代码是如何控制硬件的?
    C语言位运算+实例讲解(转)
    C语言程序真正的启动函数
    51单片机的时钟及总线时序和总线扩展
    如何通过Keil将程序正确的下载进flash中
    说说M451例程讲解之串口
  • 原文地址:https://www.cnblogs.com/yun965861480/p/14185169.html
Copyright © 2011-2022 走看看