zoukankan      html  css  js  c++  java
  • Spring+Netty+WebSocket实例

    比较贴近生产,详见注释

    一、pom.xml

    具体太长,详见源码

        <!-- ...其他依赖省略,详见源码... -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.2.Final</version>
        </dependency>

    二、目录结构

    三、AfterSpringBegin

    继承了AfterSpringBegin的子类在spring加载成功后,会自动启动

    package com.netty.init;
    
    import java.util.Timer;
    import java.util.TimerTask;
    
    
    
    import org.springframework.context.ApplicationListener;
    import org.springframework.context.event.ContextRefreshedEvent;
    /**
     * 
     * spring加载后改方法的子类
     * */
    public abstract class AfterSpringBegin extends TimerTask  implements ApplicationListener<ContextRefreshedEvent>{
    
        public void onApplicationEvent(ContextRefreshedEvent event) {
            // TODO Auto-generated method stub
            if(event.getApplicationContext().getParent() ==null){
                
                Timer timer = new Timer();
                timer.schedule(this, 0);
            }
        }
    
    }
    View Code

    四、Constant

    存放了websocket相关信道

    package com.netty.constant;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.group.ChannelGroup;
    import io.netty.channel.group.DefaultChannelGroup;
    import io.netty.util.concurrent.GlobalEventExecutor;
    
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    /**
     * Process-wide registries of the WebSocket connections that are currently tracked.
     *
     * <p>Both registries are shared by all handlers, so the references are {@code final}:
     * callers add to and remove from the collections but can never swap them out.
     */
    public class Constant {
        /** All tracked {@link ChannelHandlerContext}s, keyed by a string chosen by the code that registers them. */
        public static final Map<String, ChannelHandlerContext> pushCtxMap = new ConcurrentHashMap<String, ChannelHandlerContext>();

        /** Channels belonging to one category, grouped so a message can be written to all of them at once. */
        public static final ChannelGroup aaChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    }
    View Code

    五、WebSocketServer

    启动服务

    package com.netty.server;
    
    import javax.annotation.PreDestroy;
    
    import org.springframework.beans.factory.annotation.Autowired;
    
    import com.netty.init.AfterSpringBegin;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.FixedRecvByteBufAllocator;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    /**
     * Starts the Netty WebSocket server once the Spring context is ready.
     *
     * <p>The event loop groups and the bootstrap are autowired; the child handler and the
     * port are set through the bean's property setters. {@link #run()} is invoked by the
     * scheduling logic inherited from {@link AfterSpringBegin} and blocks until the server
     * channel is closed.
     */
    public class WebSocketServer extends AfterSpringBegin {

        /** Event loop group that accepts incoming client connections. */
        @Autowired
        private EventLoopGroup bossGroup;

        /** Event loop group that performs the I/O of accepted connections. */
        @Autowired
        private EventLoopGroup workerGroup;

        /** Netty helper used to configure and bind the server channel. */
        @Autowired
        private ServerBootstrap serverBootstrap;

        /** Handler installed on every accepted channel (the pipeline initializer). */
        private ChannelHandler childChannelHandler;

        /** Future of the bind operation; gives access to the server channel once bound. */
        private ChannelFuture channelFuture;

        /** TCP port the server listens on. */
        private int port;

        public WebSocketServer() {
            System.out.println("初始化");
        }

        public EventLoopGroup getBossGroup() {
            return bossGroup;
        }

        public void setBossGroup(EventLoopGroup bossGroup) {
            this.bossGroup = bossGroup;
        }

        public EventLoopGroup getWorkerGroup() {
            return workerGroup;
        }

        public void setWorkerGroup(EventLoopGroup workerGroup) {
            this.workerGroup = workerGroup;
        }

        public ServerBootstrap getServerBootstrap() {
            return serverBootstrap;
        }

        public void setServerBootstrap(ServerBootstrap serverBootstrap) {
            this.serverBootstrap = serverBootstrap;
        }

        public ChannelHandler getChildChannelHandler() {
            return childChannelHandler;
        }

        public void setChildChannelHandler(ChannelHandler childChannelHandler) {
            this.childChannelHandler = childChannelHandler;
        }

        public ChannelFuture getChannelFuture() {
            return channelFuture;
        }

        public void setChannelFuture(ChannelFuture channelFuture) {
            this.channelFuture = channelFuture;
        }

        public int getPort() {
            return port;
        }

        public void setPort(int port) {
            this.port = port;
        }

        /**
         * Entry point called on the timer thread: builds and runs the server on the
         * configured port and reports any startup failure.
         */
        @Override
        public void run() {
            try {
                bulid(port);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * Configures the bootstrap, binds the given port and blocks until the server
         * channel is closed.
         *
         * @param port TCP port to bind
         * @throws Exception if binding fails or the waiting thread is interrupted; both
         *                   event loop groups are shut down before the exception is rethrown
         */
        public void bulid(int port) throws Exception {

            try {
                // (1) boss accepts TCP connections; worker handles reads/writes of accepted channels.
                // (2) Channel type of the listening socket.
                // (3) SO_BACKLOG: length of the queue of pending connections on the listening socket.
                // (4) TCP_NODELAY=true disables Nagle's algorithm for lower latency. It is a
                //     per-connection option, so it must be a childOption: set on the server
                //     channel it is not applied to the accepted connections.
                // (5) SO_KEEPALIVE enables TCP keep-alive probes on idle established connections
                //     (the idle time before probing is an OS setting).
                // (6) Fixed-size receive buffer allocator (Netty also offers an adaptive one).
                // (7) Handler that initializes the pipeline of every accepted channel.
                serverBootstrap.group(bossGroup, workerGroup)
                               .channel(NioServerSocketChannel.class)
                               .option(ChannelOption.SO_BACKLOG, 1024)
                               .childOption(ChannelOption.TCP_NODELAY, true)
                               .childOption(ChannelOption.SO_KEEPALIVE, true)
                               .childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(592048))
                               .childHandler(childChannelHandler);

                channelFuture = serverBootstrap.bind(port).sync();
                // Report success only after the port has actually been bound.
                System.out.println("成功");
                // Block until the server channel is closed (e.g. by close()).
                channelFuture.channel().closeFuture().sync();
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Preserve the interrupt status for whoever owns this thread.
                    Thread.currentThread().interrupt();
                }
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
                // Do not hide the failure: run() reports it.
                throw e;
            }

        }

        /** Shuts both event loop groups down when the bean is destroyed. */
        @PreDestroy
        public void close() {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    View Code

    六、WebSocketChildChannelHandler

    第五节中的 private ChannelHandler childChannelHandler; 注入的就是这个类;注入配置见后面的 xml,其用途已在第五节代码的注释中说明

    package com.netty.server;
    
    import javax.annotation.Resource;
    
    import org.springframework.stereotype.Component;
    
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.stream.ChunkedWriteHandler;
    
    /**
     * Pipeline initializer for every accepted connection: installs the HTTP codec,
     * the message aggregator, chunked-write support and finally the injected
     * WebSocket business handler.
     */
    @Component
    public class WebSocketChildChannelHandler extends ChannelInitializer<SocketChannel> {

        /** Business handler, looked up by bean name and added to each new pipeline. */
        @Resource(name = "webSocketServerHandler")
        private ChannelHandler webSocketServerHandler;

        /**
         * Builds the handler chain of a newly accepted channel.
         *
         * @param ch the channel being initialized
         */
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline()
              .addLast("http-codec", new HttpServerCodec())           // HTTP request decoding / response encoding
              .addLast("aggregator", new HttpObjectAggregator(65536)) // merge message parts, max 65536 bytes of content
              .addLast("http-chunked", new ChunkedWriteHandler())     // support for chunked writes
              .addLast("handler", webSocketServerHandler);            // handshake + frame handling
        }

    }
    View Code

    七、WebSocketServerHandler

    websocket具体的业务处理,六中的private ChannelHandler webSocketServerHandler;,注入的就是这个类

    package com.netty.server;
    
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    import org.springframework.stereotype.Component;
    
    import com.alibaba.fastjson.JSONObject;
    import com.netty.constant.Constant;
    import com.netty.manage.ManageMessage;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelHandler.Sharable;
    import io.netty.handler.codec.http.DefaultFullHttpResponse;
    import io.netty.handler.codec.http.DefaultHttpResponse;
    import io.netty.handler.codec.http.FullHttpRequest;
    import io.netty.handler.codec.http.HttpResponseStatus;
    import io.netty.handler.codec.http.HttpVersion;
    import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.WebSocketFrame;
    import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
    import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
    import io.netty.util.CharsetUtil;
    
    /**
     * websocket 具体业务处理方法
     * 
     * */
    
    @Component
    @Sharable
    public class WebSocketServerHandler extends BaseWebSocketServerHandler{
    
        
        private WebSocketServerHandshaker handshaker;
        
        
        /**
         * 当客户端连接成功,返回个成功信息
         * */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // TODO Auto-generated method stub
            push(ctx, "连接成功");
        }
    
        /**
         * 当客户端断开连接
         * */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            // TODO Auto-generated method stub
            for(String key:Constant.pushCtxMap.keySet()){
                
                if(ctx.equals(Constant.pushCtxMap.get(key))){
                    //从连接池内剔除
                    System.out.println(Constant.pushCtxMap.size());
                    System.out.println("剔除"+key);
                    Constant.pushCtxMap.remove(key);
                    System.out.println(Constant.pushCtxMap.size());
                }
                
            }
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            // TODO Auto-generated method stub
            ctx.flush();
        }
    
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg)
                throws Exception {
            // TODO Auto-generated method stub
            
            //http://xxxx
            if(msg instanceof FullHttpRequest){
                
                handleHttpRequest(ctx,(FullHttpRequest)msg);
            }else if(msg instanceof WebSocketFrame){
            //ws://xxxx    
                handlerWebSocketFrame(ctx,(WebSocketFrame)msg);
            }
            
            
            
        }
        
        
        public void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception{
            
            //关闭请求
            if(frame instanceof CloseWebSocketFrame){
                
                handshaker.close(ctx.channel(), (CloseWebSocketFrame)frame.retain());
                
                return;
            }
            //ping请求
            if(frame instanceof PingWebSocketFrame){
                
                ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
                
                return;
            }
            //只支持文本格式,不支持二进制消息
            if(!(frame instanceof TextWebSocketFrame)){
                
                throw new Exception("仅支持文本格式");
            }
            
            //客服端发送过来的消息
            
            
            
            
             String request = ((TextWebSocketFrame) frame).text();
             System.out.println("服务端收到:" + request);
             
             JSONObject jsonObject = null;
             
                try
                {
                    jsonObject = JSONObject.parseObject(request);
                    System.out.println(jsonObject.toJSONString());
                }
                catch (Exception e)
                {
                }
             if (jsonObject == null){
                 
                 return;
             }
                
              String id = (String) jsonObject.get("id");
              String type = (String) jsonObject.get("type");   
             
              //根据id判断是否登陆或者是否有权限等
              
              if(id!=null && !"".equals("id")  &&  type!=null && !"".equals("type")){
                  
                  //用户是否有权限
                  boolean idAccess = true;  
                  //类型是否符合定义
                  boolean typeAccess = true; 
                  
                  if(idAccess && typeAccess){
                      System.out.println("添加到连接池:"+request);
                      Constant.pushCtxMap.put(request,ctx);
                      Constant.aaChannelGroup.add(ctx.channel());
                  }
                  
                  
                  //根据type 存放进对于的channel池,这里就简单实现,直接放进aaChannelGroup,方便群发
                  
                  
                  
              }
    
    
            
            
        }
        //第一次请求是http请求,请求头包括ws的信息
        public void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req){
            
            
            if(!req.decoderResult().isSuccess()){
                
                sendHttpResponse(ctx,req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
                return;
            }
            
            WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws:/"+ctx.channel()+ "/websocket",null,false);
            handshaker = wsFactory.newHandshaker(req);
            
            
            if(handshaker == null){
                //不支持
                WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
            }else{
                
                handshaker.handshake(ctx.channel(), req);
            }
            
        }
        
        
        public static void sendHttpResponse(ChannelHandlerContext ctx,FullHttpRequest req,DefaultFullHttpResponse res){
            
            
            // 返回应答给客户端
            if (res.status().code() != 200)
            {
                ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
                res.content().writeBytes(buf);
                buf.release();
            }
    
            // 如果是非Keep-Alive,关闭连接
            ChannelFuture f = ctx.channel().writeAndFlush(res);
            if (!isKeepAlive(req) || res.status().code() != 200)
            {
                f.addListener(ChannelFutureListener.CLOSE);
            }
            
        }
        
        private static boolean isKeepAlive(FullHttpRequest req)
        {
            return false;
        }
    
        
        //异常处理,netty默认是关闭channel
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            // TODO Auto-generated method stub
            //输出日志
             cause.printStackTrace();
             ctx.close();
        }
        
        
        
    }
    View Code

    八、BaseWebSocketServerHandler

    把推送方法单独抽象出来,方便调用

    package com.netty.server;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.group.ChannelGroup;
    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    
    
    /**
     * Base handler that factors out the two ways of sending a text message so that
     * subclasses and other callers can push without building frames themselves.
     */
    public abstract class BaseWebSocketServerHandler extends SimpleChannelInboundHandler<Object> {

        /**
         * Sends a text message to a single connection.
         *
         * @param ctx     context whose channel receives the message
         * @param message text to send
         */
        public static final void push(final ChannelHandlerContext ctx, final String message) {
            ctx.channel().writeAndFlush(new TextWebSocketFrame(message));
        }

        /**
         * Sends a text message to every channel of a group.
         *
         * @param ctxGroup group of channels that receive the message
         * @param message  text to send
         */
        public static final void push(final ChannelGroup ctxGroup, final String message) {
            ctxGroup.writeAndFlush(new TextWebSocketFrame(message));
        }
    }
    View Code

    九、配置

    application-netty.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
        xmlns:cache="http://www.springframework.org/schema/cache"
        xsi:schemaLocation="http://www.springframework.org/schema/beans   
            http://www.springframework.org/schema/beans/spring-beans-4.1.xsd  
            http://www.springframework.org/schema/context   
            http://www.springframework.org/schema/context/spring-context-4.1.xsd">
    
    
        <!-- Event loop groups and bootstrap that WebSocketServer receives through its @Autowired fields. -->
        <bean id="bossGroup" class="io.netty.channel.nio.NioEventLoopGroup"></bean>
        <bean id="workerGroup" class="io.netty.channel.nio.NioEventLoopGroup"></bean>
        <bean id="serverBootstrap" class="io.netty.bootstrap.ServerBootstrap" scope="prototype"></bean>
        <!-- The server itself: the port comes from the websocket.server.port property, and the
             child handler is the bean named webSocketChildChannelHandler (presumably the
             @Component-scanned WebSocketChildChannelHandler under its default bean name). -->
        <bean id="webSocketServer" class="com.netty.server.WebSocketServer">
    
            <property name="port" value="${websocket.server.port}"></property>
            <property name="childChannelHandler" ref="webSocketChildChannelHandler" />
        </bean>
    </beans>
    View Code

    application-beans.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:context="http://www.springframework.org/schema/context"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
               http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
               http://www.springframework.org/schema/context
               http://www.springframework.org/schema/context/spring-context-3.0.xsd">
               
        
               
         <!-- Enables processing of @Autowired on beans. -->
         <bean class="org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor"/>
         
         <context:annotation-config />
         
         <!-- Scan com.netty for annotated components. -->
         <context:component-scan base-package="com.netty">  
              <!-- Skip @Controller classes in this scan. -->         
             <context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
        </context:component-scan>
        
        <!-- Loads conf/websocket.properties from the classpath. -->
        <bean id="configProperties"
            class="org.springframework.beans.factory.config.PropertiesFactoryBean">
            <property name="locations">
                <list>
                    <value>classpath*:conf/websocket.properties</value>
                </list>
            </property>
        </bean>
        
       <!-- Makes the loaded properties available to ${...} placeholders in bean definitions. -->
       <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PreferencesPlaceholderConfigurer">
            <property name="properties" ref="configProperties" />
       </bean> 
         
    
         
         
         
         
         
    </beans>
    View Code

    springmvc.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context"
        xmlns:p="http://www.springframework.org/schema/p" xmlns:tx="http://www.springframework.org/schema/tx"
        xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="  
    http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans-4.1.xsd 
    http://www.springframework.org/schema/context 
    http://www.springframework.org/schema/context/spring-context-4.1.xsd 
    http://www.springframework.org/schema/aop 
    http://www.springframework.org/schema/aop/spring-aop-4.1.xsd 
    http://www.springframework.org/schema/tx 
    http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
    http://www.springframework.org/schema/mvc
    http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd">
    
        <description>Spring-web MVC配置</description>
        
        
        
        <!-- Handler adapter whose only configured message converter is a String converter
             declaring text/html;charset=UTF-8 as its supported media type. -->
        <!-- NOTE(review): mvc:annotation-driven below presumably registers its own handler
             adapter as well; confirm which of the two actually serves requests. -->
        <bean
            class="org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter">
            <property name="messageConverters">
                <list>
                    <bean
                        class="org.springframework.http.converter.StringHttpMessageConverter">
                        <property name="supportedMediaTypes">
                            <list>
                                <value>text/html;charset=UTF-8</value>
                            </list>
                        </property>
                    </bean>
    
                </list>
            </property>
        </bean>
        
        <mvc:annotation-driven />
        
        
        <!-- Scan com.netty.controller: @Controller classes are included, @Service classes excluded. -->
        <context:component-scan base-package="com.netty.controller">
            <context:include-filter type="annotation"
                expression="org.springframework.stereotype.Controller" />
            <context:exclude-filter type="annotation"
                expression="org.springframework.stereotype.Service" />
        </context:component-scan>
    
    
        
        
    </beans>
    View Code

    websocket.properties

    websocket.server.port=7397

    十、客户端

    用的jsp页面,具体连接逻辑什么的看需要写

    <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">  
    <html xmlns="http://www.w3.org/1999/xhtml">  
    <head>  
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />  
    <title></title>  
    </head>  
      <script type="text/javascript">  
      // WebSocket connection shared by the functions below.
      var socket;   
      // In production the id could be the user id taken from the session;
      // here a random base-36 string stands in for it.
      var id  = Math.random().toString(36).substr(2);
      // Fall back to the prefixed MozWebSocket implementation when WebSocket is missing.
      if(!window.WebSocket){  
          window.WebSocket = window.MozWebSocket;  
      }  
       
      if(window.WebSocket){  
          // Port 7397 is the value of websocket.server.port in websocket.properties.
          socket = new WebSocket("ws://localhost:7397");  
            
          // Show every message pushed by the server.
          socket.onmessage = function(event){             
                appendln("receive:" + event.data);  
          };  
       
          // Once the connection is open, send the login message.
          socket.onopen = function(event){  
                appendln("WebSocket is opened");  
                login(); 
          };  
       
          socket.onclose = function(event){  
                appendln("WebSocket is closed");  
          };  
      }else{  
            alert("WebSocket is not support");  
      }  
    
        
      /**
       * Appends one line of text to the "responseText" textarea.
       *
       * @param {string} text the line to append (a newline is added after it)
       */
      function appendln(text) {
        var ta = document.getElementById('responseText');
        // The line terminator must be an escape sequence: a raw line break inside
        // a quoted string literal is a syntax error.
        ta.value += text + "\n";
      }
        
      /**
       * Sends the login message for this page's id over the open socket.
       * The message is a JSON object with the keys "id" and "type".
       */
      function login() {
          console.log("aaaaaa");
          socket.send(JSON.stringify({"id": id, "type": "aa"}));
      }
            
      </script>  
      <body>  
        <form onSubmit="return false;">  
            <input type = "text" name="message" value="hello"/>  
            <br/><br/>  
     
            <textarea id="responseText" style="width: 800px;height: 300px;"></textarea>  
        </form>  
      </body>  
    </html> 
    View Code

    十一、源码

    源码

  • 相关阅读:
    移动端测试知识概览
    24、CSS定位
    23、Xpath
    MySQL触发器
    MySQL存储过程和函数
    Cookie详解
    简单漏桶限流
    PHP异常和错误
    工厂方法模式
    简单工厂模式
  • 原文地址:https://www.cnblogs.com/ggwow/p/7994213.html
Copyright © 2011-2022 走看看