zoukankan      html  css  js  c++  java
  • 本人对于netty框架的一些理解,怎么与网站上的websock建立连接

    在Netty的里面有一个Boss,他开了一家公司(开启一个服务端口)对外提供业务服务,它手下有一群做事情的workers。Boss一直对外宣传自己公司提供的业务,并且接受(accept)有需要的客户(client),当一位客户找到Boss说需要他公司提供的业务,Boss便会为这位客户安排一个worker,这个worker全程为这位客户服务(read/write)。如果公司业务繁忙,一个worker可能会为多个客户进行服务。这就是Netty里面Boss和worker之间的关系。下面看看Netty是如何让Boss和Worker进行协助的。

    
    
    private EventLoopGroup boss = new NioEventLoopGroup();
    private EventLoopGroup work = new NioEventLoopGroup();
    ServerBootstrap bootstrap = new ServerBootstrap()
            .group(boss, work)
            .channel(NioServerSocketChannel.class)
            .localAddress(new InetSocketAddress(nettyPort))
            //保持长连接
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .childHandler(new HeartbeatInitializer());
    ChannelFuture future = bootstrap.bind().sync();
    if (future.isSuccess()) {
        log.info("启动 Netty 成功");
    
    
    }

    上诉代码初始化了一条netty的服务,那么如何初始话的写在类HeartbeatInitializer里面

    public class HeartbeatInitializer extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline()
                    //五秒没有收到消息 将IdleStateHandler 添加到 ChannelPipeline 拦截器中
                    .addLast(new IdleStateHandler(5, 0, 0))
    
                    // HttpServerCodec:将请求和应答消息解码为HTTP消息
                    .addLast("http-codec",new HttpServerCodec())
                    // HttpObjectAggregator:将HTTP消息的多个部分合成一条完整的HTTP消息
                    .addLast("aggregator",new HttpObjectAggregator(65536))
                    // ChunkedWriteHandler:向客户端发送HTML5文件
                    .addLast("http-chunked",new ChunkedWriteHandler())
    
                    .addLast(new HeartBeatSimpleHandle());
        }
    }

    上述文件设置了 拦截器,解码和解码合并,还有响应,最后一个new HeartBeatSimpleHandle()用来处理请求

    public class HeartBeatSimpleHandle extends SimpleChannelInboundHandler<Object> {
    
        private WebSocketServerHandshaker handShaker;
    
        /**
         * 取消绑定
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    
            NettySocketHolder.remove((NioSocketChannel) ctx.channel());
        }
    
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            super.userEventTriggered(ctx, evt);
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            // 传统的HTTP接入
            if (msg instanceof FullHttpRequest) {
                FullHttpRequest req = (FullHttpRequest) msg;
                handleHttpRequest(ctx, req);
                //获取url后置参数
                String uri=req.uri();
                QueryStringDecoder queryStringDecoder = new QueryStringDecoder(uri);
                Map<String, List<String>> parameters = queryStringDecoder.parameters();
                Integer userId = Integer.valueOf(parameters.get("userId").get(0));
                // 存储当前登录ctx
                if(NettySocketHolder.get((long)userId) == null){
                    NettySocketHolder.put((long)userId, (NioSocketChannel) ctx.channel());
                }
                // WebSocket接入
            } else if (msg instanceof WebSocketFrame) {
                if ("live".equals(ctx.channel().attr(AttributeKey.valueOf("type")).get())) {
                    handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
                }
                log.info("收到msg={},id={}", msg);
                //保存客户端与 Channel 之间的关系
            }
        }
    
        private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
            // 如果HTTP解码失败,返回HTTP异常
            if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {
                sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
                return;
            }
            //获取url后置参数
            HttpMethod method=req.method();
            String uri=req.uri();
            QueryStringDecoder queryStringDecoder = new QueryStringDecoder(uri);
            Map<String, List<String>> parameters = queryStringDecoder.parameters();
            if(method==HttpMethod.GET&&"/websocket".equals(uri)){
                //...处理
                ctx.channel().attr(AttributeKey.valueOf("type")).set("live");
            }
            // 构造握手响应返回,本机测试
            WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                    "ws://"+req.headers().get(HttpHeaderNames.HOST)+uri, null, false);
            handShaker = wsFactory.newHandshaker(req);
            if (handShaker == null) {
                WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
            } else {
                handShaker.handshake(ctx.channel(), req);
            }
        }
    
        private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {
            // 返回应答给客户端
            if (res.status().code() != 200) {
                ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
                res.content().writeBytes(buf);
                buf.release();
            }
            // 如果是非Keep-Alive,关闭连接
            ChannelFuture f = ctx.channel().writeAndFlush(res);
            if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) {
                f.addListener(ChannelFutureListener.CLOSE);
            }
        }
    
        private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
            // 判断是否关闭链路的指令
            if (frame instanceof CloseWebSocketFrame) {
                handShaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            }
        }

    HeartBeatSimpleHandle 用与处理管道(Channel)的读取工作,将管道储存在NettySocketHolder里面,用的时候取出。WebSocketServerHandshaker用于响应客户端的响应

      var websocket;
    
        $(function(){
            $.ajax({
                type: "POST",
                url: "/login",
                data: {
                    userId: 2,
                    account: "135541",
                    userName: "123"
                },
                contentType: "application/x-www-form-urlencoded; charset=utf-8",
                dataType: "json",
                success: function (result) {
                    websoketCannel(result.data.userId, result.data.account, result.data.userName);
                }
            });
        });
    
        function websoketCannel(userId, account, userName){
    
            //如果浏览器支持WebSocket
            if(window.WebSocket){
                websocket = new WebSocket("ws://localhost:1212/websocket?userId="+ userId +"&account="+ account +"&userName="+ userName +"");  //获得WebSocket对象
    
                //当有消息过来的时候触发
                websocket.onmessage = function(event){
                    var data = JSON.parse(event.data);
                    $("#message").text(data.msg);
                };
    
                //连接关闭的时候触发
                websocket.onclose = function(event){
                    console.log("断开连接");
                };
    
                //连接打开的时候触发
                websocket.onopen = function(event){
                    console.log("建立连接");
                }
            }else{
                alert("浏览器不支持WebSocket");
            }
        }
    
        function sendMsg(msg) { //发送消息
            if(window.WebSocket){
                if(websocket.readyState == WebSocket.OPEN) { //如果WebSocket是打开状态
                    websocket.send(msg); //send()发送消息
                }
            }else{
                return;
            }
        }

    上面代码是客户端如何连netty

  • 相关阅读:
    数据库DQL(Data Query Language)语言学习之三:排序查询
    数据库DQL(Data Query Language)语言学习之二:条件查询
    MySQL中的通配符
    SQL查询文档网站
    python之特殊方法
    java之静态函数和静态变量
    java之类和对象
    python之类的继承
    python的面向对象编程
    python之模块(在命令行当中使用pip install 模块来进行模块的安装)
  • 原文地址:https://www.cnblogs.com/kangniuniu/p/11107768.html
Copyright © 2011-2022 走看看