1. 添加依赖
<dependency> <groupId>com.corundumstudio.socketio</groupId> <artifactId>netty-socketio</artifactId> <version>1.7.18</version> </dependency>
2. 添加YML配置
# SocketIO配置
socket:
# SocketIO端口
port: 9090
# 连接数大小
workCount: 100
# 允许客户请求
allowCustomRequests: true
# 协议升级超时时间(毫秒),默认10秒,HTTP握手升级为ws协议超时时间
upgradeTimeout: 10000
# Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件
pingTimeout: 60000
# Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔
pingInterval: 25000
# 设置HTTP交互最大内容长度
maxHttpContentLength: 1048576
# 设置最大每帧处理数据的长度,防止他人利用大数据来攻击服务器
maxFramePayloadLength: 1048576
3. 实现Spring配置类
import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; /** * Socket 配置类 * * @author: Fred * @email 453086@qq.com * @create: 2021-07-20 15:22 */ @Data @ConfigurationProperties(prefix = "socket") public class SocketProperties { private Integer port; private Integer workCount; private Boolean allowCustomRequests; private Integer upgradeTimeout; private Integer pingTimeout; private Integer pingInterval; private Integer maxFramePayloadLength; private Integer maxHttpContentLength; }
import com.corundumstudio.socketio.SocketIOServer; import com.corundumstudio.socketio.annotation.SpringAnnotationScanner; import com.nuorui.common.config.properties.SocketProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.annotation.Resource; /** * Socket 配置类 * * @author: Fred * @email 453086@qq.com * @create: 2021-07-20 15:23 */ @Configuration @EnableConfigurationProperties(SocketProperties.class) public class SocketConfig { @Resource private SocketProperties properties; @Bean public SocketIOServer socketIOServer() { com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration(); config.setPort(properties.getPort()); com.corundumstudio.socketio.SocketConfig socketConfig = new com.corundumstudio.socketio.SocketConfig(); socketConfig.setReuseAddress(true); config.setSocketConfig(socketConfig); config.setWorkerThreads(properties.getWorkCount()); config.setAllowCustomRequests(properties.getAllowCustomRequests()); config.setUpgradeTimeout(properties.getUpgradeTimeout()); config.setPingTimeout(properties.getPingTimeout()); config.setPingInterval(properties.getPingInterval()); config.setMaxHttpContentLength(properties.getMaxHttpContentLength()); config.setMaxFramePayloadLength(properties.getMaxFramePayloadLength()); return new SocketIOServer(config); } /** * 开启SocketIOServer注解支持 * * @param socketServer * @return */ @Bean public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) { return new SpringAnnotationScanner(socketServer); } }
4. 实现服务端
import cn.hutool.core.util.StrUtil; import com.corundumstudio.socketio.SocketIOClient; import com.google.common.collect.Maps; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; import java.util.UUID; /** * 客户端缓存 * * @author: Fred * @email 453086@qq.com * @create: 2021-07-20 16:01 */ @Component public class ClientCache { /** * 本地缓存 */ private static Map<String, HashMap<UUID, SocketIOClient>> concurrentHashMap = Maps.newConcurrentMap(); /** * 存入本地缓存 * * @param mmsi 船舶MMSI * @param sessionId 页面sessionID * @param socketIOClient 页面对应的通道连接信息 */ public void saveClient(String mmsi, UUID sessionId, SocketIOClient socketIOClient) { if (StrUtil.isNotBlank(mmsi)) { HashMap<UUID, SocketIOClient> sessionIdClientCache = concurrentHashMap.get(mmsi); if (sessionIdClientCache == null) { sessionIdClientCache = new HashMap<>(); } sessionIdClientCache.put(sessionId, socketIOClient); concurrentHashMap.put(mmsi, sessionIdClientCache); } } /** * 根据用户ID获取所有通道信息 * * @param mmsi * @return */ public HashMap<UUID, SocketIOClient> getMmsiClient(String mmsi) { return concurrentHashMap.get(mmsi); } /** * 根据用户ID及页面sessionID删除页面链接信息 * * @param mmsi * @param sessionId */ public void deleteSessionClient(String mmsi, UUID sessionId) { concurrentHashMap.get(mmsi).remove(sessionId); } }
import cn.hutool.core.util.StrUtil; import com.corundumstudio.socketio.SocketIOClient; import com.corundumstudio.socketio.annotation.OnConnect; import com.corundumstudio.socketio.annotation.OnDisconnect; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.UUID; /** * 类描述 * * @author: Fred * @email 453086@qq.com * @create: 2021-07-20 16:33 */ @Slf4j @Component public class SocketConnection { @Resource private ClientCache clientCache; /** * 客户端连接 * * @param client */ @OnConnect public void onConnect(SocketIOClient client) { String mmsi = client.getHandshakeData().getSingleUrlParam("mmsi"); UUID sessionId = client.getSessionId(); clientCache.saveClient(mmsi, sessionId, client); log.info("客户端:" + mmsi + "|" + sessionId + "已连接"); } /** * 客户端断开 * * @param client */ @OnDisconnect public void onDisconnect(SocketIOClient client) { String mmsi = client.getHandshakeData().getSingleUrlParam("mmsi"); if (StrUtil.isNotBlank(mmsi)) { UUID sessionId = client.getSessionId(); clientCache.deleteSessionClient(mmsi, sessionId); log.info("客户端:" + mmsi + "|" + client.getSessionId() + "已离线"); } } }
import com.corundumstudio.socketio.SocketIOServer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; /** * Socket 服务器 * * @author: Fred * @email 453086@qq.com * @create: 2021-07-20 15:43 */ @Slf4j @Component @Order(1) public class SocketServer implements CommandLineRunner { /** * socketIOServer */ private final SocketIOServer socketIOServer; @Autowired public SocketServer(SocketIOServer socketIOServer) { this.socketIOServer = socketIOServer; } @Override public void run(String... args) { socketIOServer.start(); } }
6. 服务端发消息给客户端
public class SocketController { @Resource private ClientCache clientCache; @PostMapping("/test1") public void test() { HashMap<UUID, SocketIOClient> userClient = clientCache.getMmsiClient("2222"); userClient.forEach((uuid, socketIOClient) -> { //向客户端推送消息 socketIOClient.sendEvent("event", "服务端推送消息"); }); } }
————————————————————————————————————————————————————————————————————————————————————————
客户端
1. 添加依赖
<dependency> <groupId>io.socket</groupId> <artifactId>socket.io-client</artifactId> <version>1.0.0</version> </dependency>
2. 客户端监听
String url = "http://127.0.0.1:9093"; try { IO.Options options = new IO.Options(); options.transports = new String[]{"websocket"}; options.reconnectionAttempts = 2; // 失败重连的时间间隔 options.reconnectionDelay = 1000; // 连接超时时间(ms) options.timeout = 500; // mmsi: 唯一标识 传给服务端存储 final Socket socket = IO.socket(url + "?mmsi=2222", options); socket.on(Socket.EVENT_CONNECT, args1 -> socket.send("hello...")); // 自定义事件`connected` -> 接收服务端成功连接消息 socket.on("connected", objects -> log.debug("服务端:" + objects[0].toString())); // 自定义事件`push_data_event` -> 接收服务端消息 socket.on("push_data_event", objects -> log.debug("服务端:" + objects[0].toString())); // 自定义事件`myBroadcast` -> 接收服务端广播消息 socket.on("myBroadcast", objects -> log.debug("服务端:" + objects[0].toString())); socket.connect(); socket.emit("push_data_event", "发送数据 " + System.currentTimeMillis()); } catch (Exception e) { e.printStackTrace(); }