zoukankan      html  css  js  c++  java
  • netty websocket

    0.maven依赖

    <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
    <!-- NOTE(review): the handler in this article overrides
         SimpleChannelInboundHandler#messageReceived, so the code is written against the
         API of this exact version; changing the version presumably requires adapting that
         callback - verify before upgrading. -->
    <dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>5.0.0.Alpha1</version>
    </dependency>

    1. CaptureGlobalUserUtil.java

    package cn.cloudwalk.isc.util.netty;

    import io.netty.channel.group.ChannelGroup;
    import io.netty.channel.group.DefaultChannelGroup;
    import io.netty.util.concurrent.GlobalEventExecutor;


    /**
     * Holder for the process-wide registry of channels connected to the server.
     *
     * <p>{@code CaptureMyChannelHandler} adds a channel to this group when the handler is
     * attached to a pipeline and iterates the group when relaying text frames.
     */
    public class CaptureGlobalUserUtil {

        /**
         * All client channels that have connected to the server.
         *
         * <p>Declared {@code final} so the shared registry cannot be replaced at runtime,
         * which would silently orphan every channel registered in the previous group.
         */
        public static final ChannelGroup channels =
                new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    }

    2. CaptureMyChannelHandler.java

    package cn.cloudwalk.isc.util.netty;

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.*;
    import io.netty.handler.codec.http.*;
    import io.netty.handler.codec.http.websocketx.*;
    import io.netty.util.AttributeKey;
    import io.netty.util.CharsetUtil;

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.CopyOnWriteArrayList;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import cn.cloudwalk.isc.util.em.Enums.NettyUri;


    public class CaptureMyChannelHandler
    extends SimpleChannelInboundHandler<Object> {

    private static final Logger LOGGER = LoggerFactory
    .getLogger(CaptureMyChannelHandler.class);

    private static WebSocketServerHandshaker handshaker;
    private static final String HOME_ADD = "/home";
    private static final int STATUS_CODE = 200;
    private static Map<String, List<ChannelHandlerContext>> CTX_MAP = new HashMap<String, List<ChannelHandlerContext>>();

    /**
    * 连接上服务器
    * <p>Title: handlerAdded</p>
    * <p>Description: </p>
    * @param ctx
    * @throws Exception
    * @see io.netty.channel.ChannelHandlerAdapter#handlerAdded(io.netty.channel.ChannelHandlerContext)
    * @date: 2019年2月15日 下午3:07:23
    */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    // LOGGER.info("【handlerAdded】====>" + ctx.channel().id());
    CaptureGlobalUserUtil.channels.add(ctx.channel());
    }

    /**
    * 断开连接
    * <p>Title: handlerRemoved</p>
    * <p>Description: </p>
    * @param ctx
    * @throws Exception
    * @see io.netty.channel.ChannelHandlerAdapter#handlerRemoved(io.netty.channel.ChannelHandlerContext)
    * @date: 2019年2月15日 下午3:07:16
    */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    // LOGGER.info("【handlerRemoved】====>" + ctx.channel().id());
    CaptureGlobalUserUtil.channels.remove(ctx);
    }

    /**
    * 连接异常 需要关闭相关资源
    * <p>Title: exceptionCaught</p>
    * <p>Description: </p>
    * @param ctx
    * @param cause
    * @throws Exception
    * @see io.netty.channel.ChannelHandlerAdapter#exceptionCaught(io.netty.channel.ChannelHandlerContext, java.lang.Throwable)
    * @date: 2019年2月15日 下午3:07:05
    */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
    throws Exception {
    // LOGGER.error("【系统异常】======>" + cause.toString());
    ctx.close();
    ctx.channel().close();
    }

    /**
    * 活跃的通道 也可以当作用户连接上客户端进行使用
    * <p>Title: channelActive</p>
    * <p>Description: </p>
    * @param ctx
    * @throws Exception
    * @see io.netty.channel.ChannelHandlerAdapter#channelActive(io.netty.channel.ChannelHandlerContext)
    */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    LOGGER.info("【channelActive】=====>" + ctx.channel());
    }

    /**
    * 不活跃的通道 就说明用户失去连接
    * <p>Title: channelInactive</p>
    * <p>Description: </p>
    * @param ctx
    * @throws Exception
    * @see io.netty.channel.ChannelHandlerAdapter#channelInactive(io.netty.channel.ChannelHandlerContext)
    */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    }

    /**
    * 这里只要完成 flush
    * <p>Title: channelReadComplete</p>
    * <p>Description: </p>
    * @param ctx
    * @throws Exception
    * @see io.netty.channel.ChannelHandlerAdapter#channelReadComplete(io.netty.channel.ChannelHandlerContext)
    */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx)
    throws Exception {
    ctx.flush();
    }

    /**
    * 这里是保持服务器与客户端长连接 进行心跳检测 避免连接断开
    * @param ctx
    * @param evt
    * @throws Exception
    */
    /*
    * @Override public void userEventTriggered(ChannelHandlerContext ctx,
    * Object evt) throws Exception { if(evt instanceof IdleStateEvent){
    * IdleStateEvent stateEvent = (IdleStateEvent) evt; PingWebSocketFrame ping
    * = new PingWebSocketFrame(); switch (stateEvent.state()){ //读空闲(服务器端) case
    * READER_IDLE: LOGGER.info("【"+ctx.channel().remoteAddress()+"】读空闲(服务器端)");
    * ctx.writeAndFlush(ping); break; //写空闲(客户端) case WRITER_IDLE:
    * LOGGER.info("【"+ctx.channel().remoteAddress()+"】写空闲(客户端)");
    * ctx.writeAndFlush(ping); break; case ALL_IDLE:
    * LOGGER.info("【"+ctx.channel().remoteAddress()+"】读写空闲"); break; } } }
    */

    /**
    * 收发消息处理
    * <p>Title: messageReceived</p>
    * <p>Description: </p>
    * @param ctx
    * @param msg
    * @throws Exception
    * @see io.netty.channel.SimpleChannelInboundHandler#messageReceived(io.netty.channel.ChannelHandlerContext, java.lang.Object)
    */
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, Object msg)
    throws Exception {
    if (msg instanceof HttpRequest) {
    doHandlerHttpRequest(ctx, (HttpRequest) msg);
    // if(null == pubctx) {
    // pubctx = ctx;
    // }
    }
    else if (msg instanceof WebSocketFrame) {
    doHandlerWebSocketFrame(ctx, (WebSocketFrame) msg);
    }
    }

    /**
    * websocket消息处理
    * @Title: doHandlerWebSocketFrame
    * @Description: TODO(这里用一句话描述这个方法的作用)
    * @param ctx
    * @param msg
    * @return: void
    */
    public static void doHandlerWebSocketFrame(ChannelHandlerContext ctx,
    WebSocketFrame msg) {
    //判断msg 是哪一种类型 分别做出不同的反应
    if (msg instanceof CloseWebSocketFrame) {
    // LOGGER.info("【关闭】");
    handshaker.close(ctx.channel(), (CloseWebSocketFrame) msg);
    return;
    }
    if (msg instanceof PingWebSocketFrame) {
    // LOGGER.info("【ping】");
    PongWebSocketFrame pong = new PongWebSocketFrame(
    msg.content().retain());
    ctx.channel().writeAndFlush(pong);
    return;
    }
    if (msg instanceof PongWebSocketFrame) {
    // LOGGER.info("【pong】");
    PingWebSocketFrame ping = new PingWebSocketFrame(
    msg.content().retain());
    ctx.channel().writeAndFlush(ping);
    return;
    }
    if (!(msg instanceof TextWebSocketFrame)) {
    // LOGGER.info("【不支持二进制】");
    throw new UnsupportedOperationException("不支持二进制");
    }
    //可以对消息进行处理
    //群发
    if (ctx == null) {
    // ctx = CTX_MAP.get()
    }
    for (Channel channel : CaptureGlobalUserUtil.channels) {
    // TextWebSocketFrame tw = new TextWebSocketFrame("123123");
    // channel.writeAndFlush(new TextWebSocketFrame(tw.text()));
    ctx.channel().writeAndFlush(
    new TextWebSocketFrame(((TextWebSocketFrame) msg).text()));
    // channel.writeAndFlush(new TextWebSocketFrame(((TextWebSocketFrame) msg).text()));
    }

    }

    /**
    * wetsocket第一次连接握手
    * @Title: doHandlerHttpRequest
    * @Description: TODO(这里用一句话描述这个方法的作用)
    * @param ctx
    * @param msg
    * @return: void
    */
    private void doHandlerHttpRequest(ChannelHandlerContext ctx,
    HttpRequest msg) {
    // http 解码失败
    System.out.println(
    "-------------------------msg.headers().get("Upgrade"))--------------"
    + msg.headers().get("Validator"));
    if (!msg.getDecoderResult().isSuccess()
    || (!"websocket".equalsIgnoreCase(msg.headers().get("Upgrade")))) {
    sendHttpResponse(ctx, (FullHttpRequest) msg,
    new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
    HttpResponseStatus.BAD_REQUEST));
    }
    //可以获取msg的uri来判断
    String uri = msg.getUri();
    if (!uri.substring(1).equals(NettyUri.URI_HOME.getCode())
    && !uri.substring(1).equals(NettyUri.URI_OTHER.getCode())) {
    ctx.close();
    }
    ctx.attr(AttributeKey.valueOf("type")).set(uri);
    //可以通过url获取其他参数
    String flag = null;
    if (HOME_ADD.equals(uri)) {
    flag = NettyUri.URI_HOME.getCode();
    }
    else {
    flag = NettyUri.URI_OTHER.getCode();
    }

    WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(
    "ws://" + msg.headers().get("Host") + "/" + flag + "", null,
    false);
    handshaker = factory.newHandshaker(msg);
    if (handshaker == null) {
    WebSocketServerHandshakerFactory
    .sendUnsupportedWebSocketVersionResponse(ctx.channel());
    }
    //进行连接
    handshaker.handshake(ctx.channel(), (FullHttpRequest) msg);
    //可以做其他处理
    if (CTX_MAP.containsKey(flag)) {
    CTX_MAP.get(flag).add(ctx);
    }
    else {
    List<ChannelHandlerContext> ctxList = new ArrayList<ChannelHandlerContext>();
    ctxList.add(ctx);
    CTX_MAP.put(flag, ctxList);
    }

    }

    /**
    * 应答给客户端
    * @Title: sendHttpResponse
    * @Description: TODO(这里用一句话描述这个方法的作用)
    * @param ctx
    * @param req
    * @param res  
    * @return: void
    */
    private static void sendHttpResponse(ChannelHandlerContext ctx,
    FullHttpRequest req, DefaultFullHttpResponse res) {
    // 返回应答给客户端
    if (res.getStatus().code() != STATUS_CODE) {
    ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(),
    CharsetUtil.UTF_8);
    res.content().writeBytes(buf);
    buf.release();
    }
    // 如果是非Keep-Alive,关闭连接
    ChannelFuture f = ctx.channel().writeAndFlush(res);
    if (!HttpHeaders.isKeepAlive(req) || res.getStatus().code() != STATUS_CODE) {
    f.addListener(ChannelFutureListener.CLOSE);
    }
    }

    /**
    * 发送消息
    */
    public static void send(String msg, String upgrade) {
    if (CTX_MAP.containsKey(upgrade)) {
    List<ChannelHandlerContext> ctxList = CTX_MAP.get(upgrade);
    TextWebSocketFrame tw = new TextWebSocketFrame(msg);
    for (ChannelHandlerContext ctx : ctxList) {
    ctx.channel().writeAndFlush(new TextWebSocketFrame(tw.text()));
    }
    }
    // else {
    // LOGGER.info("消息推送失败,未找到[" + upgrade + "]channel");
    // }
    }
    }

    3. CaptureNettyServer.java

    package cn.cloudwalk.isc.util.netty;

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.stream.ChunkedWriteHandler;
    import io.netty.handler.timeout.IdleStateHandler;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;

    import cn.cloudwalk.isc.util.thread.SysThreadPool;

    import javax.annotation.PostConstruct;

    import java.util.concurrent.TimeUnit;


    /**
     * Spring component that boots the Netty WebSocket server after the bean has been
     * constructed. The blocking server loop runs on a thread taken from
     * {@code SysThreadPool}, so application start-up is not blocked.
     */
    @Component
    public class CaptureNettyServer {

        private static final Logger LOGGER = LoggerFactory.getLogger(CaptureNettyServer.class);

        /** Port to listen on, injected from the {@code netty.server.port} property. */
        @Value("${netty.server.port}")
        public Integer port;

        /**
         * @return the configured listen port
         */
        public Integer getPort() {
            return port;
        }

        /**
         * @param port the listen port to use
         */
        public void setPort(Integer port) {
            this.port = port;
        }

        /**
         * Submits the server start task to the shared thread pool.
         */
        @PostConstruct()
        public void init() {
            StartTask task = new StartTask(port);
            SysThreadPool.getThread().execute(task);
        }

        /**
         * Runnable that configures the server, binds it and then blocks until the server
         * channel is closed.
         */
        static class StartTask implements Runnable {
            private int taskPport;

            /**
             * @param taskPport port the server binds to
             */
            public StartTask(int taskPport) {
                this.taskPport = taskPport;
            }

            public int getTaskPport() {
                return taskPport;
            }

            public void setTaskPport(int taskPport) {
                this.taskPport = taskPport;
            }

            /**
             * Starts the server and blocks until its channel closes. Both event loop
             * groups are shut down on exit, whether the server stopped normally or failed.
             */
            @Override
            public void run() {
                // Two groups: boss accepts connections, work serves the accepted ones.
                EventLoopGroup boss = new NioEventLoopGroup();
                EventLoopGroup work = new NioEventLoopGroup();
                try {
                    ServerBootstrap bootstrap = new ServerBootstrap();
                    bootstrap.group(boss, work)
                            .channel(NioServerSocketChannel.class)
                            .childHandler(new ChannelInitializer<SocketChannel>() {
                                @Override
                                protected void initChannel(SocketChannel socketChannel)
                                        throws Exception {
                                    ChannelPipeline pipeline = socketChannel.pipeline();
                                    // Encodes/decodes HTTP requests and responses.
                                    pipeline.addLast("http-codec", new HttpServerCodec());
                                    // Merges the parts of an HTTP message into one full message.
                                    pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
                                    // Supports writing chunked content to the client.
                                    pipeline.addLast("http-chunked", new ChunkedWriteHandler());
                                    // Idle detection: 60 s reader, 30 s writer, 30 min all-idle.
                                    pipeline.addLast(new IdleStateHandler(60, 30, 60 * 30,
                                            TimeUnit.SECONDS));
                                    // Business handler, one instance per channel.
                                    pipeline.addLast(new CaptureMyChannelHandler());
                                }
                            })
                            .option(ChannelOption.SO_BACKLOG, 1024)
                            .childOption(ChannelOption.SO_KEEPALIVE, true);

                    Channel channel = bootstrap.bind(taskPport).sync().channel();
                    // Logged only after the bind succeeded, so the message is never
                    // emitted for a server that failed to start.
                    LOGGER.info("【服务器启动成功========端口:" + taskPport + "】");
                    channel.closeFuture().sync();
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the owning thread pool can see it.
                    Thread.currentThread().interrupt();
                    LOGGER.error(e.toString(), e);
                } catch (Exception e) {
                    LOGGER.error(e.toString(), e);
                } finally {
                    boss.shutdownGracefully();
                    work.shutdownGracefully();
                }
            }
        }
    }

    4.html

    <html>
    <head>
    <meta http-equiv="Content-Type" content="text/html; charset = utf-8"/>
    <title>WebSocket客户端</title>
    <script type="text/javascript">
    // Minimal WebSocket test client: connects to the server, appends every received
    // message to the textarea and sends the content of the text input on demand.
    var socket;
    // Fall back to the legacy Mozilla-prefixed implementation when present.
    if(!window.WebSocket){
        window.WebSocket = window.MozWebSocket;
    }

    if(window.WebSocket){
        socket = new WebSocket("ws://localhost:9005/home");
        // Append each incoming message to the response area.
        socket.onmessage = function(event){
            var ta = document.getElementById('responseContent');
            ta.value += event.data + " ";
        };

        // Tell the user the connection is ready.
        socket.onopen = function(event){
            var ta = document.getElementById('responseContent');
            ta.value = "你当前的浏览器支持WebSocket,请进行后续操作 ";
        };

        // Replace the content with a notice once the connection is closed.
        socket.onclose = function(event){
            var ta = document.getElementById('responseContent');
            ta.value = "";
            ta.value = "WebSocket连接已经关闭 ";
        };
    }else{
        alert("您的浏览器不支持WebSocket");
    }


    // Sends a message over the socket if the connection is open; alerts otherwise.
    function send(message){
        if(!window.WebSocket){
            return;
        }
        if(socket.readyState == WebSocket.OPEN){
            socket.send(message);
        }else{
            alert("WebSocket连接没有建立成功!!");
        }
    }
    </script>
    </head>
    <body>
    <form onSubmit="return false;">
    <input type = "text" name = "message" value = ""/>
    <br/><br/>
    <input type = "button" value = "发送WebSocket请求消息" onClick = "send(this.form.message.value)"/>
    <hr color="red"/>
    <h2>客户端接收到服务端返回的应答消息</h2>
    <!-- The width declaration needs its property name; a bare "1024px" is ignored by the browser. -->
    <textarea id = "responseContent" style = "width:1024px; height:300px"></textarea>
    </form>
    </body>
    </html>

  • 相关阅读:
    oracle
    mysql的必知技巧
    sql_update
    sql查询
    Java 动态页面技术 之 jsp
    Java 会话技术 之 session
    Java 会话技术 之cookie
    Java HttpServletRequest
    Java HttpServletResponse
    Java Servlet接口、web.xml配置、HttpServlet父类
  • 原文地址:https://www.cnblogs.com/jinnian18sui/p/11023924.html
Copyright © 2011-2022 走看看