zoukankan      html  css  js  c++  java
  • Java Netty (2)

    通过一个实例来说明Netty的使用。用1个服务器连接5个客户端线程,客户端连接上服务器以后就向服务器发送消息,服务器接收到消息后向客户端返回消息,客户端接收到消息以后,等待随机的时间,再向服务端发送消息,这样一直循环下去。

    项目结构:

    NettyServer.java:

    package Server;
    
    import java.net.InetSocketAddress;
    import java.util.concurrent.Executors;
    
    import org.jboss.netty.bootstrap.ServerBootstrap;  
    import org.jboss.netty.channel.*;  
    import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;  
    import org.jboss.netty.handler.execution.ExecutionHandler;
    import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
    
    import Util.Constant;
    
    
    /**
     * Demo server entry point: configures a single NIO {@link ServerBootstrap} and binds it
     * to each of the five ports declared in {@link Constant}.
     */
    public class NettyServer {

        public static String host = "127.0.0.1";

        // One ExecutionHandler backed by a 16-thread ordered pool; it is handed to the
        // pipeline factory so that every pipeline uses this same instance.
        static ExecutionHandler executionHandler = new ExecutionHandler(
                new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576));

        /**
         * Starts the server and begins listening on all five ports.
         *
         * @param args command-line arguments (unused)
         */
        public static void main(String[] args) {
            // Boss pool (first argument) accepts incoming sockets; worker pool (second
            // argument) performs NIO, where one worker thread can serve many channels.
            final ChannelFactory channelFactory = new NioServerSocketChannelFactory(
                    Executors.newCachedThreadPool(),
                    Executors.newCachedThreadPool());

            ServerBootstrap bootstrap = new ServerBootstrap(channelFactory);
            bootstrap.setPipelineFactory(new ServerPipelineFactory(executionHandler));

            // Options for accepted (child) channels: disable Nagle's algorithm and
            // enable TCP keep-alive probing.
            bootstrap.setOption("child.tcpNoDelay", true);
            bootstrap.setOption("child.keepAlive", true);

            // Listen on the five ports, in declaration order.
            int[] ports = {Constant.p1, Constant.p2, Constant.p3, Constant.p4, Constant.p5};
            for (int port : ports) {
                bootstrap.bind(new InetSocketAddress(port));
                System.out.println("Listening port " + port + "...");
            }
        }

    }

    ServerPipelineFactory.java:

    package Server;
    
    import org.jboss.netty.channel.ChannelPipeline;
    import org.jboss.netty.channel.ChannelPipelineFactory;
    import org.jboss.netty.channel.Channels;
    import org.jboss.netty.handler.codec.string.StringDecoder;
    import org.jboss.netty.handler.codec.string.StringEncoder;
    import org.jboss.netty.handler.execution.ExecutionHandler;
    
    /**
     * Builds the server-side channel pipeline: string encoder, string decoder, the shared
     * {@link ExecutionHandler}, and finally a fresh {@link MyServerHandler}.
     */
    public class ServerPipelineFactory implements ChannelPipelineFactory {

        private final ExecutionHandler executionHandler;

        /**
         * @param executionHandler the handler instance inserted into every pipeline
         *                         this factory creates
         */
        public ServerPipelineFactory(ExecutionHandler executionHandler){
            this.executionHandler = executionHandler;
        }

        /**
         * Creates a new pipeline with newly constructed codec and business handlers.
         *
         * @return the assembled pipeline
         */
        @Override
        public ChannelPipeline getPipeline() throws Exception {
            final StringEncoder encoder = new StringEncoder();
            final StringDecoder decoder = new StringDecoder();
            // Business-logic handler; a new instance is created for each pipeline.
            final MyServerHandler businessHandler = new MyServerHandler();

            // All pipelines must share the same ExecutionHandler, and it has to sit
            // in front of the business-logic handler.
            return Channels.pipeline(encoder, decoder, executionHandler, businessHandler);
        }

    }

    MyServerHandler.java:

    package Server;
    
    import org.jboss.netty.channel.Channel;
    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.channel.ChannelStateEvent;
    import org.jboss.netty.channel.ExceptionEvent;
    import org.jboss.netty.channel.MessageEvent;
    import org.jboss.netty.channel.SimpleChannelHandler;
    import Util.Tool;
    
    /**
     * Server-side business handler: logs every received message, waits a random number of
     * seconds, and then answers the sender with a fixed greeting.
     */
    public class MyServerHandler extends SimpleChannelHandler{

        /**
         * Prints the incoming message, sleeps for {@code Tool.getInterval(100)} seconds,
         * writes the reply to the originating channel, and then delegates to the superclass.
         *
         * @param ctx the handler context
         * @param e   the event carrying the received message
         * @throws Exception if the sleep is interrupted or the superclass handler fails
         */
        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            System.out.println("Server received:" + e.getMessage());
            // Wait a random number of seconds before replying. sleep is a static method,
            // so it is called on the Thread class rather than through an instance.
            int interval = Tool.getInterval(100);
            Thread.sleep(interval * 1000L);
            e.getChannel().write("from Server: Hello!");
            super.messageReceived(ctx, e);
        }

        /**
         * Prints the stack trace of the failure, closes the affected channel, and then
         * delegates to the superclass.
         *
         * @param ctx the handler context
         * @param e   the event carrying the cause
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
            e.getCause().printStackTrace();
            Channel ch = e.getChannel();
            ch.close();
            super.exceptionCaught(ctx, e);
        }

        /**
         * Logs that a client has connected and then delegates to the superclass.
         *
         * @param ctx the handler context
         * @param e   the channel state event
         */
        @Override
        public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            System.out.println("A client connected!");
            super.channelConnected(ctx, e);
        }

    }

    NettyClient.java:

    package Client;
    
    import java.net.InetSocketAddress;
    import java.util.concurrent.Executors;
    
    import org.jboss.netty.bootstrap.ClientBootstrap;  
    import org.jboss.netty.channel.*;  
    import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;  
    import org.jboss.netty.handler.execution.ExecutionHandler;
    import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
    
    import Util.Constant;
    
    /**
     * Demo client thread: each instance owns one {@link ClientBootstrap} and, when started,
     * opens a single connection to {@link #host} on its port and blocks until that
     * connection is closed.
     */
    public class NettyClient extends Thread{

        public static String host = "127.0.0.1";
        ClientBootstrap bootstrap;
        int port;

        // One ExecutionHandler backed by a 16-thread ordered pool, shared by the
        // pipelines of all client instances.
        static ExecutionHandler executionHandler = new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576));

        /**
         * Configures the bootstrap for the given port. No connection is opened here.
         *
         * @param port the server port this client will connect to in {@link #run()}
         */
        public NettyClient(int port) {
            this.port = port;
            // First pool is the boss pool, second is the worker pool.
            final ChannelFactory channelFactory = new NioClientSocketChannelFactory(
                    Executors.newCachedThreadPool(),
                    Executors.newCachedThreadPool());
            // ClientBootstrap
            bootstrap = new ClientBootstrap(channelFactory);

            ClientPipelineFactory clientPipelineFactory = new ClientPipelineFactory(executionHandler);
            bootstrap.setPipelineFactory(clientPipelineFactory);
            bootstrap.setOption("tcpNoDelay" ,true);
            bootstrap.setOption("keepAlive", true);
            // Deliberately no connect() here: run() performs the one and only connect.
            // Connecting in both places would open two channels per client, one of
            // which is never awaited before the resources are released.
        }

        /**
         * Connects to the server, waits until the channel is closed (or the connect
         * attempt fails), and then releases the bootstrap's external resources.
         */
        @Override
        public void run(){
            // Start the connection attempt.
            ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
            System.out.println("Connecting to port " + port + "...");
            // Block until the connection is closed or has failed.
            future.getChannel().getCloseFuture().awaitUninterruptibly();
            // Shut down the thread pools before exiting.
            bootstrap.releaseExternalResources();
        }

        /**
         * Creates one client per port declared in {@link Constant} and starts them all.
         *
         * @param args command-line arguments (unused)
         */
        public static void main(String[] args) {
            NettyClient nc1 = new NettyClient(Constant.p1);
            NettyClient nc2 = new NettyClient(Constant.p2);
            NettyClient nc3 = new NettyClient(Constant.p3);
            NettyClient nc4 = new NettyClient(Constant.p4);
            NettyClient nc5 = new NettyClient(Constant.p5);

            nc1.start();
            nc2.start();
            nc3.start();
            nc4.start();
            nc5.start();
        }

    }

    ClientPipelineFactory.java:

    package Client;
    
    import org.jboss.netty.channel.ChannelPipeline;
    import org.jboss.netty.channel.ChannelPipelineFactory;
    import org.jboss.netty.channel.Channels;
    import org.jboss.netty.handler.codec.string.StringDecoder;
    import org.jboss.netty.handler.codec.string.StringEncoder;
    import org.jboss.netty.handler.execution.ExecutionHandler;
    
    /**
     * Builds the client-side channel pipeline: string encoder, string decoder, the shared
     * {@link ExecutionHandler}, and finally a fresh {@link MyClientHandler}.
     */
    public class ClientPipelineFactory implements ChannelPipelineFactory {

        private final ExecutionHandler executionHandler;

        /**
         * @param executionHandler the handler instance inserted into every pipeline
         *                         this factory creates
         */
        public ClientPipelineFactory(ExecutionHandler executionHandler){
            this.executionHandler = executionHandler;
        }

        /**
         * Creates a new pipeline with newly constructed codec and business handlers.
         *
         * @return the assembled pipeline
         */
        @Override
        public ChannelPipeline getPipeline() throws Exception {
            final StringEncoder encoder = new StringEncoder();
            final StringDecoder decoder = new StringDecoder();
            // Business-logic handler; a new instance is created for each pipeline.
            final MyClientHandler businessHandler = new MyClientHandler();

            // All pipelines must share the same ExecutionHandler, and it has to sit
            // in front of the business-logic handler.
            return Channels.pipeline(encoder, decoder, executionHandler, businessHandler);
        }
    }

    MyClientHandler.java:

    package Client;
    
    import org.jboss.netty.channel.Channel;
    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.channel.ChannelStateEvent;
    import org.jboss.netty.channel.ExceptionEvent;
    import org.jboss.netty.channel.MessageEvent;
    import org.jboss.netty.channel.SimpleChannelHandler;
    import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
    
    import Util.Tool;
    
    /**
     * Client-side business handler: greets the server once connected, and after every
     * server message waits a random number of seconds before sending the next greeting.
     */
    public class MyClientHandler extends SimpleChannelHandler{

        /**
         * Sends the first message as soon as the connection to the server is established,
         * then delegates to the superclass.
         *
         * @param ctx the handler context
         * @param e   the channel state event
         */
        @Override
        public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            System.out.println("Connected to Server!");
            e.getChannel().write("from Client: Hello! " + System.currentTimeMillis());
            super.channelConnected(ctx, e);
        }

        /**
         * Prints the incoming message, sleeps for {@code Tool.getInterval(5)} seconds,
         * writes a new timestamped greeting to the channel, and then delegates to the
         * superclass.
         *
         * @param ctx the handler context
         * @param e   the event carrying the received message
         * @throws Exception if the sleep is interrupted or the superclass handler fails
         */
        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            System.out.println("Client Received:" + e.getMessage());
            // Wait a random number of seconds before sending again. sleep is a static
            // method, so it is called on the Thread class rather than through an instance.
            int interval = Tool.getInterval(5);
            Thread.sleep(interval * 1000L);
            e.getChannel().write("from Client: Hello! "  + System.currentTimeMillis());
            super.messageReceived(ctx, e);
        }

        /**
         * Prints the stack trace of the failure, closes the affected channel, and then
         * delegates to the superclass.
         *
         * @param ctx the handler context
         * @param e   the event carrying the cause
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
            e.getCause().printStackTrace();
            Channel ch = e.getChannel();
            ch.close();
            super.exceptionCaught(ctx, e);
        }

    }

    Constant.java:

    package Util;
    
    /**
     * Port numbers used by both the demo server and the demo clients. The five ports are
     * consecutive, starting one above {@code start}.
     */
    public class Constant {
        // Base value from which the five port numbers are derived.
        final static int start = 10000;
        public static final int p1 = start + 1;
        public static final int p2 = start + 2;
        public static final int p3 = start + 3;
        public static final int p4 = start + 4;
        public static final int p5 = start + 5;

        // Constants holder; not meant to be instantiated.
        private Constant() {
        }
    }

    Tool.java:

    package Util;
    
    import java.util.Random;
    
    /**
     * Small helper for producing random wait intervals.
     */
    public class Tool {

        // Single generator reused by every call; final so it cannot be swapped out.
        static final Random rand = new Random();

        // Static helper only; not meant to be instantiated.
        private Tool() {
        }

        /**
         * Returns a pseudo-random integer in the range {@code [0, max)}.
         *
         * @param max exclusive upper bound; must be positive
         * @return a value between 0 (inclusive) and {@code max} (exclusive)
         * @throws IllegalArgumentException if {@code max} is not positive
         */
        public static int getInterval(int max){
            return rand.nextInt(max);
        }
    }
  • 相关阅读:
    mouseenter和mouseleave,mouseover和mouseout
    哈哈哈
    instanceof与typeof
    js事件传参
    浮动与清除问题
    简易富文本编辑
    js之prototype
    json序列化
    我对Defer and Promise的实现
    Ajax 完整教程
  • 原文地址:https://www.cnblogs.com/mstk/p/6791675.html
Copyright © 2011-2022 走看看