zoukankan      html  css  js  c++  java
  • Netty的简单Demo

    这个demo是通过网上下载:

    使用maven构建的:

    项目结构:

    pom.xml:

        <dependencies>
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.0.33.Final</version>
            </dependency>
        </dependencies>

    客户端:

      Client.java

    package com.xys.client;
    
    import com.xys.common.CustomHeartbeatHandler;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    import io.netty.handler.timeout.IdleStateHandler;
    
    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author xiongyongshun
     * @version 1.0
     * @email yongshun1228@gmail.com
     * @created 16/9/18 12:59
     */
    public class Client {
        private NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
        private Channel channel;
        private Bootstrap bootstrap;
    
        public static void main(String[] args) throws Exception {
            Client client = new Client();
            client.start();
            client.sendData();
        }
    
        public void sendData() throws Exception {
            Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < 10000; i++) {
                if (channel != null && channel.isActive()) {
                    String content = "client msg " + i;
                    ByteBuf buf = channel.alloc().buffer(5 + content.getBytes().length);
                    buf.writeInt(5 + content.getBytes().length);
                    buf.writeByte(CustomHeartbeatHandler.CUSTOM_MSG);
                    buf.writeBytes(content.getBytes());
                    channel.writeAndFlush(buf);
                }
    
                Thread.sleep(random.nextInt(20000));
            }
        }
    
        public void start() {
            try {
                bootstrap = new Bootstrap();
                bootstrap
                        .group(workGroup)
                        .channel(NioSocketChannel.class)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline p = socketChannel.pipeline();
                                p.addLast(new IdleStateHandler(0, 0, 5));
                                p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
                                p.addLast(new ClientHandler(Client.this));
                            }
                        });
                doConnect();
    
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    
        protected void doConnect() {
            if (channel != null && channel.isActive()) {
                return;
            }
    
            ChannelFuture future = bootstrap.connect("127.0.0.1", 12345);
    
            future.addListener(new ChannelFutureListener() {
                public void operationComplete(ChannelFuture futureListener) throws Exception {
                    if (futureListener.isSuccess()) {
                        channel = futureListener.channel();
                        System.out.println("Connect to server successfully!");
                    } else {
                        System.out.println("Failed to connect to server, try connect after 10s");
    
                        futureListener.channel().eventLoop().schedule(new Runnable() {
                            @Override
                            public void run() {
                                doConnect();
                            }
                        }, 10, TimeUnit.SECONDS);
                    }
                }
            });
        }
    
    }

    ClientHandler.java

    package com.xys.client;
    
    import com.xys.common.CustomHeartbeatHandler;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    
    /**
     * @author xiongyongshun
     * @version 1.0
     * @email yongshun1228@gmail.com
     * @created 16/9/18 13:08
     */
    public class ClientHandler extends CustomHeartbeatHandler {
        private Client client;
        public ClientHandler(Client client) {
            super("client");
            this.client = client;
        }
    
        @Override
        protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
            byte[] data = new byte[byteBuf.readableBytes() - 5];
            byteBuf.skipBytes(5);
            byteBuf.readBytes(data);
            String content = new String(data);
            System.out.println(name + " get content: " + content);
        }
    
        @Override
        protected void handleAllIdle(ChannelHandlerContext ctx) {
            super.handleAllIdle(ctx);
            sendPingMsg(ctx);
        }
    
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            super.channelInactive(ctx);
            client.doConnect();
        }
    }

    CustomHeartbeatHandler.java

    package com.xys.common;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.timeout.IdleStateEvent;
    
    /**
     * @author xiongyongshun
     * @version 1.0
     * @email yongshun1228@gmail.com
     * @created 16/9/18 13:02
     */
    public abstract class CustomHeartbeatHandler extends SimpleChannelInboundHandler<ByteBuf> {
        public static final byte PING_MSG = 1;
        public static final byte PONG_MSG = 2;
        public static final byte CUSTOM_MSG = 3;
        protected String name;
        private int heartbeatCount = 0;
    
        public CustomHeartbeatHandler(String name) {
            this.name = name;
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext context, ByteBuf byteBuf) throws Exception {
            if (byteBuf.getByte(4) == PING_MSG) {
                sendPongMsg(context);
            } else if (byteBuf.getByte(4) == PONG_MSG){
                System.out.println(name + " get pong msg from " + context.channel().remoteAddress());
            } else {
                handleData(context, byteBuf);
            }
        }
    
        protected void sendPingMsg(ChannelHandlerContext context) {
            ByteBuf buf = context.alloc().buffer(5);
            buf.writeInt(5);
            buf.writeByte(PING_MSG);
            buf.retain();
            context.writeAndFlush(buf);
            heartbeatCount++;
            System.out.println(name + " sent ping msg to " + context.channel().remoteAddress() + ", count: " + heartbeatCount);
        }
    
        private void sendPongMsg(ChannelHandlerContext context) {
            ByteBuf buf = context.alloc().buffer(5);
            buf.writeInt(5);
            buf.writeByte(PONG_MSG);
            context.channel().writeAndFlush(buf);
            heartbeatCount++;
            System.out.println(name + " sent pong msg to " + context.channel().remoteAddress() + ", count: " + heartbeatCount);
        }
    
        protected abstract void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf);
    
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            // IdleStateHandler 所产生的 IdleStateEvent 的处理逻辑.
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent e = (IdleStateEvent) evt;
                switch (e.state()) {
                    case READER_IDLE:
                        handleReaderIdle(ctx);
                        break;
                    case WRITER_IDLE:
                        handleWriterIdle(ctx);
                        break;
                    case ALL_IDLE:
                        handleAllIdle(ctx);
                        break;
                    default:
                        break;
                }
            }
        }
        
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.err.println("88888888888");
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.err.println("---" + ctx.channel().remoteAddress() + " is active---");
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.err.println("---" + ctx.channel().remoteAddress() + " is inactive---");
        }
    
        protected void handleReaderIdle(ChannelHandlerContext ctx) {
            System.err.println("---READER_IDLE---");
        }
    
        protected void handleWriterIdle(ChannelHandlerContext ctx) {
            System.err.println("---WRITER_IDLE---");
        }
    
        protected void handleAllIdle(ChannelHandlerContext ctx) {
            System.err.println("---ALL_IDLE---");
        }
    }

    服务端:

      Server.java

    package com.xys.server;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    import io.netty.handler.timeout.IdleStateHandler;
    
    /**
     * @author xiongyongshun
     * @email yongshun1228@gmail.com
     * @version 1.0
     * @created 16/9/18 12:59
     */
    public class Server {
        public static void main(String[] args) {
            NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
            NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap
                        .group(bossGroup, workGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline p = socketChannel.pipeline();
                                p.addLast(new IdleStateHandler(10, 0, 0));
                                p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
                                p.addLast(new ServerHandler());
                            }
                        });
    
                Channel ch = bootstrap.bind(12345).sync().channel();
                ch.closeFuture().sync();
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                bossGroup.shutdownGracefully();
                workGroup.shutdownGracefully();
            }
        }
    }

     ServerHandler.java

    package com.xys.server;
    
    import com.xys.common.CustomHeartbeatHandler;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    
    /**
     * @author xiongyongshun
     * @version 1.0
     * @email yongshun1228@gmail.com
     * @created 16/9/18 13:08
     */
    public class ServerHandler extends CustomHeartbeatHandler {
        public ServerHandler() {
            super("server");
        }
    
        @Override
        protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf buf) {
            byte[] data = new byte[buf.readableBytes() - 5];
            ByteBuf responseBuf = Unpooled.copiedBuffer(buf);
            buf.skipBytes(5);
            buf.readBytes(data);
            String content = new String(data);
            System.out.println(name + " get content: " + content);
            channelHandlerContext.write(responseBuf);
        }
    
        @Override
        protected void handleReaderIdle(ChannelHandlerContext ctx) {
            super.handleReaderIdle(ctx);
            System.err.println("---client " + ctx.channel().remoteAddress().toString() + " reader timeout, close it---");
            ctx.close();
        }
    }

    ChannelMap.java(保存xxx,以便服务端主动向客户端发送信息)

    package com.xys.common;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    
    import java.util.HashMap;
    
    
    public class ChannelMap {
        public static int channelNum=0;
        private static HashMap<String,ChannelHandlerContext> channelHashMap=null;//应改为concurrentHashmap以解决多线程冲突
        
        private static HashMap<Integer,Integer> portMap = null;
        
        public static void addPort(Integer fidHashCode,Integer fid){
            if(portMap==null){
                portMap=new HashMap<>(100);
            }
            portMap.put(fidHashCode,fid);
        }
        
        //map:通过value获取key(fid)
        public static Integer getfidByCode(Integer fidHashCode){
            if(portMap==null||portMap.isEmpty()){
                return null;
            }
            return portMap.get(fidHashCode);
        }
        
    
        public static HashMap<String, ChannelHandlerContext> getChannelHashMap() {
            return channelHashMap;
        }
    
        public static ChannelHandlerContext getChannelByName(String name){
            if(channelHashMap==null||channelHashMap.isEmpty()){
                return null;
            }
            return channelHashMap.get(name);
        }
        
        public static void addChannel(String name,ChannelHandlerContext channel){
            if(channelHashMap==null){
                channelHashMap=new HashMap<>(100);
            }
            channelHashMap.put(name,channel);
            channelNum++;
        }
        
        public static int removeChannelByName(String name){
            if(channelHashMap.containsKey(name)){
                channelHashMap.remove(name);
                return 0;
            }else{
                return 1;
            }
        }
    
    
    }

    使用addChannel()方法将ChannelHandlerContext保存在Map中

    服务端向客户端发送信息:

    public boolean sendInfoToNettyClient(String fid,String info) {
            ChannelFuture cf = null;
            ChannelHandlerContext ctx = ChannelMap.getChannelByName(fid);
            
            if(ctx == null) {
                log.debug("客户端没有被启动或出现异常!");
                return false;
            }
            
            ByteBuf responseBuf = Unpooled.directBuffer(16); 
            responseBuf.writeInt(5+info.getBytes().length);
            responseBuf.writeByte(CustomHeartbeatHandler.CUSTOM_MSG);
            responseBuf.writeBytes(info.getBytes());
            cf = ctx.write(responseBuf);
    
            return cf != null?true:false;
        }
  • 相关阅读:
    按格式读取csv文件内容
    C#分块拷贝大文件
    在 Active Directory 上也有 LINQ 可以用了:LINQ to Active Directory
    CSVDE
    lucene索引查看工具luke和文本提取工具Tika
    Perf工具
    RHEL7.2安装
    Hive on ES
    灰度发布
    LSM树由来、设计思想以及应用到HBase的索引
  • 原文地址:https://www.cnblogs.com/zcjyzh/p/9437113.html
Copyright © 2011-2022 走看看