zoukankan      html  css  js  c++  java
  • netty4----netty5的客户端和服务端

    服务端:

    package com.server;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    /**
     * netty5版本服务端
     */
    public class Server {
    
        public static void main(String[] args) {
            //服务类
            ServerBootstrap bootstrap = new ServerBootstrap();
            
            //boss和worker, netty5不是线程池,而是事件循环组,里面包含线程池。
            EventLoopGroup boss = new NioEventLoopGroup();
            EventLoopGroup worker = new NioEventLoopGroup();
            
            try {
                //设置线程池
                bootstrap.group(boss, worker);//boss用来监听端口的
                
                //设置socket工厂、
                bootstrap.channel(NioServerSocketChannel.class);
                
                //设置管道工厂
                bootstrap.childHandler(new ChannelInitializer<Channel>() {
    
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ch.pipeline().addLast(new StringDecoder());
                        ch.pipeline().addLast(new StringEncoder());
                        ch.pipeline().addLast(new ServerHandler());
                    }
                });
                
                //netty3中对应设置如下
                //bootstrap.setOption("backlog", 1024);
                //bootstrap.setOption("tcpNoDelay", true);
                //bootstrap.setOption("keepAlive", true);
                //设置参数,TCP参数
                bootstrap.option(ChannelOption.SO_BACKLOG, 2048);//serverSocketchannel的设置,链接缓冲池的大小。tcp的服务端是有队列的。队列保存2048个客户端。2048后面的连接是拒绝的。
                bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);//socketchannel的设置,维持链接的活跃,清除死链接
                bootstrap.childOption(ChannelOption.TCP_NODELAY, true);//socketchannel的设置,关闭延迟发送。发一包并不是马上发出去,而是积累到一定之后再发出去。
                
                //绑定端口
                ChannelFuture future = bootstrap.bind(10101);
                
                System.out.println("start");
                
                //等待服务端关闭
                future.channel().closeFuture().sync();
                
            } catch (Exception e) {
                e.printStackTrace();
            } finally{
                //释放资源
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    }
    package com.server;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    /**
     * 服务端消息处理
     */
    public class ServerHandler extends SimpleChannelInboundHandler<String> {
    
        @Override
        protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
            System.out.println(msg);
            ctx.channel().writeAndFlush("hi");
            ctx.writeAndFlush("hi");
        }
    
        /**
         * 新客户端接入
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelActive");
        }
    
        /**
         * 客户端断开
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelInactive");
        }
    
        /**
         * 异常
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
        }
        
        
    }

    客户端:

    package com.client;
    
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    /**
     * netty5版本的客户端
     */
    public class Client {
    
        public static void main(String[] args) {
            //服务类
            Bootstrap bootstrap = new Bootstrap();
            //worker
            EventLoopGroup worker = new NioEventLoopGroup();//boss用来监听端口,这里只创建worker
            try {
                //设置线程池
                bootstrap.group(worker);
                //设置socket工厂、
                bootstrap.channel(NioSocketChannel.class);
                //设置管道
                bootstrap.handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ch.pipeline().addLast(new StringDecoder());
                        ch.pipeline().addLast(new StringEncoder());
                        ch.pipeline().addLast(new ClientHandler());
                    }
                });
                ChannelFuture connect = bootstrap.connect("127.0.0.1", 10101);
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
                while(true){
                    System.out.println("请输入:");
                    String msg = bufferedReader.readLine();
                    connect.channel().writeAndFlush(msg);
                }
            } catch (Exception e) {
                 e.printStackTrace();
            } finally{
                worker.shutdownGracefully();
            }
        }
    }
    package com.client;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    /**
     * 客户端消息处理
     */
    public class ClientHandler extends SimpleChannelInboundHandler<String> {
        @Override
        protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
            System.out.println("客户端收到消息:"+msg);
        }
    
    }

    一个客户端启动多个连接:

    package com.client;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    /**
     多连接客户端,客户端保持一个连接不够,要保持多个连接。
      
      线程池是有多个线程,每个线程里面有一个任务队列,线程run的时候会从任务队列取一个任务出来,执行任务的run方法,
      队列里面没有任务就阻塞等待新的任务进来。
      
      一个thread + 队列 == 一个单线程线程池   =====> 线程安全的,任务是线性串行执行的
      
      
      
     对象池:首先初始化n个对象,把这些对象放入一个队列里面,需要对象的时候会出栈一个对象,有对象就出栈,使用完了归还对象池里面。
    没有对象会阻塞等待有可用的对象。或者创建一个新的对象使用完之后归还线程池,归还的时候如果池子满了就销毁。
    比如数据库连接池:使用完后要释放资源,就是把连接放回连接池里面。
    
    
     对象组:首先初始化n个对象,把这些对象放入一个数组里面。使用的时候获取一个对象不移除,使用完之后不用归还。需要对象有并发的能力。
     
     
     
    对象组:线程安全,不会产生阻塞效应
    对象池:线程不安全,会产生阻塞效应
     */
    public class MultClient {
        
        /**
         * 服务类
         */
        private Bootstrap bootstrap = new Bootstrap();
        
        /**
         * 会话,多个channel,
         */
        private List<Channel> channels = new ArrayList<>();
        
        /**
         * 引用计数
         */
        private final AtomicInteger index = new AtomicInteger();
        
        /**
         * 初始化
         * @param count
         */
        public void init(int count){
            
            //worker
            EventLoopGroup worker = new NioEventLoopGroup();
            
            //设置线程池
            bootstrap.group(worker);
            
            //设置socket工厂、
            bootstrap.channel(NioSocketChannel.class);
            
            //设置管道
            bootstrap.handler(new ChannelInitializer<Channel>() {
    
                @Override
                protected void initChannel(Channel ch) throws Exception {
                    ch.pipeline().addLast(new StringDecoder());
                    ch.pipeline().addLast(new StringEncoder());
                    ch.pipeline().addLast(new ClientHandler());
                }
            });
            
            for(int i=1; i<=count; i++){
                ChannelFuture future = bootstrap.connect("192.168.0.103", 10101);
                channels.add(future.channel());
            }
        }
        
        /**
         * 获取会话
         */
        public Channel nextChannel(){
            return getFirstActiveChannel(0);
        }
        
        
        private Channel getFirstActiveChannel(int count){
            Channel channel = channels.get(Math.abs(index.getAndIncrement() % channels.size()));
            if(!channel.isActive()){
                //重连
                reconnect(channel);
                if(count >= channels.size()){
                    throw new RuntimeException("no can use channel");
                }
                return getFirstActiveChannel(count + 1);
            }
            return channel;
        }
        
        /**
         * 重连
         * @param channel
         */
        private void reconnect(Channel channel){
            synchronized(channel){
                if(channels.indexOf(channel) == -1){//已经重连过了
                    return ;
                }
                Channel newChannel = bootstrap.connect("192.168.0.103", 10101).channel();
                channels.set(channels.indexOf(channel), newChannel);
            }
        }
    }
    package com.client;
    
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    /**
     * 启动类
     */
    public class Start {
    
        public static void main(String[] args) {
    
            MultClient client = new MultClient();
            client.init(5);//初始化5个连接
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
            while(true){
                try {
                    System.out.println("请输入:");
                    String msg = bufferedReader.readLine();
                    client.nextChannel().writeAndFlush(msg);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
    }
  • 相关阅读:
    学习笔记16:残差网络
    学习笔记15:第二种加载数据的方法
    学习笔记14:模型保存
    学习笔记13:微调模型
    学习笔记12:图像数据增强及学习速率衰减
    蚯蚓
    [JSOI] 重要的城市
    正则表达式
    加分二叉树
    选择客栈
  • 原文地址:https://www.cnblogs.com/yaowen/p/9053386.html
Copyright © 2011-2022 走看看