zoukankan      html  css  js  c++  java
  • Netty-socketio集成redis,服务端集群推送消息

    https://blog.csdn.net/evil_lrn/article/details/105808364

    开始前,首先先学习一下概念room和namespace

    官方地址链接 地址

    简单的来说,socket会属于某一个room,如果没有指定那么就socket就会归属默认的room,每个room又会属于某一namespace下,默认namespace是/。

    客户端连接时可以指定自己的socket的归属哪个namespace ,(这里有个坑,自己定义的namespace 一定是/xiuweiSapce,注意是/xiuweiSapce不是xiuweiSapce)比如使用

    var socket = io.connect("http://localhost:9099/xiuweiSpace?room=F006");

    至于归属那个room就是服务端自己设置的了。

    接着就是Netty-socketIo的api了,这里不多做叙述。分享一篇博客,链接 ,文章内容排版都比我写的好。

    剩下的言归正传,这是使用redis并且使用Java客户端redission作为集群推送的方案原因是因为netty-soketio与redisson是同一个作者。(这里再次膜拜大神)。这里使用redis发布订阅功能,需要服务端主动推送消息的时候,不管消息发动到哪个实例都会通过发布订阅推送到其他实例上,最总获取到所有的socketClient推送消息。

    接着一言不合上代码 

    引入所需要的pom

    1.  
      <dependency>
    2.  
      <groupId>com.corundumstudio.socketio</groupId>
    3.  
      <artifactId>netty-socketio</artifactId>
    4.  
      <version>1.7.17</version>
    5.  
      </dependency>
    6.  
       
    7.  
      <dependency>
    8.  
      <groupId>org.redisson</groupId>
    9.  
      <artifactId>redisson</artifactId>
    10.  
      <version>3.11.0</version>
    11.  
      </dependency>

    相关的配置文件与配置类

    1.  
      #WebSocket
    2.  
      # host在本地测试可以设置为localhost或者本机IP,在Linux服务器跑可换成服务器IP
    3.  
      lsmdjsj.websocket.host=socketio.host=localhost
    4.  
      lsmdjsj.websocket.socket-port=9099
    5.  
      # 设置最大每帧处理数据的长度,防止他人利用大数据来攻击服务器
    6.  
      lsmdjsj.websocket.maxFramePayloadLength=1048576
    7.  
      # 设置http交互最大内容长度
    8.  
      lsmdjsj.websocket.maxHttpContentLength=1048576
    9.  
      # 协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间
    10.  
      lsmdjsj.websocket.upgradeTimeout=1000000
    11.  
      # Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件
    12.  
      lsmdjsj.websocket.pingTimeout=6000000
    13.  
      # Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔
    14.  
      lsmdjsj.websocket.pingInterval=25000
    15.  
       
    16.  
       
    17.  
      #redis
    18.  
      lsmdjsj.redisson.address=redis://127.0.0.1:6379
    19.  
      lsmdjsj.redisson.password=
    20.  
      lsmdjsj.redisson.database=5
    1.  
      @Data
    2.  
      @Component
    3.  
      @ConfigurationProperties(prefix = "lsmdjsj.websocket")
    4.  
      public class WebSocketProperties {
    5.  
       
    6.  
      /**
    7.  
      * socket 地址
    8.  
      */
    9.  
      private String host;
    10.  
      /**
    11.  
      * socket 端口
    12.  
      */
    13.  
      private Integer socketPort;
    14.  
      /**
    15.  
      * 最大每帧处理数据的长度
    16.  
      */
    17.  
      private String maxFramePayloadLength;
    18.  
      /**
    19.  
      * http交互最大内容长度
    20.  
      */
    21.  
      private String maxHttpContentLength;
    22.  
       
    23.  
       
    24.  
      /**
    25.  
      * Ping 心跳间隔(毫秒)
    26.  
      */
    27.  
      private Integer pingInterval;
    28.  
      /**
    29.  
      * Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件
    30.  
      */
    31.  
      private Integer pingTimeout;
    32.  
      /**
    33.  
      * 协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间
    34.  
      */
    35.  
      private Integer upgradeTimeout;
    36.  
      }
    1.  
      @Data
    2.  
      @ConfigurationProperties(prefix = "lsmdjsj.redisson")
    3.  
      public class RedissonProperty {
    4.  
      private int timeout = 3000;
    5.  
       
    6.  
      private String address;
    7.  
       
    8.  
      private String password;
    9.  
       
    10.  
      private int connectionPoolSize = 5;
    11.  
       
    12.  
      private int connectionMinimumIdleSize=2;
    13.  
       
    14.  
      private int slaveConnectionPoolSize = 250;
    15.  
       
    16.  
      private int masterConnectionPoolSize = 250;
    17.  
       
    18.  
      private String[] sentinelAddresses;
    19.  
       
    20.  
      private String masterName;
    21.  
      private int database = 1;
    22.  
       
    23.  
      }

    初始化netty-socketio的服务端和redisson

    1.  
      @Configuration
    2.  
      @EnableConfigurationProperties(RedissonProperty.class)
    3.  
      public class RedissonConfig {
    4.  
       
    5.  
      @Resource
    6.  
      private RedissonProperty conf;
    7.  
       
    8.  
      @Bean(name="redission",destroyMethod="shutdown")
    9.  
      public RedissonClient redission() {
    10.  
      Config config = new Config();
    11.  
      config.setCodec(new org.redisson.client.codec.StringCodec());
    12.  
      if(conf.getSentinelAddresses()!=null && conf.getSentinelAddresses().length>0){
    13.  
      config.useSentinelServers()
    14.  
      .setMasterName(conf.getMasterName()).addSentinelAddress(conf.getSentinelAddresses())
    15.  
      .setPassword(conf.getPassword()).setDatabase(conf.getDatabase());
    16.  
      }else{
    17.  
      SingleServerConfig serverConfig = config.useSingleServer()
    18.  
      .setAddress(conf.getAddress())
    19.  
      .setTimeout(conf.getTimeout())
    20.  
      .setConnectionPoolSize(conf.getConnectionPoolSize())
    21.  
      .setConnectionMinimumIdleSize(conf.getConnectionMinimumIdleSize())
    22.  
      .setDatabase(conf.getDatabase());
    23.  
      if(StringUtils.isNotBlank(conf.getPassword())) {
    24.  
      serverConfig.setPassword(conf.getPassword());
    25.  
      }
    26.  
      }
    27.  
      return Redisson.create(config);
    28.  
      }
    29.  
      }

    初始化netty-socketio

    1.  
      @Configuration
    2.  
      public class SocketIoConfig {
    3.  
       
    4.  
      private static Logger logger = LoggerFactory.getLogger(SocketIoConfig.class);
    5.  
       
    6.  
      @Resource
    7.  
      private RedissonClient redisson;
    8.  
       
    9.  
      @Resource
    10.  
      private WebSocketProperties webSocketProperties;
    11.  
       
    12.  
      @Resource
    13.  
      private NettyExceptionListener nettyExceptionListener;
    14.  
       
    15.  
      /**
    16.  
      * 创建 StoreFactory
    17.  
      * @return
    18.  
      */
    19.  
      private RedissonStoreFactory createRedissonStoreFactory(){
    20.  
      logger.info("创建 RedissonStoreFactory 开始");
    21.  
      RedissonStoreFactory redissonStoreFactory = new RedissonStoreFactory(redisson);
    22.  
      logger.info("创建 RedissonStoreFactory 结束");
    23.  
      return redissonStoreFactory;
    24.  
      }
    25.  
       
    26.  
       
    27.  
       
    28.  
      @Bean
    29.  
      public SocketIOServer getSocketIOServer(){
    30.  
      logger.info("创建 SocketIOServer 开始");
    31.  
      //Sokcket配置 参考 jdk
    32.  
      SocketConfig socketConfig = new SocketConfig();
    33.  
       
    34.  
      socketConfig.setTcpNoDelay(true);
    35.  
      //在默认情况下,当调用close关闭socke的使用,close会立即返回,
    36.  
      // 但是,如果send buffer中还有数据,系统会试着先把send buffer中的数据发送出去,然后close才返回.
    37.  
      socketConfig.setSoLinger(0);
    38.  
      com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
    39.  
      // 设置监听端口
    40.  
      config.setPort(webSocketProperties.getSocketPort());
    41.  
      // 协议升级超时时间(毫秒),默认10000。HTTP握手升级为ws协议超时时间
    42.  
      config.setUpgradeTimeout(webSocketProperties.getUpgradeTimeout());
    43.  
      // Ping消息间隔(毫秒),默认25000。客户端向服务器发送一条心跳消息间隔
    44.  
      config.setPingInterval(webSocketProperties.getPingInterval());
    45.  
      // Ping消息超时时间(毫秒),默认60000,这个时间间隔内没有接收到心跳消息就会发送超时事件
    46.  
      config.setPingTimeout(webSocketProperties.getPingTimeout());
    47.  
      // 推荐使用redisson
    48.  
      config.setStoreFactory(createRedissonStoreFactory());
    49.  
      //异常处理
    50.  
      config.setExceptionListener(nettyExceptionListener);
    51.  
      //手动确认
    52.  
      config.setAckMode(AckMode.MANUAL);
    53.  
      // 握手协议参数使用JWT的Token认证方案 认证方案
    54.  
      config.setAuthorizationListener(data -> {
    55.  
      /* HttpHeaders httpHeaders = data.getHttpHeaders();
    56.  
      String token = httpHeaders.get("Authorization");*/
    57.  
      return true;
    58.  
      });
    59.  
      config.setSocketConfig(socketConfig);
    60.  
      logger.info("创建 SocketIOServer 结束");
    61.  
      return new SocketIOServer(config);
    62.  
      }
    63.  
       
    64.  
      /**
    65.  
      * spring
    66.  
      * @param socketServer
    67.  
      * @return
    68.  
      */
    69.  
      @Bean
    70.  
      public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
    71.  
      return new SpringAnnotationScanner(socketServer);
    72.  
      }
    73.  
       
    74.  
      @Bean
    75.  
      public PubSubStore pubSubStore(SocketIOServer socketServer) {
    76.  
      return socketServer.getConfiguration().getStoreFactory().pubSubStore();
    77.  
      }
    78.  
      }

    启动socket服务,并且订阅事件,也可以自定义设置相应的namespace的相关事件

    1.  
      @Component
    2.  
      @Order(1)
    3.  
      public class SocketServerRunner implements CommandLineRunner {
    4.  
       
    5.  
      private static Logger logger = LoggerFactory.getLogger(SocketServerRunner.class);
    6.  
       
    7.  
      @Resource
    8.  
      private SocketIOServer socketIOServer;
    9.  
       
    10.  
      @Resource
    11.  
      private PubSubStore pubSubStore;
    12.  
       
    13.  
      @Resource
    14.  
      private RedissonClient redisson;
    15.  
       
    16.  
      @Override
    17.  
      public void run(String... args) throws Exception {
    18.  
      logger.info("socketIOServer 启动");
    19.  
      socketIOServer.start();
    20.  
       
    21.  
      //订阅消息
    22.  
      pubSubStore.subscribe(PubSubType.DISPATCH,data -> {
    23.  
      Collection<SocketIOClient> clients = null;
    24.  
      String room = data.getRoom();
    25.  
      String namespace = data.getNamespace();
    26.  
      Packet packet = data.getPacket();
    27.  
      String jsonData = packet.getData();
    28.  
      if(!StringUtils.isBlank(namespace)){
    29.  
      SocketIONamespace socketIONamespace = socketIOServer.getNamespace(namespace);
    30.  
      if(StringUtils.isBlank(room)){
    31.  
      clients = socketIONamespace.getRoomOperations(room).getClients();
    32.  
      }
    33.  
      }else{
    34.  
      clients = socketIOServer.getBroadcastOperations().getClients();
    35.  
      }
    36.  
      if(!CollectionUtils.isEmpty(clients)){
    37.  
      for (SocketIOClient client : clients) {
    38.  
      client.sendEvent(Constants.PUSH_MSG,jsonData);
    39.  
      }
    40.  
      }
    41.  
      },DispatchMessage.class);
    42.  
      addNameSpace(socketIOServer);
    43.  
      }
    44.  
       
    45.  
       
    46.  
      private void addNameSpace(SocketIOServer socketIOServer){
    47.  
      SocketIONamespace xiuweiSpace = socketIOServer.addNamespace(Constants.XIU_WEI_NAME_SPACE);
    48.  
      xiuweiSpace.addConnectListener(client -> {
    49.  
      Map<String,Object> clientMap = new HashMap<>(16);
    50.  
      String nameSpace = client.getNamespace().getName();
    51.  
      String room = client.getHandshakeData().getSingleUrlParam("room");
    52.  
      String sessionId = client.getSessionId().toString();
    53.  
      logger.info("xiuweiSpace连接成功, room={},nameSpace={}, sessionId={}", room, nameSpace,sessionId);
    54.  
      if(StringUtils.isNotBlank(room)){
    55.  
      client.joinRoom(room);
    56.  
      clientMap.put("rooms",room);
    57.  
      }
    58.  
      clientMap.put("createTime", LocalDateTime.now().toString());
    59.  
      redisson.getBucket(Constants.KEY_ROOM_PREFIX+Constants.XIU_WEI_NAME_SPACE+sessionId).trySet(clientMap);
    60.  
      });
    61.  
      xiuweiSpace.addDisconnectListener(client -> {
    62.  
      logger.info("客户端:" + client.getSessionId() + "断开连接");
    63.  
      String sessionId = client.getSessionId().toString();
    64.  
      redisson.getBucket(Constants.KEY_ROOM_PREFIX+Constants.XIU_WEI_NAME_SPACE+sessionId).delete();
    65.  
      });
    66.  
      xiuweiSpace.addEventListener(Constants.XIU_WEI_EVINT,String.class,(client, data, ackSender) -> {
    67.  
      client.sendEvent(Constants.XIU_WEI_EVINT,data);
    68.  
      if (ackSender.isAckRequested()) {
    69.  
      logger.info("xiuwei接受到的消息. message={}", data);
    70.  
      }
    71.  
      });
    72.  
      }
    73.  
      }

    默认处理未指定namesapce的socketClient 这里使用标签

    1.  
      @Component
    2.  
      public class MessageEventHandler {
    3.  
       
    4.  
      private static final Logger logger = LoggerFactory.getLogger(MessageEventHandler.class);
    5.  
       
    6.  
      public static ConcurrentMap<String, SocketIOClient> socketIOClientMap = new ConcurrentHashMap<>();
    7.  
       
    8.  
       
    9.  
      @Resource
    10.  
      private RedissonClient redisson;
    11.  
       
    12.  
      @Resource
    13.  
      private SocketIOServer socketIOServer;
    14.  
       
    15.  
      @OnConnect
    16.  
      public void onConnect(SocketIOClient client){
    17.  
      Map<String,Object> clientMap = new HashMap<>(16);
    18.  
      if(client!=null){
    19.  
      String room = client.getHandshakeData().getSingleUrlParam("room");
    20.  
      String nameSpace = client.getNamespace().getName();
    21.  
      String sessionId = client.getSessionId().toString();
    22.  
      logger.info("socket连接成功, room={}, sessionId={},namespace={}",room,sessionId,nameSpace);
    23.  
      if(StringUtils.isNotBlank(room)){
    24.  
      client.joinRoom(room);
    25.  
      clientMap.put("rooms",room);
    26.  
      }
    27.  
      clientMap.put("createTime", LocalDateTime.now().toString());
    28.  
      redisson.getBucket(Constants.KEY_ROOM_PREFIX+sessionId).trySet(clientMap);
    29.  
      }
    30.  
      }
    31.  
       
    32.  
      /**
    33.  
      * 客户端关闭连接时触发
    34.  
      *
    35.  
      * @param client
    36.  
      */
    37.  
      @OnDisconnect
    38.  
      public void onDisconnect(SocketIOClient client) {
    39.  
      logger.info("客户端:" + client.getSessionId() + "断开连接");
    40.  
      }
    41.  
       
    42.  
      /**
    43.  
      * 客户端事件
    44.  
      *
    45.  
      * @param client  客户端信息
    46.  
      * @param request 请求信息
    47.  
      * @param msg  客户端发送数据
    48.  
      */
    49.  
      @OnEvent(value = "messageevent")
    50.  
      public void onEvent(SocketIOClient client, AckRequest request, String msg) {
    51.  
      logger.info("发来消息:" + msg);
    52.  
      //回发消息
    53.  
      JSONObject jsonObject = JSON.parseObject(msg);
    54.  
      String message = jsonObject.getString("message");
    55.  
      Collection<SocketIOClient> clients = socketIOServer.getBroadcastOperations().getClients();
    56.  
      for (SocketIOClient clientByRoom : clients) {
    57.  
      clientByRoom.sendEvent("messageevent", client.getSessionId().toString()+": "+message);
    58.  
      }
    59.  
      }
    60.  
       
    61.  
      }

    最后主动推送客户端的controller

    1.  
      @RequestMapping("/push")
    2.  
      @ResponseBody
    3.  
      public SinoHttpResponse<Boolean> pushMsgByService(@RequestBody ChatMessage chatMessage){
    4.  
      SocketIONamespace namespace = socketIOServer.getNamespace(chatMessage.getNamespace());
    5.  
      Collection<SocketIOClient> allClients = namespace.getAllClients();
    6.  
      for (SocketIOClient client : allClients) {
    7.  
      client.sendEvent(chatMessage.getEventName(),chatMessage.getMessage());
    8.  
      }
    9.  
      return SinoHttpResponse.success(true);
    10.  
      }

    最后贴一下可以测试的html ,本人前端技术巨渣,这里使用jquery写的demo

    1.  
      <!DOCTYPE html>
    2.  
      <html lang="en" xmlns="http://www.w3.org/1999/xhtml" xmlns:th="http://www.thymeleaf.org">
    3.  
      <head>
    4.  
      <title>webSocket测试</title>
    5.  
      <script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/2.3.0/socket.io.js"></script>
    6.  
      <!-- 新 Bootstrap 核心 CSS 文件 -->
    7.  
      <link rel="stylesheet" href="//cdn.bootcss.com/bootstrap/3.3.5/css/bootstrap.min.css">
    8.  
      <!-- 可选的Bootstrap主题文件(一般不用引入) -->
    9.  
      <link rel="stylesheet" href="//cdn.bootcss.com/bootstrap/3.3.5/css/bootstrap-theme.min.css">
    10.  
      <!-- jQuery文件。务必在bootstrap.min.js 之前引入 -->
    11.  
      <script src="//cdn.bootcss.com/jquery/1.11.3/jquery.min.js"></script>
    12.  
      <!--<script type="text/javascript" src="js/jquery-1.7.2.js"></script>-->
    13.  
      <!-- 最新的 Bootstrap 核心 JavaScript 文件 -->
    14.  
      <script src="//cdn.bootcss.com/bootstrap/3.3.5/js/bootstrap.min.js"></script>
    15.  
      <script type="text/javascript">
    16.  
      $(function(){
    17.  
      /**
    18.  
      * 前端js的 socket.emit("事件名","参数数据")方法,是触发后端自定义消息事件的时候使用的,
    19.  
      * 前端js的 socket.on("事件名",匿名函数(服务器向客户端发送的数据))为监听服务器端的事件
    20.  
      **/
    21.  
      var socket = io.connect("http://localhost:9099?room=F006");
    22.  
      var firstconnect = true;
    23.  
      if(firstconnect) {
    24.  
      console.log("第一次链接初始化");
    25.  
      //监听服务器连接事件
    26.  
      socket.on('connect', function(){
    27.  
      $("#tou").html("链接服务器成功!");
    28.  
      });
    29.  
      //监听服务器关闭服务事件
    30.  
      socket.on('disconnect', function(){
    31.  
      $("#tou").html("与服务器断开了链接!");
    32.  
      });
    33.  
      //监听服务器端发送消息事件
    34.  
      socket.on('messageevent', function(data) {
    35.  
      $("#msg").html($("#msg").html() + "<br/>" + data);
    36.  
      });
    37.  
      firstconnect = false;
    38.  
      } else {
    39.  
      socket.socket.reconnect();
    40.  
      }
    41.  
       
    42.  
       
    43.  
       
    44.  
       
    45.  
       
    46.  
      $('#send').bind('click', function() {
    47.  
      send();
    48.  
      });
    49.  
       
    50.  
       
    51.  
       
    52.  
      function send(){
    53.  
      if (socket != null) {
    54.  
      debugger;
    55.  
      var message = document.getElementById('message').value;
    56.  
      var title = "message";
    57.  
      var obj = {message:message,title:title};
    58.  
      var str = JSON.stringify(obj);
    59.  
      socket.emit("messageevent",str);
    60.  
      } else {
    61.  
      alert('未与服务器链接.');
    62.  
      }
    63.  
      }
    64.  
      });
    65.  
      </script>
    66.  
       
    67.  
      </head>
    68.  
      <body>
    69.  
      <div class="page-header" id="tou">
    70.  
      webSocket及时聊天Demo程序
    71.  
      </div>
    72.  
      <div class="well" id="msg">
    73.  
      </div>
    74.  
      <div class="col-lg">
    75.  
      <div class="input-group">
    76.  
      <input type="text" class="form-control" placeholder="发送信息..." id="message">
    77.  
      <span class="input-group-btn">
    78.  
      <button class="btn btn-default" type="button" id="send" >发送</button>
    79.  
      </span>
    80.  
      </div><!-- /input-group -->
    81.  
      </div><!-- /.col-lg-6 -->
    82.  
      </div><!-- /.row --><br><br>
    83.  
      后台主动发送消息:<button class="btn btn-default" type="button" onclick="mess()">发送</button>
    84.  
      <br><br>
    85.  
      <a href="/user/index">back to index</a>
    86.  
      <script src="http://js.biocloud.cn/jquery/1.11.3/jquery.min.js"></script>
    87.  
      <script>
    88.  
      function mess() {
    89.  
      $.ajax({
    90.  
      type: "POST",
    91.  
      data:{index : "echo"},
    92.  
      url: "/websocket/auditing",
    93.  
      success: function (data) {
    94.  
      //alert(data);
    95.  
      }
    96.  
      });
    97.  
      }
    98.  
      </script>
    99.  
      </body>
    100.  
      </html>

    最后感谢大家的观看,这只是一个demo,会存在各种各样的问题,欢迎大家指正

  • 相关阅读:
    使用视图显示Oracle ACFS信息
    ORA-00600: internal error code, arguments: [ktecgsc:kcbz_objdchk], [0], [0], [1], [], [], [], [], [], [], [], []
    ORA-00600 arguments: [4194]
    闪回查询
    改写exists
    淘宝mysql月报连接
    pdb表空间巡检脚本
    Oracle表空间名称不能超过30个字符
    PL/SQL package SYS.DBMS_BACKUP_RESTORE version 19.03.00.00 in TARGET database is not current
    Linux查看进程内存占用
  • 原文地址:https://www.cnblogs.com/xiami2046/p/13899962.html
Copyright © 2011-2022 走看看