zoukankan      html  css  js  c++  java
  • netty websocket协议开发

    websocket的好处我们就不用多说了,就是用于解决长连接、服务推送等需要的一种技术。

    以下我们来看一个例子:

     1 package com.ming.netty.http.websocket;
     2 
     3 import java.net.InetSocketAddress;
     4 
     5 import io.netty.bootstrap.ServerBootstrap;
     6 import io.netty.channel.ChannelFuture;
     7 import io.netty.channel.ChannelInitializer;
     8 import io.netty.channel.ChannelOption;
     9 import io.netty.channel.ChannelPipeline;
    10 import io.netty.channel.EventLoopGroup;
    11 import io.netty.channel.nio.NioEventLoopGroup;
    12 import io.netty.channel.socket.SocketChannel;
    13 import io.netty.channel.socket.nio.NioServerSocketChannel;
    14 import io.netty.handler.codec.http.HttpObjectAggregator;
    15 import io.netty.handler.codec.http.HttpServerCodec;
    16 import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
    17 import io.netty.handler.stream.ChunkedWriteHandler;
    18 
    19 public class WebSocketServer {
    20 
    21     
    22     
    23     public static void main(String[] args) {
    24         new WebSocketServer().run("127.0.0.1", 8500);
    25     }
    26     
    27     
    28     public void run(String addr,int port){
    29         EventLoopGroup bossGroup = new NioEventLoopGroup();
    30         EventLoopGroup workerGroup = new NioEventLoopGroup();
    31         try {
    32              ServerBootstrap b=new ServerBootstrap();
    33               b.group(bossGroup, workerGroup)
    34              .channel(NioServerSocketChannel.class)
    35              .option(ChannelOption.SO_BACKLOG, 128)
    36              .childOption(ChannelOption.SO_KEEPALIVE, true)
    37              .childHandler(new WebSocketServerHandlerInitializer()); 
    38               
    39               ChannelFuture f=b.bind(new InetSocketAddress(addr, port)).sync();
    40               System.out.println("启动服务器:"+f.channel().localAddress());
    41               //等等服务器端监听端口关闭
    42               f.channel().closeFuture().sync();
    43               
    44         } catch (Exception e) {
    45             e.printStackTrace();
    46         }finally{
    47             bossGroup.shutdownGracefully();
    48             workerGroup.shutdownGracefully();
    49         }
    50     }
    51     
    52     protected class WebSocketServerHandlerInitializer extends ChannelInitializer<SocketChannel>{
    53         
    54         @Override
    55         protected void initChannel(SocketChannel ch) throws Exception {
    56             ChannelPipeline pipeline = ch.pipeline(); 
    57             pipeline.addLast(new HttpServerCodec());
    58             pipeline.addLast(new HttpObjectAggregator(64*1024));
    59             pipeline.addLast(new ChunkedWriteHandler());
    60             pipeline.addLast(new HttpRequestHandler("/ws"));
    61             pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
    62             pipeline.addLast(new TextWebSocketFrameHandler());
    63         }
    64         
    65     }
    66 }
      1 package com.ming.netty.http.websocket;
      2 
      3 import io.netty.channel.Channel;
      4 import io.netty.channel.ChannelFuture;
      5 import io.netty.channel.ChannelFutureListener;
      6 import io.netty.channel.ChannelHandlerContext;
      7 import io.netty.channel.DefaultFileRegion;
      8 import io.netty.channel.SimpleChannelInboundHandler;
      9 import io.netty.handler.codec.http.DefaultFullHttpResponse;
     10 import io.netty.handler.codec.http.DefaultHttpResponse;
     11 import io.netty.handler.codec.http.FullHttpRequest;
     12 import io.netty.handler.codec.http.FullHttpResponse;
     13 import io.netty.handler.codec.http.HttpHeaders;
     14 import io.netty.handler.codec.http.HttpResponse;
     15 import io.netty.handler.codec.http.HttpResponseStatus;
     16 import io.netty.handler.codec.http.HttpVersion;
     17 import io.netty.handler.codec.http.LastHttpContent;
     18 import io.netty.handler.ssl.SslHandler;
     19 import io.netty.handler.stream.ChunkedNioFile;
     20 
     21 import java.io.File;
     22 import java.io.IOException;
     23 import java.io.RandomAccessFile;
     24 import java.net.URISyntaxException;
     25 import java.net.URL;
     26 
     27 public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
     28 
     29     private final String wsUri;
     30     private static final File INDEX;
     31  
     32     // static HTTP request handling operation.
     33     static {
     34         URL location = HttpRequestHandler.class.getProtectionDomain().getCodeSource().getLocation();
     35         try {
     36            String path = location.toURI() + "WebSocketClient.html";
     37            path = !path.contains("file:") ? path : path.substring(5);
     38            INDEX = new File("D:\javaPro\workspace\NettyTest\resources\WebsocketChatClient.html");
     39          } catch (URISyntaxException e) {
     40            throw new IllegalStateException("Unable to locate WebsocketChatClient.html", e);
     41         }
     42   }
     43  
     44 
     45      public HttpRequestHandler(String wsUri) {
     46          this.wsUri = wsUri;
     47      }
     48 
     49  
     50  
     51  
     52  @Override
     53 protected void messageReceived(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
     54      channelRead(ctx,request);
     55     
     56 }
     57 
     58 
     59 
     60 
     61 public void channelRead(ChannelHandlerContext ctx, FullHttpRequest request)
     62    throws Exception {
     63   if (wsUri.equalsIgnoreCase(request.getUri())) {
     64    ctx.fireChannelRead(request.retain());
     65   } else {
     66    if (HttpHeaders.is100ContinueExpected(request)) {
     67     send100Continue(ctx);
     68    }
     69    
     70    RandomAccessFile file = new RandomAccessFile(INDEX, "r");
     71    HttpResponse response = new DefaultHttpResponse(
     72      request.getProtocolVersion(), HttpResponseStatus.OK);
     73      response.headers().set(HttpHeaders.Names.CONTENT_TYPE,
     74      "text/html; charset=UTF-8");
     75 
     76    boolean keepAlive = HttpHeaders.isKeepAlive(request);
     77    if (keepAlive) {
     78     response.headers().set(HttpHeaders.Names.CONTENT_LENGTH,  file.length());
     79     response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
     80    }
     81    ctx.write(response);
     82 
     83    if (ctx.pipeline().get(SslHandler.class) == null) {
     84     ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
     85    } else {
     86     ctx.write(new ChunkedNioFile(file.getChannel()));
     87    }
     88    ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
     89    if (!keepAlive) {
     90     future.addListener(ChannelFutureListener.CLOSE);
     91    }
     92 
     93    file.close();
     94   }
     95  }
     96 
     97  private static void send100Continue(ChannelHandlerContext ctx) {
     98   FullHttpResponse response = new DefaultFullHttpResponse(
     99     HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
    100   ctx.writeAndFlush(response);
    101  }
    102 
    103  @Override
    104  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
    105    throws Exception {
    106   Channel incoming = ctx.channel();
    107   System.out.println("Client:" + incoming.remoteAddress() + "异常");
    108   
    109   // 当出现异常就关闭连接
    110   cause.printStackTrace();
    111   ctx.close();
    112  }
    113 
    114 }
     1 package com.ming.netty.http.websocket;
     2 
     3 import io.netty.channel.Channel;
     4 import io.netty.channel.ChannelHandlerContext;
     5 import io.netty.channel.SimpleChannelInboundHandler;
     6 import io.netty.channel.group.ChannelGroup;
     7 import io.netty.channel.group.DefaultChannelGroup;
     8 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
     9 import io.netty.util.concurrent.GlobalEventExecutor;
    10 
    11 public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>  {
    12 
    13     public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    14     
    15     
    16     
    17     @Override
    18     public void channelActive(ChannelHandlerContext ctx) throws Exception {
    19           Channel incoming = ctx.channel();
    20           System.out.println("Client:" + incoming.remoteAddress() + "在线");
    21     }
    22 
    23 
    24 
    25     @Override
    26     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    27         Channel incoming = ctx.channel();
    28         System.out.println("Client:" + incoming.remoteAddress() + "掉线");
    29     }
    30 
    31 
    32     @Override
    33     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    34           Channel incoming = ctx.channel();
    35           for (Channel channel : channels) {
    36            channel.writeAndFlush(new TextWebSocketFrame("[SERVER] - "  + incoming.remoteAddress() + " 加入"));
    37           }
    38           channels.add(ctx.channel());
    39           System.out.println("Client:" + incoming.remoteAddress() + "加入");
    40     }
    41 
    42 
    43 
    44     @Override
    45     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    46         Channel incoming = ctx.channel();
    47         for (Channel channel : channels) {
    48           channel.writeAndFlush(new TextWebSocketFrame("[SERVER] - "  + incoming.remoteAddress() + " 离开"));
    49         }
    50         System.out.println("Client:" + incoming.remoteAddress() + "离开");
    51         channels.remove(ctx.channel());
    52     }
    53 
    54     
    55     
    56     
    57 
    58 
    59     @Override
    60     protected void messageReceived(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
    61         Channel incoming = ctx.channel();
    62           for (Channel channel : channels) {
    63            if (channel != incoming) {
    64             //channel.writeAndFlush(new TextWebSocketFrame("[" + incoming.remoteAddress() + "]" + msg.text()));
    65            } else {
    66             //返送给指定的
    67             channel.writeAndFlush(new TextWebSocketFrame("[服务器端返回]:" + msg.text()));
    68             
    69         
    70             //output current message to context. 
    71             StringBuffer sb = new StringBuffer();
    72             sb.append(incoming.remoteAddress()).append("->").append(msg.text());
    73             System.out.println(sb.toString());
    74            }
    75           }
    76         
    77     }
    78 
    79 
    80 
    81     @Override
    82     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    83         Channel incoming = ctx.channel();
    84         System.out.println("Client:" + incoming.remoteAddress() + "异常");
    85         // 当出现异常就关闭连接
    86         cause.printStackTrace();
    87         ctx.close();
    88     }
    89 
    90 
    91 
    92     
    93 
    94     
    95 
    96 }
     1 <!DOCTYPE html>
     2 <html>
     3 <head>
     4 <meta charset="UTF-8">
     5 <title>WebSocket Chat</title>
     6 </head>
     7 <body>
     8     <script type="text/javascript">
     9         var socket;
    10         if (!window.WebSocket) {
    11             window.WebSocket = window.MozWebSocket;
    12         }
    13         if (window.WebSocket) {
    14             socket = new WebSocket("ws://127.0.0.1:8500/ws");
    15             socket.onmessage = function(event) {
    16                 var ta = document.getElementById('responseText');
    17                 ta.value = ta.value + '
    ' + event.data
    18             };
    19             socket.onopen = function(event) {
    20                 var ta = document.getElementById('responseText');
    21                 ta.value = "连接开启!";
    22             };
    23             socket.onclose = function(event) {
    24                 var ta = document.getElementById('responseText');
    25                 ta.value = ta.value + "连接被关闭";
    26             };
    27         } else {
    28             alert("你的浏览器不支持 WebSocket!");
    29         }
    30 
    31         function send(message) {
    32             if (!window.WebSocket) {
    33                 return;
    34             }
    35             if (socket.readyState == WebSocket.OPEN) {
    36                 socket.send(message);
    37             } else {
    38                 alert("连接没有开启.");
    39             }
    40         }
    41     </script>
    42     <form onsubmit="return false;">
    43         <h3>WebSocket 聊天室:</h3>
    44         <textarea id="responseText" style=" 500px; height: 300px;"></textarea>
    45         <br> 
    46         <input type="text" name="message"  style=" 300px" value="Welcome to www.waylau.com">
    47         <input type="button" value="发送消息" onclick="send(this.form.message.value)">
    48         <input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空聊天记录">
    49     </form>
    50     <br> 
    51     <br> 
    52     
    53 </body>
    54 </html>

    运行服务器,然后在浏览器输入:127.0.0.1:8500 就可以看见一个简单的聊天室效果了.

  • 相关阅读:
    POJ 1035-Spell checker(字符串)
    No valid host was found
    Does anyone successfully use USB drive in Windows7 guest?
    iptables 问题
    openstack kilo版本控制节点异常流量分析
    openstack 控制节点大流量对外发包,nf_conntrack,table full droping packets
    Error: rpmdb open failed
    openstack message queue
    POJ 1700 经典过河问题(贪心)
    树莓派遥控船项目计划
  • 原文地址:https://www.cnblogs.com/huzi007/p/5642271.html
Copyright © 2011-2022 走看看