zoukankan      html  css  js  c++  java
  • springboot 集成netty-socket

    1. 添加依赖

    <dependency>
        <groupId>com.corundumstudio.socketio</groupId>
        <artifactId>netty-socketio</artifactId>
        <version>1.7.18</version>
    </dependency>

    2. 添加YML配置

    # SocketIO配置
    socket:
      # SocketIO端口
      port: 9090
      # 连接数大小
      workCount: 100
      # 允许客户请求
      allowCustomRequests: true
      # 协议升级超时时间(毫秒),默认10秒,HTTP握手升级为ws协议超时时间
      upgradeTimeout: 10000
      # Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件
      pingTimeout: 60000
      # Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔
      pingInterval: 25000
      # 设置HTTP交互最大内容长度
      maxHttpContentLength: 1048576
      # 设置最大每帧处理数据的长度,防止他人利用大数据来攻击服务器
      maxFramePayloadLength: 1048576

    3. 实现Spring配置类

    import lombok.Data;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    
    /**
     * Socket 配置类
     *
     * @author: Fred
     * @email 453086@qq.com
     * @create: 2021-07-20 15:22
     */
    @Data
    @ConfigurationProperties(prefix = "socket")
    public class SocketProperties {
    
        private Integer port;
    
        private Integer workCount;
    
        private Boolean allowCustomRequests;
    
        private Integer upgradeTimeout;
    
        private Integer pingTimeout;
    
        private Integer pingInterval;
    
        private Integer maxFramePayloadLength;
    
        private Integer maxHttpContentLength;
    
    
    }
    import com.corundumstudio.socketio.SocketIOServer;
    import com.corundumstudio.socketio.annotation.SpringAnnotationScanner;
    import com.nuorui.common.config.properties.SocketProperties;
    import org.springframework.boot.context.properties.EnableConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import javax.annotation.Resource;
    
    /**
     * Socket 配置类
     *
     * @author: Fred
     * @email 453086@qq.com
     * @create: 2021-07-20 15:23
     */
    @Configuration
    @EnableConfigurationProperties(SocketProperties.class)
    public class SocketConfig {
    
        @Resource
        private SocketProperties properties;
    
        @Bean
        public SocketIOServer socketIOServer() {
            com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
            config.setPort(properties.getPort());
    
            com.corundumstudio.socketio.SocketConfig socketConfig = new com.corundumstudio.socketio.SocketConfig();
            socketConfig.setReuseAddress(true);
            config.setSocketConfig(socketConfig);
            config.setWorkerThreads(properties.getWorkCount());
            config.setAllowCustomRequests(properties.getAllowCustomRequests());
            config.setUpgradeTimeout(properties.getUpgradeTimeout());
            config.setPingTimeout(properties.getPingTimeout());
            config.setPingInterval(properties.getPingInterval());
            config.setMaxHttpContentLength(properties.getMaxHttpContentLength());
            config.setMaxFramePayloadLength(properties.getMaxFramePayloadLength());
    
            return new SocketIOServer(config);
        }
    
        /**
         * 开启SocketIOServer注解支持
         *
         * @param socketServer
         * @return
         */
        @Bean
        public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
            return new SpringAnnotationScanner(socketServer);
        }
    }

    4. 实现服务端

    import cn.hutool.core.util.StrUtil;
    import com.corundumstudio.socketio.SocketIOClient;
    import com.google.common.collect.Maps;
    import org.springframework.stereotype.Component;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;
    
    /**
     * 客户端缓存
     *
     * @author: Fred
     * @email 453086@qq.com
     * @create: 2021-07-20 16:01
     */
    @Component
    public class ClientCache {
    
        /**
         * 本地缓存
         */
        private static Map<String, HashMap<UUID, SocketIOClient>> concurrentHashMap = Maps.newConcurrentMap();
    
        /**
         * 存入本地缓存
         *
         * @param mmsi         船舶MMSI
         * @param sessionId      页面sessionID
         * @param socketIOClient 页面对应的通道连接信息
         */
        public void saveClient(String mmsi, UUID sessionId, SocketIOClient socketIOClient) {
            if (StrUtil.isNotBlank(mmsi)) {
                HashMap<UUID, SocketIOClient> sessionIdClientCache = concurrentHashMap.get(mmsi);
                if (sessionIdClientCache == null) {
                    sessionIdClientCache = new HashMap<>();
                }
                sessionIdClientCache.put(sessionId, socketIOClient);
                concurrentHashMap.put(mmsi, sessionIdClientCache);
            }
        }
    
        /**
         * 根据用户ID获取所有通道信息
         *
         * @param mmsi
         * @return
         */
        public HashMap<UUID, SocketIOClient> getMmsiClient(String mmsi) {
            return concurrentHashMap.get(mmsi);
        }
    
        /**
         * 根据用户ID及页面sessionID删除页面链接信息
         *
         * @param mmsi
         * @param sessionId
         */
        public void deleteSessionClient(String mmsi, UUID sessionId) {
            concurrentHashMap.get(mmsi).remove(sessionId);
        }
    }
    import cn.hutool.core.util.StrUtil;
    import com.corundumstudio.socketio.SocketIOClient;
    import com.corundumstudio.socketio.annotation.OnConnect;
    import com.corundumstudio.socketio.annotation.OnDisconnect;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    import java.util.UUID;
    
    /**
     * 类描述
     *
     * @author: Fred
     * @email 453086@qq.com
     * @create: 2021-07-20 16:33
     */
    @Slf4j
    @Component
    public class SocketConnection {
    
        @Resource
        private ClientCache clientCache;
    
        /**
         * 客户端连接
         *
         * @param client
         */
        @OnConnect
        public void onConnect(SocketIOClient client) {
            String mmsi = client.getHandshakeData().getSingleUrlParam("mmsi");
            UUID sessionId = client.getSessionId();
            clientCache.saveClient(mmsi, sessionId, client);
    
            log.info("客户端:" + mmsi + "|" + sessionId + "已连接");
        }
    
        /**
         * 客户端断开
         *
         * @param client
         */
        @OnDisconnect
        public void onDisconnect(SocketIOClient client) {
            String mmsi = client.getHandshakeData().getSingleUrlParam("mmsi");
            if (StrUtil.isNotBlank(mmsi)) {
                UUID sessionId = client.getSessionId();
                clientCache.deleteSessionClient(mmsi, sessionId);
    
                log.info("客户端:" + mmsi + "|" + client.getSessionId() + "已离线");
            }
        }
    }
    import com.corundumstudio.socketio.SocketIOServer;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.core.annotation.Order;
    import org.springframework.stereotype.Component;
    
    /**
     * Socket 服务器
     *
     * @author: Fred
     * @email 453086@qq.com
     * @create: 2021-07-20 15:43
     */
    @Slf4j
    @Component
    @Order(1)
    public class SocketServer implements CommandLineRunner {
    
        /**
         * socketIOServer
         */
        private final SocketIOServer socketIOServer;
    
        @Autowired
        public SocketServer(SocketIOServer socketIOServer) {
            this.socketIOServer = socketIOServer;
        }
    
        @Override
        public void run(String... args) {
            socketIOServer.start();
        }
    }

    6. 服务端发消息给客户端

    public class SocketController {
    
        @Resource
        private ClientCache clientCache;
    
        @PostMapping("/test1")
        public void test() {
            HashMap<UUID, SocketIOClient> userClient = clientCache.getMmsiClient("2222");
            userClient.forEach((uuid, socketIOClient) -> {
                //向客户端推送消息
                socketIOClient.sendEvent("event", "服务端推送消息");
            });
        }
    
    }

    ————————————————————————————————————————————————————————————————————————————————————————

    客户端

    1. 添加依赖

    <dependency>
        <groupId>io.socket</groupId>
        <artifactId>socket.io-client</artifactId>
        <version>1.0.0</version>
    </dependency>

    2. 客户端监听

    String url = "http://127.0.0.1:9093";
            try {
                IO.Options options = new IO.Options();
                options.transports           = new String[]{"websocket"};
                options.reconnectionAttempts = 2;
                // 失败重连的时间间隔
                options.reconnectionDelay = 1000;
                // 连接超时时间(ms)
                options.timeout = 500;
                // mmsi: 唯一标识 传给服务端存储
                final Socket socket = IO.socket(url + "?mmsi=2222", options);
    
                socket.on(Socket.EVENT_CONNECT, args1 -> socket.send("hello..."));
    
                // 自定义事件`connected` -> 接收服务端成功连接消息
                socket.on("connected", objects -> log.debug("服务端:" + objects[0].toString()));
    
                // 自定义事件`push_data_event` -> 接收服务端消息
                socket.on("push_data_event", objects -> log.debug("服务端:" + objects[0].toString()));
    
                // 自定义事件`myBroadcast` -> 接收服务端广播消息
                socket.on("myBroadcast", objects -> log.debug("服务端:" + objects[0].toString()));
    
                socket.connect();
    
                socket.emit("push_data_event", "发送数据 " + System.currentTimeMillis());
            } catch (Exception e) {
                e.printStackTrace();
            }
  • 相关阅读:
    ReentrantReadWriteLock读写锁的使用
    Exchanger的使用
    CyclicBarrier的用法
    Semaphore的使用
    CountDownLatch的使用
    BlockingQueue的使用
    对字符串操作的各种笔试题
    struts2请求过程源码分析
    shell语法使用
    hibernate调用mysql存储过程
  • 原文地址:https://www.cnblogs.com/fangts/p/15044173.html
Copyright © 2011-2022 走看看