  • How ChannelPipeline ordering in Netty affects inbound and outbound handlers

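      Before running this experiment I read xbmchina's articles on Jianshu. Thanks to the author for these articles on ChannelPipeline and ChannelHandler:

      【Netty】ChannelPipeline和ChannelHandler(一) (ChannelPipeline and ChannelHandler, part 1)

      【Netty】ChannelHandler的添加和删除(二) (adding and removing a ChannelHandler, part 2)

      【Netty】inBound和outBound事件的传播过程 (how inbound and outbound events propagate)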

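      I originally wanted to build a Netty 4 chat room on top of leonzm's websocket_demo project, but the Netty 4 callbacks are different: messageReceived (handshake and incoming packets) and close (disconnect) cannot be overridden on the inbound handler, so I looked into the handler lifecycle. In Netty 4, channelRead0 receives every inbound message, which here covers both the HTTP request that starts the handshake and the frames sent by an already-connected client. When the channel reaches channelInactive, the connection is no longer usable, and channelUnregistered follows; either callback is a place to clean up per-connection data. The ChannelHandler has its own lifecycle as well, and handlerRemoved can do similar cleanup.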

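      The difference between inbound and outbound in Netty: inbound events are triggered passively, fired automatically when something happens on the channel (for example, data arrives), while outbound events are triggered actively, when the application asks for an operation such as a write or a close. Beyond that, an outbound handler on its own cannot receive messages from a WebSocket client (it intercepts operations going out; receiving requires an inbound handler). An outbound handler is better suited to being slotted into the pipeline as an interceptor, in the style of AOP (aspect-oriented programming).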
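      The direction rule can be illustrated without Netty at all. What follows is a minimal plain-Java sketch, not Netty's implementation: the `Handler` class and the `inboundPath` and `outboundPath` methods are hypothetical names invented for this illustration. It models a pipeline as an ordered list, walks inbound events from head to tail visiting only inbound handlers, and walks outbound operations from tail to head visiting only outbound handlers.

```java
import java.util.ArrayList;
import java.util.List;

public class PipelineDirectionSketch {

    /** Hypothetical stand-in for a handler: a name plus the direction it handles. */
    static final class Handler {
        final String name;
        final boolean inbound;

        Handler(String name, boolean inbound) {
            this.name = name;
            this.inbound = inbound;
        }
    }

    /** Inbound events travel head -> tail and only visit inbound handlers. */
    static List<String> inboundPath(List<Handler> pipeline) {
        List<String> visited = new ArrayList<>();
        for (Handler h : pipeline) {
            if (h.inbound) {
                visited.add(h.name);
            }
        }
        return visited;
    }

    /** Outbound operations travel tail -> head and only visit outbound handlers. */
    static List<String> outboundPath(List<Handler> pipeline) {
        List<String> visited = new ArrayList<>();
        for (int i = pipeline.size() - 1; i >= 0; i--) {
            Handler h = pipeline.get(i);
            if (!h.inbound) {
                visited.add(h.name);
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        // Handlers listed in the order they were "added" (head first, tail last).
        List<Handler> pipeline = new ArrayList<>();
        pipeline.add(new Handler("in-1", true));
        pipeline.add(new Handler("out-1", false));
        pipeline.add(new Handler("in-2", true));
        pipeline.add(new Handler("out-2", false));

        System.out.println("inbound:  " + inboundPath(pipeline));
        System.out.println("outbound: " + outboundPath(pipeline));
    }
}
```

      In this model the inbound path is [in-1, in-2] and the outbound path is [out-2, out-1]: handlers of the other direction are skipped, and the two directions walk the same list in opposite order.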

      Now compare the two pipeline configurations:

      Lanucher.java (main class that starts the Netty server):

    package com.company.lanucher;

    import com.company.server.ReversedWebSocketServer;
    import com.company.server.WebSocketServer;

    public class Lanucher {

        public static void main(String[] args) throws Exception {
            // Start the WebSocket server. To run the other server instead, comment out the
            // Reversed line and uncomment the WebSocketServer line.
            //new WebSocketServer().run(WebSocketServer.WEBSOCKET_PORT);
            new ReversedWebSocketServer().run(ReversedWebSocketServer.WEBSOCKET_PORT);
        }

    }

      WebSocketServer.java (the pipeline runs the inbound handler first, then the outbound adapter; the adapter is added before the handler):

    package com.company.server;

    import org.apache.log4j.Logger;

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.stream.ChunkedWriteHandler;

    /**
     * WebSocket server
     *
     */
    public class WebSocketServer {
        private static final Logger LOG = Logger.getLogger(WebSocketServer.class);

        // WebSocket port
        public static final int WEBSOCKET_PORT = 9090;

        public void run(int port) throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<Channel>() {

                    @Override
                    protected void initChannel(Channel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        pipeline.addLast("http-codec", new HttpServerCodec()); // HTTP request decoding and response encoding
                        pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); // aggregates HTTP message parts into a full message
                        pipeline.addLast("http-chunked", new ChunkedWriteHandler()); // chunked writing of large data streams
                        pipeline.addLast("adapter", new FunWebSocketServerHandler()); // outbound adapter, added before the WebSocket handler
                        pipeline.addLast("handler", new BananaWebSocketServerHandler()); // WebSocket server handler (inbound)
                    }
                });

                Channel channel = b.bind(port).sync().channel();
                LOG.info("WebSocket server started on port " + port + ".");
                channel.closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }

      ReversedWebSocketServer.java (the pipeline runs the outbound adapter first, then the inbound handler; the adapter is added after the handler):

    package com.company.server;

    import org.apache.log4j.Logger;

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.stream.ChunkedWriteHandler;

    public class ReversedWebSocketServer {
        // Log under this class's own name (the original reused WebSocketServer.class)
        private static final Logger LOG = Logger.getLogger(ReversedWebSocketServer.class);

        // WebSocket ports
        public static final int WEBSOCKET_PORT = 9090;
        public static final int FUN_WEBSOCKET_PORT = 9091;

        public void run(int port) throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<Channel>() {

                    @Override
                    protected void initChannel(Channel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        pipeline.addLast("http-codec", new HttpServerCodec()); // HTTP request decoding and response encoding
                        pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); // aggregates HTTP message parts into a full message
                        pipeline.addLast("http-chunked", new ChunkedWriteHandler()); // chunked writing of large data streams
                        pipeline.addLast("handler", new BananaWebSocketServerHandler()); // WebSocket server handler (inbound)
                        pipeline.addLast("adapter", new FunWebSocketServerHandler()); // outbound adapter, added after the WebSocket handler
                    }
                });

                Channel channel = b.bind(port).sync().channel();
                LOG.info("WebSocket server started on port " + port + ".");
                channel.closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }

    }
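      The two servers differ only in whether "adapter" sits before or after "handler", and whether that matters depends on where an outbound operation starts. In Netty 4, a write issued on the channel starts at the tail of the pipeline, while a write issued on a handler's own ChannelHandlerContext starts from that handler and moves toward the head. The following plain-Java sketch models only that rule; it is not Netty code, the method names `writeFromTail` and `writeFromContext` are hypothetical, and the codec handlers are left out to keep it small.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WriteStartSketch {

    // In this toy model only "adapter" (FunWebSocketServerHandler) is outbound.
    static boolean isOutbound(String name) {
        return name.equals("adapter");
    }

    /** Models a write on the channel: start at the tail and walk toward the head. */
    static List<String> writeFromTail(List<String> pipeline) {
        return walkTowardHead(pipeline, pipeline.size() - 1);
    }

    /** Models a write on the context of `from`: start just before that handler. */
    static List<String> writeFromContext(List<String> pipeline, String from) {
        return walkTowardHead(pipeline, pipeline.indexOf(from) - 1);
    }

    // Collects the outbound handlers met while walking from `start` down to the head.
    private static List<String> walkTowardHead(List<String> pipeline, int start) {
        List<String> visited = new ArrayList<>();
        for (int i = start; i >= 0; i--) {
            if (isOutbound(pipeline.get(i))) {
                visited.add(pipeline.get(i));
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        List<String> normal = Arrays.asList("adapter", "handler");   // WebSocketServer order
        List<String> reversed = Arrays.asList("handler", "adapter"); // ReversedWebSocketServer order

        System.out.println("normal,   channel write: " + writeFromTail(normal));
        System.out.println("normal,   context write: " + writeFromContext(normal, "handler"));
        System.out.println("reversed, channel write: " + writeFromTail(reversed));
        System.out.println("reversed, context write: " + writeFromContext(reversed, "handler"));
    }
}
```

      Only the last case comes back empty in this model: with the adapter added after the handler, an operation started on the handler's context never reaches it. BananaWebSocketServerHandler sends with ctx.channel().write(...) but flushes with ctx.flush(), so if Netty behaves as modelled here, the adapter in the reversed pipeline would be expected to log those writes but not those flushes.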

      BananaWebSocketServerHandler.java (inbound handler; processes requests received from clients):

    package com.company.server;

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelPromise;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.codec.http.DefaultFullHttpResponse;
    import io.netty.handler.codec.http.FullHttpRequest;
    import io.netty.handler.codec.http.FullHttpResponse;
    import io.netty.handler.codec.http.HttpHeaders;
    import io.netty.handler.codec.http.HttpResponseStatus;
    import io.netty.handler.codec.http.HttpVersion;
    import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.WebSocketFrame;
    import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
    import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
    import io.netty.util.CharsetUtil;

    import org.apache.log4j.Logger;

    import com.company.serviceimpl.BananaService;
    import com.company.util.BufToString;
    import com.company.util.CODE;
    import com.company.util.Request;
    import com.company.util.Response;
    import com.google.common.base.Strings;
    import com.google.gson.JsonSyntaxException;


    /**
     * WebSocket server handler
     *
     */
    public class BananaWebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
        private static final Logger LOG = Logger.getLogger(BananaWebSocketServerHandler.class.getName());

        private WebSocketServerHandshaker handshaker;
        private ChannelHandlerContext ctx;
        private String sessionId;
        private boolean isLog = true;

        public BananaWebSocketServerHandler() {
            super();
        }

        public BananaWebSocketServerHandler(boolean isLog) {
            this();
            this.isLog = isLog;
        }

        // Callback name from Netty 5; Netty 4 uses channelRead0 instead, so this is not an override here
        public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
            if(this.isLog) {
                System.out.print("channel MessageReceived = = " + ctx.name());
            }
            if (msg instanceof FullHttpRequest) { // plain HTTP request
                FullHttpRequest mymsg = (FullHttpRequest) msg;
                System.out.println(" with http request : " + BufToString.convertByteBufToString(mymsg.content()));
                handleHttpRequest(ctx, mymsg);
            } else if (msg instanceof WebSocketFrame) { // WebSocket frame
                WebSocketFrame mymsg = (WebSocketFrame) msg;
                System.out.println(" with socket request : " + BufToString.convertByteBufToString(mymsg.content()));
                handleWebSocketFrame(ctx, mymsg);
            }
        }

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception{
            System.out.println("channel handlerAdded = = " + ctx.name());
        }

        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception{
            System.out.println("channel handlerRemoved = = " + ctx.name());
        }

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            if(this.isLog) {
                System.out.print("channel Read0 = = " + ctx.name());
            }
            if (msg instanceof FullHttpRequest) { // plain HTTP request
                FullHttpRequest mymsg = (FullHttpRequest) msg;
                System.out.println(" with http request : " + BufToString.convertByteBufToString(mymsg.content()));
                handleHttpRequest(ctx, mymsg);
            } else if (msg instanceof WebSocketFrame) { // WebSocket frame
                WebSocketFrame mymsg = (WebSocketFrame) msg;
                System.out.println(" with socket request : " + BufToString.convertByteBufToString(mymsg.content()));
                handleWebSocketFrame(ctx, mymsg);
            }
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            if(this.isLog) {
                System.out.println("channel Inactive = = " + ctx.name());
            }
            try {
                this.close(ctx, null);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) {
            if(this.isLog) {
                System.out.println("channel Unregistered = = " + ctx.name());
            }
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
            System.out.println("channel Flush = = " + ctx.name());
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

            ctx.close();
            if(this.isLog) {
                System.err.println("channel exceptionCaught = = " + ctx.name());
                cause.printStackTrace();
            }
            BananaService.logout(sessionId); // log out
            BananaService.notifyDownline(sessionId); // notify others that someone went offline
        }

        // Callback name from Netty 5; in Netty 4 it is called from channelInactive instead
        public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
            if(this.isLog) {
                System.out.println("channel close = = " + ctx.name());
            }
            BananaService.logout(sessionId); // log out
            BananaService.notifyDownline(sessionId); // notify others that someone went offline
            ctx.close();
        }

        /**
         * Handles an HTTP request and completes the WebSocket handshake.<br/>
         * Note: the first request of a WebSocket connection is sent over HTTP.
         * @param ctx
         * @param request
         * @throws Exception
         */
        private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
            // If HTTP decoding failed or this is not a WebSocket upgrade request, return an HTTP error
            if (!request.getDecoderResult().isSuccess() || (!"websocket".equals(request.headers().get("Upgrade")))) {
                sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
                return;
            }

            // A valid WebSocket upgrade request: build and send the handshake response
            WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://" + request.headers().get(HttpHeaders.Names.HOST), null, false);
            handshaker = wsFactory.newHandshaker(request);
            if (handshaker == null) { // unsupported WebSocket version
                WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
            } else { // send the WebSocket handshake to the client and complete it
                handshaker.handshake(ctx.channel(), request);
                // Keep the handler context so the server can push data to the client later
                this.ctx = ctx;
            }
        }

        /**
         * Handles a WebSocket frame
         * @param ctx
         * @param frame
         * @throws Exception
         */
        private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
            // Close frame: close the connection
            if (frame instanceof CloseWebSocketFrame) {
                handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
                return;
            }
            // Ping frame: answer with a Pong
            if (frame instanceof PingWebSocketFrame) {
                ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
                return;
            }
            // Only text frames are supported for now, binary frames are not
            if (!(frame instanceof TextWebSocketFrame)) {
                throw new UnsupportedOperationException("Only text frames are supported; binary frames are not");
            }

            // Handle the WebSocket request from the client
            try {
                /*
                if(this.isLog) {
                    System.out.println("handleWebSocketFrame-=-=-" + ((TextWebSocketFrame)frame).text());
                }
                */
                Request request = Request.create(((TextWebSocketFrame)frame).text());
                Response response = new Response();
                response.setServiceId(request.getServiceId());
                if (CODE.online.code.intValue() == request.getServiceId()) { // client registration
                    String requestId = request.getRequestId();
                    // The original returned here without replying; send the failure back to the client first
                    if (Strings.isNullOrEmpty(requestId)) {
                        sendWebSocket(response.setIsSucc(false).setMessage("requestId must not be empty").toJson());
                        return;
                    } else if (Strings.isNullOrEmpty(request.getName())) {
                        sendWebSocket(response.setIsSucc(false).setMessage("name must not be empty").toJson());
                        return;
                    } else if (BananaService.bananaWatchMap.containsKey(requestId)) {
                        sendWebSocket(response.setIsSucc(false).setMessage("You are already registered and cannot register again").toJson());
                        return;
                    }
                    if (!BananaService.register(requestId, new BananaService(ctx, request.getName()))) {
                        response.setIsSucc(false).setMessage("Registration failed");
                    } else {
                        response.setIsSucc(true).setMessage("Registration succeeded");

                        BananaService.bananaWatchMap.forEach((reqId, callBack) -> {
                            response.getHadOnline().put(reqId, ((BananaService)callBack).getName()); // return the users already online

                            if (!reqId.equals(requestId)) {
                                Request serviceRequest = new Request();
                                serviceRequest.setServiceId(CODE.online.code);
                                serviceRequest.setRequestId(requestId);
                                serviceRequest.setName(request.getName());
                                try {
                                    callBack.send(serviceRequest); // notify others that someone came online
                                } catch (Exception e) {
                                    LOG.warn("Callback failed to send message to client", e);
                                }
                            }
                        });
                    }
                    sendWebSocket(response.toJson());
                    this.sessionId = requestId; // record the session id so the connection can be logged out when the page is refreshed or the browser closes
                } else if (CODE.send_message.code.intValue() == request.getServiceId()) { // client sends a message to the chat room
                    String requestId = request.getRequestId();
                    if (Strings.isNullOrEmpty(requestId)) {
                        response.setIsSucc(false).setMessage("requestId must not be empty");
                    } else if (Strings.isNullOrEmpty(request.getName())) {
                        response.setIsSucc(false).setMessage("name must not be empty");
                    } else if (Strings.isNullOrEmpty(request.getMessage())) {
                        response.setIsSucc(false).setMessage("message must not be empty");
                    } else {
                        response.setIsSucc(true).setMessage("Message sent");

                        BananaService.bananaWatchMap.forEach((reqId, callBack) -> { // broadcast the message to every client
                            Request serviceRequest = new Request();
                            serviceRequest.setServiceId(CODE.receive_message.code);
                            serviceRequest.setRequestId(requestId);
                            serviceRequest.setName(request.getName());
                            serviceRequest.setMessage(request.getMessage());
                            try {
                                callBack.send(serviceRequest);
                            } catch (Exception e) {
                                LOG.warn("Callback failed to send message to client", e);
                            }
                        });
                    }
                    sendWebSocket(response.toJson());
                } else if (CODE.downline.code.intValue() == request.getServiceId()) { // client goes offline
                    String requestId = request.getRequestId();
                    if (Strings.isNullOrEmpty(requestId)) {
                        sendWebSocket(response.setIsSucc(false).setMessage("requestId must not be empty").toJson());
                    } else {
                        BananaService.logout(requestId);
                        response.setIsSucc(true).setMessage("Logged off");

                        BananaService.notifyDownline(requestId); // notify others that someone went offline

                        sendWebSocket(response.toJson());
                    }

                } else {
                    sendWebSocket(response.setIsSucc(false).setMessage("Unknown request").toJson());
                }
            } catch (JsonSyntaxException e1) {
                LOG.warn("JSON parsing error", e1);
                System.err.println("JSON parsing error");
                e1.printStackTrace();
            } catch (Exception e2) {
                LOG.error("Error while handling WebSocket request", e2);
                System.err.println("Error while handling WebSocket request");
                e2.printStackTrace();
            }
        }

        /**
         * Sends an HTTP response
         * @param ctx
         * @param request
         * @param response
         */
        private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request, FullHttpResponse response) {
            // Write the response back to the client
            if (response.getStatus().code() != 200) {
                ByteBuf buf = Unpooled.copiedBuffer(response.getStatus().toString(), CharsetUtil.UTF_8);
                response.content().writeBytes(buf);
                buf.release();
                HttpHeaders.setContentLength(response, response.content().readableBytes());
            }

            // Close the connection if it is not Keep-Alive or the status is not 200
            ChannelFuture f = ctx.channel().writeAndFlush(response);
            if (!HttpHeaders.isKeepAlive(request) || response.getStatus().code() != 200) {
                f.addListener(ChannelFutureListener.CLOSE);
            }
        }

        /**
         * Sends a WebSocket text message
         * @param msg text to send to the client
         */
        public void sendWebSocket(String msg) throws Exception {
            if (this.handshaker == null || this.ctx == null || this.ctx.isRemoved()) {
                throw new Exception("Handshake not completed; cannot send a WebSocket message to the client");
            }
            this.ctx.channel().write(new TextWebSocketFrame(msg));
            this.ctx.flush();
        }

    }

      FunWebSocketServerHandler.java (outbound adapter; processes responses sent by the server):

    package com.company.server;

    import java.net.SocketAddress;

    import com.company.util.BufToString;

    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelOutboundHandlerAdapter;
    import io.netty.channel.ChannelPromise;
    import io.netty.handler.codec.http.DefaultFullHttpResponse;
    import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

    // Logs every outbound operation that passes through it, then forwards the operation unchanged
    public class FunWebSocketServerHandler extends ChannelOutboundHandlerAdapter{

        @Override
        public void read(ChannelHandlerContext ctx) throws Exception {
            ChannelHandlerContext readRes = ctx.read();
            System.out.println(ctx.name() + " is read in " + readRes.toString());
        }

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception{
            System.out.println(ctx.name() + " handlerAdded = = " + ctx.name());
        }

        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception{
            System.out.println(ctx.name() + " handlerRemoved = = " + ctx.name());
        }

        @Override
        public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
                ChannelPromise promise) throws Exception {
            ctx.bind(localAddress, promise);
            System.out.println(ctx.name() + " is bind in " + localAddress + " in " + promise);
        }

        @Override
        public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
                SocketAddress localAddress, ChannelPromise promise) throws Exception {
            ctx.connect(remoteAddress, localAddress, promise);
            // localAddress may be null, so rely on string concatenation rather than toString()
            System.out.println(ctx.name() + " is connect in " + localAddress + " in client " + remoteAddress + " in " + promise);
        }

        @Override
        public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
                throws Exception {
            ctx.disconnect(promise);
            System.out.println(ctx.name() + " is disconnect in " + promise);
        }

        @Override
        public void close(ChannelHandlerContext ctx, ChannelPromise promise)
                throws Exception {
            ctx.close(promise);
            System.out.println(ctx.name() + " is close in " + promise);
        }

        @Override
        public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
            ctx.deregister(promise);
            System.out.println(ctx.name() + " is deregister in " + promise);
        }

        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            ctx.write(msg, promise);
            System.out.print(ctx.name() + " is write in " + promise);
            if(msg instanceof DefaultFullHttpResponse) {
                System.out.println(" with message : " + BufToString.convertByteBufToString(((DefaultFullHttpResponse)msg).content()));
            }
            else if(msg instanceof TextWebSocketFrame) {
                System.out.println(" with socket message : " + ((TextWebSocketFrame)msg).text());
            }
            else if(msg instanceof CloseWebSocketFrame) {
                System.out.println(" close reason : " + ((CloseWebSocketFrame)msg).reasonText());
            }
            else {
                System.out.println(" with message : " + msg.getClass());
            }
        }

        @Override
        public void flush(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
            System.out.println(ctx.name() + " is flush");
        }
    }

      banana.html (chat room front end):

      1 <!DOCTYPE html>
      2 <html>
      3 <head>
      4 <meta charset="UTF-8">
      5 <title>Netty WebSocket 聊天实例</title>
      6 </head>
      7 <script src="https://cdn.staticfile.org/jquery/1.10.2/jquery.min.js" type="text/javascript"></script>
      8 <script src="map.js" type="text/javascript"></script>
      9 <script type="text/javascript">
     10 $(document).ready(function() {
     11     var uuid = guid(); // uuid在一个会话唯一
     12     var nameOnline = ''; // 上线姓名
     13     var onlineName = new Map(); // 已上线人员, <requestId, name>
     14     
     15     $("#name").attr("disabled","disabled");
     16     $("#onlineBtn").attr("disabled","disabled");
     17     $("#downlineBtn").attr("disabled","disabled");
     18     
     19     $("#banana").hide();
     20 
     21     // 初始化websocket
     22     var socket;
     23     if (!window.WebSocket) {
     24         window.WebSocket = window.MozWebSocket;
     25     }
     26     if (window.WebSocket) {
     27         socket = new WebSocket("ws://localhost:9090/");
     28         socket.onmessage = function(event) {
     29             console.log("收到服务器消息:" + event.data);
     30             if (event.data.indexOf("isSucc") != -1) {// 这里需要判断是客户端请求服务端返回后的消息(response)
     31                 var response = JSON.parse(event.data);
     32                 if (response != undefined && response != null) {
     33                     if (response.serviceId == 1001) { // 上线
     34                         if (response.isSucc) {
     35                             // 上线成功,初始化已上线人员
     36                             onlineName.clear();
     37                             $("#showOnlineNames").empty();
     38                             for (var reqId in response.hadOnline) {
     39                                 onlineName.put(reqId, response.hadOnline[reqId]);
     40                             }
     41                             initOnline();
     42                             
     43                             $("#name").attr("disabled","disabled");
     44                             $("#onlineBtn").attr("disabled","disabled");
     45                             $("#downlineBtn").removeAttr("disabled");
     46                             $("#banana").show();
     47                         } else {
     48                             alert("上线失败");
     49                         }
     50                     } else if (response.serviceId == 1004) {
     51                         if (response.isSucc) {
     52                             onlineName.clear();
     53                             $("#showBanana").empty();
     54                             $("#showOnlineNames").empty();
     55                             $("#name").removeAttr("disabled");
     56                             $("#onlineBtn").removeAttr("disabled");
     57                             $("#downlineBtn").attr("disabled","disabled");
     58                             $("#banana").hide();
     59                         } else {
     60                             alert("下线失败");
     61                         }
     62                     }
     63                 }
     64             } else {// 还是服务端向客户端的请求(request)
     65                 var request = JSON.parse(event.data);
     66                 if (request != undefined && request != null) {
     67                     if (request.serviceId == 1001 || request.serviceId == 1004) { // 有人上线/下线
     68                         if (request.serviceId == 1001) {
     69                             onlineName.put(request.requestId, request.name);
     70                         }
     71                         if (request.serviceId == 1004) {
     72                             onlineName.removeByKey(request.requestId);
     73                         }
     74                         
     75                         initOnline();
     76                     } else if (request.serviceId == 1003) { // 有人发消息
     77                         appendBanana(request.name, request.message);
     78                     }
     79                 }
     80             }
     81         };
     82         socket.onopen = function(event) {
     83             $("#name").removeAttr("disabled");
     84             $("#onlineBtn").removeAttr("disabled");
     85             console.log("已连接服务器");
     86         };
     87         socket.onclose = function(event) { // WebSocket 关闭
     88             console.log("WebSocket已经关闭!");
     89         };
     90         socket.onerror = function(event) {
     91             console.log("WebSocket异常!");
     92         };
     93     } else {
     94         alert("抱歉,您的浏览器不支持WebSocket协议!");
     95     }
     96     
        // Send a request over the WebSocket
        function send(message) {
            if (!window.WebSocket) { return; }
            if (socket.readyState == WebSocket.OPEN) {
                socket.send(message);
            } else {
                console.log("WebSocket connection has not been established!");
                alert("You are not connected to the server yet; please refresh the page and try again");
            }
        }
        
        // Refresh the list of online users
        function initOnline() {
            $("#showOnlineNames").empty();
            for (var i=0;i<onlineName.size();i++) {
                $("#showOnlineNames").append('<tr><td>' + (i+1) + '</td>' +
                '<td>' + onlineName.element(i).value + '</td>' +
                '</tr>');
            }
        }
        // Append a chat message
        function appendBanana(name, message) {
            $("#showBanana").append('<tr><td>' + name + ': ' + message + '</td></tr>');
        }
        
        $("#onlineBtn").bind("click", function() {
            var name = $("#name").val();
            if (name == null || name == '') {
                alert("Please enter your name");
                return;
            }
    
            nameOnline = name;
            // Go online
            send(JSON.stringify({"requestId":uuid, "serviceId":1001, "name":name}));
        });
        
        $("#downlineBtn").bind("click", function() {
            // Go offline
            send(JSON.stringify({"requestId":uuid, "serviceId":1004}));
        });
        
        $("#sendBtn").bind("click", function() {
            var message = $("#messageInput").val();
            if (message == null || message == '') {
                alert("Please enter a chat message");
                return;
            }
            
            // Send the chat message
            send(JSON.stringify({"requestId":uuid, "serviceId":1002, "name":nameOnline, "message":message}));
            $("#messageInput").val("");
        });
        
    });
    
    // Build a random GUID-style identifier for this client
    function guid() {
        function S4() {
           return (((1+Math.random())*0x10000)|0).toString(16).substring(1);
        }
        return (S4()+S4()+"-"+S4()+"-"+S4()+"-"+S4()+"-"+S4()+S4()+S4());
    }
    </script>
    <body>
      <h1>Netty WebSocket Chat Example</h1>
      <input type="text" id="name" value="佚名" placeholder="Name" />
      <input type="button" id="onlineBtn" value="Go online" />
      <input type="button" id="downlineBtn" value="Go offline" />
      <hr/>
      <table id="banana" border="1" >
        <tr>
          <td width="600" align="center">Chat</td>
          <td width="100" align="center">Online users</td>
        </tr>
        <tr height="200" valign="top">
          <td>
            <table id="showBanana" border="0" width="600">
                <!--
                <tr>
                  <td>张三: Hello, everyone</td>
                </tr>
                <tr>
                  <td>李四: Welcome to the group chat</td>
                </tr>
                -->
            </table>
          </td>
          <td>
            <table id="showOnlineNames" border="0">
                <!--
                <tr>
                  <td>1</td>
                  <td>张三</td>
                </tr>
                <tr>
                  <td>2</td>
                  <td>李四</td>
                </tr>
                -->
            </table>
          </td>
        </tr>
        <tr height="40">
          <td></td>
          <td></td>
        </tr>
        <tr>
          <td>
            <input type="text" id="messageInput"  style="width:590px" placeholder="Say something" />
          </td>
          <td>
            <input type="button" id="sendBtn" value="Send" />
          </td>
        </tr>
      </table>
    
    </body>
    </html>
    banana.html

      Running WebSocketServer and ReversedWebSocketServer one after the other produces the following logs:

    ============adapter first, then handler==============
    ============connection start========================
    
    adapter handlerAdded = = adapter
    channel handlerAdded = = handler
    adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])
    channel Read0 = = handler with http request : 
    adapter is write in DefaultChannelPromise@3b536aab(incomplete) with message : 
    adapter is flush
    adapter is flush
    channel Flush = = handler
    adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])
    
    ============user goes online========================
    
    channel Read0 = = handler with socket request : {"requestId":"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d","serviceId":1001,"name":"佚名"}
    adapter is write in DefaultChannelPromise@61ace442(incomplete) with socket message : {"serviceId":1001,"isSucc":true,"message":"注册成功","hadOnline":{"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d":"佚名"}}
    adapter is flush
    adapter is flush
    channel Flush = = handler
    adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])
    
    ============send message========================
    
    channel Read0 = = handler with socket request : {"requestId":"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d","serviceId":1002,"name":"佚名","message":"queue"}
    adapter is write in DefaultChannelPromise@5a4c689a(incomplete) with socket message : {"requestId":"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d","serviceId":1003,"name":"佚名","message":"queue"}
    adapter is flush
    adapter is write in DefaultChannelPromise@3cfa9e57(incomplete) with socket message : {"serviceId":1002,"isSucc":true,"message":"发送消息成功","hadOnline":{}}
    adapter is flush
    adapter is flush
    channel Flush = = handler
    adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])
    
    ============user goes offline========================
    
    channel Read0 = = handler with socket request : {"requestId":"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d","serviceId":1004}
    adapter is write in DefaultChannelPromise@b6ce0dc(incomplete) with socket message : {"serviceId":1004,"isSucc":true,"message":"下线成功","hadOnline":{}}
    adapter is flush
    adapter is flush
    channel Flush = = handler
    adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])
    
    ============user disconnects========================
    
    channel Read0 = = handler with socket request : �
    adapter is write in DefaultChannelPromise@567e3360(incomplete) close reason : 
    adapter is flush
    adapter is close in DefaultChannelPromise@1b6673fb(success)
    adapter is flush
    channel Flush = = handler
    adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 ! R:/0:0:0:0:0:0:0:1:49245])
    channel Inactive = = handler
    channel close = = handler
    adapter is close in DefaultChannelPromise@5b261d0a(success)
    channel Unregistered = = handler
    channel handlerRemoved = = handler
    adapter handlerRemoved = = adapter
    Output of WebSocketServer

      and:

    ============handler first, then adapter==============
    ============connection start========================
    
    channel handlerAdded = = handler
    adapter handlerAdded = = adapter
    adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])
    channel Read0 = = handler with http request : 
    adapter is write in DefaultChannelPromise@171fb888(incomplete) with message : 
    adapter is flush
    channel Flush = = handler
    adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])
    
    ============user goes online========================
    
    channel Read0 = = handler with socket request : {"requestId":"70d182cf-b0ae-27ba-296d-33bd3ab5177b","serviceId":1001,"name":"佚名"}
    adapter is write in DefaultChannelPromise@54042c55(incomplete) with socket message : {"serviceId":1001,"isSucc":true,"message":"注册成功","hadOnline":{"70d182cf-b0ae-27ba-296d-33bd3ab5177b":"佚名"}}
    channel Flush = = handler
    adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])
    
    ============send message========================
    
    channel Read0 = = handler with socket request : {"requestId":"70d182cf-b0ae-27ba-296d-33bd3ab5177b","serviceId":1002,"name":"佚名","message":"queue"}
    adapter is write in DefaultChannelPromise@324cb9a(incomplete) with socket message : {"requestId":"70d182cf-b0ae-27ba-296d-33bd3ab5177b","serviceId":1003,"name":"佚名","message":"queue"}
    adapter is write in DefaultChannelPromise@269f3a70(incomplete) with socket message : {"serviceId":1002,"isSucc":true,"message":"发送消息成功","hadOnline":{}}
    channel Flush = = handler
    adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])
    
    ============user goes offline========================
    
    channel Read0 = = handler with socket request : {"requestId":"70d182cf-b0ae-27ba-296d-33bd3ab5177b","serviceId":1004}
    adapter is write in DefaultChannelPromise@2f6a67d7(incomplete) with socket message : {"serviceId":1004,"isSucc":true,"message":"下线成功","hadOnline":{}}
    channel Flush = = handler
    adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])
    
    ============user disconnects========================
    
    channel Read0 = = handler with socket request : �
    adapter is write in DefaultChannelPromise@5dff633(incomplete) close reason : 
    adapter is flush
    adapter is close in DefaultChannelPromise@1e58e8f0(success)
    channel Flush = = handler
    adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 ! R:/0:0:0:0:0:0:0:1:64671])
    channel Inactive = = handler
    channel close = = handler
    channel Unregistered = = handler
    adapter handlerRemoved = = adapter
    channel handlerRemoved = = handler
    Output of ReversedWebSocketServer

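      Apart from the different execution order, the outbound adapter's flush is also logged more often in the WebSocketServer run: two or three times per request, against at most once per request in the ReversedWebSocketServer run. The gap is largest when a chat message is sent, because the server not only receives a packet but also sends packets out, so the adapter is flushed more often.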

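      This shows that a Netty pipeline has to be planned carefully: let the server handle the data first whenever it can, and add outbound interceptors ahead of inbound interceptors.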
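      One likely reason the add order matters is the direction in which outbound calls travel. In Netty, an outbound call made on a ChannelHandlerContext moves from that context toward the head of the pipeline, while the same call made on the Channel starts from the tail. The sketch below is a plain-Java toy model of that rule, not Netty code: PipelineOrderSketch, Node, flushFrom and flushFromTail are hypothetical names made up for illustration, and the model ignores everything except handler position and direction.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: plain Java, no Netty classes. All names here are hypothetical.
public class PipelineOrderSketch {

    // One entry of the toy pipeline; index 0 is the head (socket side).
    static final class Node {
        final String name;
        final boolean outbound;

        Node(String name, boolean outbound) {
            this.name = name;
            this.outbound = outbound;
        }
    }

    // Models a flush issued from the context of the handler at index `from`:
    // the event walks toward the head and visits only the outbound nodes
    // that were added before the caller.
    static List<String> flushFrom(List<Node> pipeline, int from) {
        List<String> seenBy = new ArrayList<>();
        for (int i = from - 1; i >= 0; i--) {
            Node node = pipeline.get(i);
            if (node.outbound) {
                seenBy.add(node.name);
            }
        }
        return seenBy;
    }

    // Models a flush issued on the channel itself: the walk starts at the tail.
    static List<String> flushFromTail(List<Node> pipeline) {
        return flushFrom(pipeline, pipeline.size());
    }

    public static void main(String[] args) {
        List<Node> adapterFirst = Arrays.asList(new Node("adapter", true), new Node("handler", false));
        List<Node> handlerFirst = Arrays.asList(new Node("handler", false), new Node("adapter", true));

        System.out.println(flushFrom(adapterFirst, 1));   // [adapter]
        System.out.println(flushFrom(handlerFirst, 0));   // []
        System.out.println(flushFromTail(handlerFirst));  // [adapter]
    }
}
```

      In this model an adapter added before the handler sees every flush the handler issues through its own context, while an adapter added after the handler only sees flushes that start at the tail. That is consistent with the fewer "adapter is flush" lines in the second log, although which calls the real handler makes through its context and which through the channel is an assumption here.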

  • Original article: https://www.cnblogs.com/dgutfly/p/11536116.html