zoukankan      html  css  js  c++  java
  • 架构师养成记--19.netty

    一、Netty初步

    为什么选择Netty?

    和NIO比较,要实现一个通信要简单得很多,性能很好。分布式消息中间件、storm、Dubble都是使用Netty作为底层通信。

    Netty5.0要求jdk1.6以上。

    http://netty.io

    二、编码步骤

    创建两个Nio线程组,一个事件处理,一个网络读写通信

    创建一个ServerBootStrap,配置Netty参数;

    创建实际处理的ChannelInitializer,进行初始化的准备工作,比如设置接收传出的字符集,格式,已经实际处理数据接口

    绑定端口执行同步阻塞方法等待服务器端启动。

    并发编程网 http://ifeve.com/netty5-user-guide

    三、代码

    服务端

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    /**
     * Created by sgm on 2017/1/28.
     */
    public class Server {
        public static void main(String []args) throws InterruptedException {
            /**
             * EventLoopGroup 事件组,不同的事件组处理不同的协议,这里使用NioEventLoopGroup来处理Tcp协议
             * 第一个事件循环组用来接收客户端信息,通常被称作boss,boos接收到信息后注册到worker上
             * 第二个事件循环组做实际的业务处理
             */
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            /**
             * 引导,对server进行一系列的配置
             */
            ServerBootstrap b = new ServerBootstrap();
            //将boss和worker关联起来
            b.group(bossGroup,workerGroup)
             .channel(NioServerSocketChannel.class)//使用的通道类型
             .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(new ServerHandler());
                }
             })//添加serverHandler,绑定具体的事件处理器
                    /**
                     * 当客户端向客户端发送请求会带一个信号,服务器端返回一个信号并且将客户端的链接信息加入队列A中,
                     * 服务器再收到一个客户端的信号并且将客户端的信息从队列A移动到队列B中;
                     * 队列A和队列B的长度之和就是BACKLOG,如果大于了BACKLOG,tcp内核就会拒绝链接,BACKLOG只会影响到没有被accept取出的链接
                     */
             .option(ChannelOption.SO_BACKLOG,128)
             .option(ChannelOption.SO_KEEPALIVE,true);//保持连接
            ChannelFuture cf = b.bind(9876).sync();//绑定端口进行监听,异步的监听
    
            cf.channel().closeFuture().sync();//阻塞
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
    
        }
    }
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.ReferenceCountUtil;
    
    /**
     * Created by Administrator on 2017/1/28.
     */
    public class ServerHandler extends ChannelHandlerAdapter{
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                //do something msg
                ByteBuf buf = (ByteBuf)msg;
                byte[] data = new byte[buf.readableBytes()];
                buf.readBytes(data);
                String request = new String(data, "utf-8");
                System.out.println("Server: " + request);
                //写给客户端
                String response = "我是反馈的信息";
                ctx.writeAndFlush(Unpooled.copiedBuffer("888".getBytes()));
            }finally {
                ReferenceCountUtil.release(msg);//有上一句话,即ctx.writeAndFlush(),这里可以不关闭
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }

    客户端

    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    public class Client {
    
        public static void main(String[] args) throws Exception {
    
            EventLoopGroup workgroup = new NioEventLoopGroup();
            Bootstrap b = new Bootstrap();
            b.group(workgroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            sc.pipeline().addLast(new ClientHandler());
                        }
                    });
    
            ChannelFuture cf1 = b.connect("127.0.0.1", 9876).sync();
    
            //buf
            cf1.channel().writeAndFlush(Unpooled.copiedBuffer("777".getBytes()));
    
            cf1.channel().closeFuture().sync();
            workgroup.shutdownGracefully();
    
        }
    }
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.ReferenceCountUtil;
    
    public class ClientHandler extends ChannelHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                //do something msg
                ByteBuf buf = (ByteBuf)msg;
                byte[] data = new byte[buf.readableBytes()];
                buf.readBytes(data);
                String request = new String(data, "utf-8");
                System.out.println("Client: " + request);
    
    
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
  • 相关阅读:
    请说出这些测试最好由那些人员完成,测试的是什么?
    测试结束的标准是什么?
    你的测试职业发展目标是什么?
    elementui医疗
    医疗前端
    spring创建对象3种方式
    idea-git
    eclipse-git
    ArrayList01
    单体权限
  • 原文地址:https://www.cnblogs.com/sigm/p/6322570.html
Copyright © 2011-2022 走看看