zoukankan      html  css  js  c++  java
  • [经验] Java 使用 netty 框架, 向 Unity 客户端的 C# 实现通信 [1]

    这是一个较为立体的思路吧

    首先是技术选型:

    前端    : HTML5 + jQuery ,简单暴力, 不解释

    服务端 : Spring Boot + Netty + Redis/Cache

    客户端 : Unity3D + C#

    所要实现的效果为:

    服务端启动后, 开启端口监听, 然后客户端启动, 连接上服务端, 再由前端将数据请求发送到服务端, 服务端再发送到客户端

    为了方便(懒), 所以使用 netty 4.x 作为主要的通讯框架; 由于 5.x 好像已经被官方放弃了, 所以我就使用 4.x 系列的版本 (本文使用的是 4.0.39.Final)

    在 pom.xml 处添加 netty4.x 的依赖

            <!-- netty 通信框架 https://mvnrepository.com/artifact/io.netty/netty-all -->
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.0.39.Final</version>
            </dependency>
    
            <!-- netty websocket 通讯框架依赖 -->
            <dependency>
                <groupId>org.yeauty</groupId>
                <artifactId>netty-websocket-spring-boot-starter</artifactId>
                <version>0.8.0</version>
            </dependency>

    老规矩, 从服务端开始, 先创建 netty 的服务端程序

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.stream.ChunkedWriteHandler;
    import io.netty.handler.timeout.IdleStateHandler;
    import org.apache.http.client.utils.DateUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Netty NIO server: binds a listening port and blocks until the server channel is closed.
     *
     * @author 吾王剑锋所指 吾等心之所向
     * @since 2019/8/27 19:18
     */
    public class NettyServer {
        private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

        /**
         * Registry of client connections keyed by the id the client sends.
         * NettyServerHandler reads and writes it as {@code NettyServer.map} from the
         * worker event-loop threads, so a concurrent map is used.
         */
        public static final Map<String, ChannelHandlerContext> map =
                new java.util.concurrent.ConcurrentHashMap<>();

        /** Port used when {@link #bind(Integer)} receives {@code null}. */
        private final Integer defaultPort = 5566;

        /**
         * Starts the server and blocks the calling thread until the server channel closes.
         * Both event-loop groups are always shut down before this method returns.
         *
         * @param port port to listen on; {@code null} selects the default port 5566
         * @throws Exception if binding fails or the waiting thread is interrupted
         */
        public void bind(Integer port) throws Exception {
            // "master" accepts incoming connections, "servant" performs the I/O of accepted ones.
            EventLoopGroup master = new NioEventLoopGroup();
            EventLoopGroup servant = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(master, servant)
                        .channel(NioServerSocketChannel.class)
                        // A ServerBootstrap keeps exactly one child handler: a second
                        // childHandler(...) call replaces the first. Only the initializer
                        // that actually took effect is registered here.
                        .childHandler(new ServerChannelInitializer())
                        // Backlog is a property of the listening (server) socket.
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        // Keep-alive belongs to the accepted connections, hence childOption.
                        .childOption(ChannelOption.SO_KEEPALIVE, true);

                int listenPort = (port == null) ? this.defaultPort : port;

                // Bind and wait until the port is actually open.
                ChannelFuture future = bootstrap.bind(listenPort).sync();
                logger.info("服务启动:" + DateUtils.formatDate(new Date()));

                // Block until the server channel is closed.
                future.channel().closeFuture().sync();
            } finally {
                logger.info("服务停止:" + DateUtils.formatDate(new Date()));
                // Release the thread pools.
                master.shutdownGracefully();
                servant.shutdownGracefully();
            }
        }
    }
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.codec.LineBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    /**
     * Builds the pipeline of every accepted connection: line-delimited frames are decoded
     * to strings, outgoing strings are encoded, and NettyServerHandler does the business work.
     *
     * @author 吾王剑锋所指 吾等心之所向
     * @since 2019/8/27 19:20
     */
    public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {

        /**
         * Installs the handlers on a newly accepted channel, in processing order.
         *
         * @param socketChannel the channel that was just accepted
         * @throws Exception declared by the overridden method
         */
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            socketChannel.pipeline()
                    // Split the inbound byte stream on line endings; frames are capped at 10010 bytes.
                    .addLast(new LineBasedFrameDecoder(10010))
                    // Inbound bytes -> String.
                    .addLast(new StringDecoder())
                    // Outbound String -> bytes.
                    .addLast(new StringEncoder())
                    // Business logic runs last.
                    .addLast("handler", new NettyServerHandler());
        }
    }
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.*;
    import io.netty.handler.codec.http.*;
    import io.netty.handler.codec.http.websocketx.*;
    import io.netty.handler.timeout.IdleStateEvent;
    import io.netty.util.AttributeKey;
    import io.netty.util.CharsetUtil;
    
    import org.apache.http.client.utils.DateUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.Date;
    
    /**
     * Business handler of the Netty server.
     *
     * <p>Text messages containing {@code id=<value>;} register the connection in
     * {@code NettyServer.map}; every message is acknowledged. HTTP upgrade requests and
     * WebSocket frames are routed to {@link #messageReceived}. Connections are removed
     * from the registry when they become inactive or fail.
     *
     * @author 吾王剑锋所指 吾等心之所向三
     * @since 2019/8/28 9:50
     */
    public class NettyServerHandler extends SimpleChannelInboundHandler<Object> {
        private static final Logger LOGGER = LoggerFactory.getLogger(NettyServerHandler.class);

        /** Path (without the leading slash) that accepts WebSocket upgrades. */
        private static final String URI = "websocket";

        /** Handshaker of this connection; {@code null} until a WebSocket handshake was started. */
        private WebSocketServerHandshaker handshaker;

        /**
         * Handles one inbound message.
         *
         * @param ctx context of the connection the message arrived on
         * @param msg decoded message
         * @throws Exception if processing fails
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            // Netty 4.x never calls messageReceived() by itself, so HTTP / WebSocket
            // messages have to be dispatched from here.
            if (msg instanceof HttpRequest || msg instanceof WebSocketFrame) {
                messageReceived(ctx, msg);
                return;
            }

            String id = extractId(msg.toString());
            if (id != null) {
                if (ctx.equals(NettyServer.map.get(id))) { // not the first message of this connection
                    LOGGER.info("接收数据成功!" + DateUtils.formatDate(new Date()));
                } else { // unknown id, or the id moved to another connection: (re)register it
                    NettyServer.map.put(id, ctx);
                    LOGGER.info("连接成功,加入map管理连接!"+"mn:" +id+" : "+ctx+""+ DateUtils.formatDate(new Date()));
                }
            } else {
                LOGGER.info("不是监测数据"+ msg.toString()+" : "+ DateUtils.formatDate(new Date()));
            }
            ctx.writeAndFlush("Received your message : " + msg.toString());
        }

        /**
         * Extracts the value between the first {@code id=} and the following {@code ;}
         * (or the next {@code id=} / end of text).
         *
         * @param text raw message text
         * @return the id, or {@code null} when the text carries no non-empty id
         */
        private static String extractId(String text) {
            String[] parts = text.split("id=");
            if (parts.length < 2) {
                return null;
            }
            String rest = parts[1];
            // indexOf instead of split(";")[0]: splitting ";" yields an empty array.
            int end = rest.indexOf(';');
            String id = (end < 0) ? rest : rest.substring(0, end);
            return id.isEmpty() ? null : id;
        }

        /**
         * Called after the current batch of inbound data was read; sends a short
         * message to the client.
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            LOGGER.info("服务端接收数据完毕..");
            ctx.channel().writeAndFlush("call ------");
        }

        /**
         * Called when the connection is gone; removes it from the registry.
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            LOGGER.info(ctx.channel().localAddress() + " 通道不活跃!");
            removeChannelMap(ctx);
            ctx.close();
        }

        /**
         * Called when a client has connected; sends the greeting messages.
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            LOGGER.info("RemoteAddress"+ ctx.channel().remoteAddress() + " active !");
            ChannelFuture greeting = ctx.channel().writeAndFlush("123456");
            LOGGER.info("msg send active !" + greeting);
            ctx.writeAndFlush("啦啦啦!");
            super.channelActive(ctx);
        }

        /**
         * Logs the failure, unregisters the connection and closes it.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            // The logger prints the stack trace; no separate printStackTrace() needed.
            LOGGER.error("连接异常,连接异常:"+ DateUtils.formatDate(new Date())+cause.getMessage(), cause);
            ctx.fireExceptionCaught(cause);
            removeChannelMap(ctx);
            ctx.close();
        }

        /**
         * Removes every registry entry that points to the given connection.
         */
        private void removeChannelMap(ChannelHandlerContext ctx) {
            // removeIf on the value view avoids removing from the map while iterating its
            // key set, which can throw ConcurrentModificationException.
            NettyServer.map.values().removeIf(ctx::equals);
        }

        /**
         * Routes an HTTP request to the handshake and a WebSocket frame to frame handling.
         *
         * @param ctx context of the connection
         * @param msg inbound message; other types are ignored
         * @throws Exception if processing fails
         */
        protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (msg instanceof HttpRequest) {
                doHandlerHttpRequest(ctx, (HttpRequest) msg);
            } else if (msg instanceof WebSocketFrame) {
                doHandlerWebSocketFrame(ctx, (WebSocketFrame) msg);
            }
        }

        /**
         * Heartbeat: on read-idle or write-idle a WebSocket ping is sent; all-idle is only logged.
         * Events other than idle-state events are passed on unchanged.
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (!(evt instanceof IdleStateEvent)) {
                super.userEventTriggered(ctx, evt);
                return;
            }

            IdleStateEvent stateEvent = (IdleStateEvent) evt;
            switch (stateEvent.state()) {
                case READER_IDLE: // nothing read from the client for a while
                    LOGGER.info("{["+ctx.channel().remoteAddress()+"]--->(服务端 read 空闲)}");
                    // The ping is created only where it is actually written.
                    ctx.writeAndFlush(new PingWebSocketFrame());
                    break;
                case WRITER_IDLE: // nothing written to the client for a while
                    LOGGER.info("{["+ctx.channel().remoteAddress()+"]--->(服务端 write 空闲)}");
                    ctx.writeAndFlush(new PingWebSocketFrame());
                    break;
                case ALL_IDLE: // neither read nor written
                    LOGGER.info("{["+ctx.channel().remoteAddress()+"]--->(服务端 读写 空闲)}");
                    break;
                default:
                    break;
            }
        }

        /**
         * Handles one WebSocket frame: close frames finish the connection, ping frames are
         * answered with a pong, non-text frames are reported as unsupported.
         *
         * @param ctx context of the connection
         * @param msg the frame
         */
        protected void doHandlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame msg) {
            if (msg instanceof CloseWebSocketFrame) {
                LOGGER.info("[{---关闭---}]");
                if (handshaker == null) {
                    // A frame arrived although no handshake took place: just close.
                    ctx.close();
                } else {
                    // retain(): the frame is written back, while SimpleChannelInboundHandler
                    // releases the inbound message once channelRead0 returns.
                    handshaker.close(ctx.channel(), (CloseWebSocketFrame) msg.retain());
                }
                return;
            }

            if (msg instanceof PingWebSocketFrame) {
                LOGGER.info("[{---ping}]");
                ctx.channel().writeAndFlush(new PongWebSocketFrame(msg.content().retain()));
                return;
            }

            if (!(msg instanceof TextWebSocketFrame)) {
                LOGGER.info("[{!!----不支持二进制-----!!}]");
            }
        }

        /**
         * Performs the WebSocket opening handshake for an HTTP upgrade request.
         * Requests that are malformed, are not an aggregated WebSocket upgrade, or target
         * another path are rejected, and processing stops.
         *
         * @param ctx context of the connection
         * @param msg the HTTP request
         */
        public void doHandlerHttpRequest(ChannelHandlerContext ctx, HttpRequest msg) {
            // Header values are compared case-insensitively ("WebSocket" is also sent by clients).
            boolean upgrade = "websocket".equalsIgnoreCase(msg.headers().get("Upgrade"));
            if (!msg.getDecoderResult().isSuccess() || !upgrade || !(msg instanceof FullHttpRequest)) {
                sendHttpResponse(ctx, msg,
                        new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
                                HttpResponseStatus.BAD_REQUEST));
                return;
            }

            // Only the configured path may be upgraded.
            String uri = msg.getUri();
            if (!("/" + URI).equals(uri)) {
                ctx.close();
                return;
            }
            ctx.attr(AttributeKey.valueOf("type")).set(uri);

            WebSocketServerHandshakerFactory factory =
                    new WebSocketServerHandshakerFactory(
                            "ws://"+msg.headers().get("Host")+"/"+URI+"",
                            null,
                            false);
            handshaker = factory.newHandshaker(msg);
            if (handshaker == null) {
                // The client asked for a WebSocket version this server does not support.
                WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
                return;
            }

            handshaker.handshake(ctx.channel(), (FullHttpRequest) msg);
        }

        /**
         * Writes an HTTP response; the connection is closed afterwards unless the request
         * is keep-alive and the status is 200.
         *
         * @param ctx context of the connection
         * @param req the request being answered
         * @param res the response to send
         */
        private static void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, DefaultFullHttpResponse res) {
            if (res.getStatus().code() != 200) {
                // Use the status line as the body of error responses.
                ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
                res.content().writeBytes(buf);
                buf.release();
                HttpHeaders.setContentLength(res, res.content().readableBytes());
            }

            ChannelFuture cf = ctx.channel().writeAndFlush(res);
            if (!HttpHeaders.isKeepAlive(req) || res.getStatus().code() != 200) {
                cf.addListener(ChannelFutureListener.CLOSE);
            }
        }

        /**
         * Called when this handler is removed from the pipeline; only logs the channel.
         */
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            LOGGER.info("handlerRemoved ---->"+ctx.channel());
        }

    }

    然后在系统启动类 (Spring Boot 的入口类) 中开启一个线程来启动 netty 服务就可以了

    import cn.gzserver.basics.network.netty.NettyServer;
    import cn.gzserver.basics.network.socket.SocketServer;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
    import org.springframework.scheduling.annotation.EnableScheduling;
    
    /*@ComponentScan*/
    @EnableScheduling
    @SpringBootApplication
    @EnableDiscoveryClient
    public class GzserverApplication {

        /**
         * Application entry point: starts the Spring context, then launches the Netty
         * server on its own thread.
         *
         * @param args command-line arguments handed to Spring Boot
         */
        public static void main(String[] args) {
            SpringApplication.run(GzserverApplication.class, args);

            // Starting the SocketServer (socketServer.start()) is disabled here.

            // NettyServer.bind blocks until the server channel closes, so it runs on a
            // separate thread.
            Thread nettyThread = new Thread(GzserverApplication::runNettyServer);
            nettyThread.start();
        }

        /**
         * Runs the Netty server on port 5566 and prints the stack trace if it fails.
         */
        private static void runNettyServer() {
            try {
                NettyServer server = new NettyServer();
                server.bind(5566);
            } catch (Exception failure) {
                failure.printStackTrace();
            }
        }

    }

    然后呢, 客户端的配置基本上没有改变, 可以参考我前面写的一篇博客

    https://www.cnblogs.com/unityworld/p/11345431.html

    但是,还有一些问题, 会在下一篇文章中说明

  • 相关阅读:
    Oracle X$ View:X$KJMSDP
    explore my oracle support using firefox 3.6
    EnterpriseDB Migration 迁移工具使用测试(2)
    What's preconnect.svc in 11g RAC?
    Mysql:语法:注释
    Mysql:命令选项、配置选项、(全局、会话)系统变量、状态变量:总揽
    Mysql:简单“破解”SQLyog Enterprise 812 Trial
    Mysql:函数之一:information functions
    Mysql:语法:虚拟表DUAL
    VC++.Net2005的一些常识(转)
  • 原文地址:https://www.cnblogs.com/unityworld/p/11425180.html
Copyright © 2011-2022 走看看