zoukankan      html  css  js  c++  java
  • springmvc 整合 netty-socketio

    1 maven 

    <dependency>
    <groupId>com.corundumstudio.socketio</groupId>
    <artifactId>netty-socketio</artifactId>
    <version>1.7.12</version>
    </dependency>

    2 为了让 socket 服务随 Spring 容器一起启动,需要实现 ApplicationListener 接口并重写其中的 onApplicationEvent 方法
    import com.corundumstudio.socketio.*;
    import com.corundumstudio.socketio.listener.ConnectListener;
    import com.corundumstudio.socketio.listener.DataListener;
    import com.corundumstudio.socketio.listener.DisconnectListener;
    import com.corundumstudio.socketio.listener.ExceptionListenerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import org.springframework.context.ApplicationListener;
    import org.springframework.context.event.ContextRefreshedEvent;
    import org.springframework.stereotype.Service;
    
    
    @Service
    public class SocketIoServer implements ApplicationListener<ContextRefreshedEvent> {
    
        private SocketIOServer server;
    
        @Override
        public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
            //端口
            int WSS_PORT=9001;
            //服务器ip
            String WSS_HOST="127.0.0.1";
    
            if( server== null) {
                Configuration config = new Configuration();
                //服务器ip
                config.setHostname(WSS_HOST);
                config.setPort(WSS_PORT);
                //该处进行身份验证h
                config.setAuthorizationListener(new AuthorizationListener() {
                    @Override
                    public boolean isAuthorized(HandshakeData handshakeData) {
                        //http://localhost:8081?username=test&password=test
                        //例如果使用上面的链接进行connect,可以使用如下代码获取用户密码信息
                        //String username = data.getSingleUrlParam("username");
                        //String password = data.getSingleUrlParam("password");
                        return true;
                    }
                });
                config.setExceptionListener(new ExceptionListenerAdapter() {
                    @Override
                    public boolean exceptionCaught(ChannelHandlerContext ctx, Throwable e) throws Exception {
                        System.out.println("错误:
    " + e.getMessage());
                        ctx.close();
                        return true;
                    }
                });
                server = new SocketIOServer(config);
                //添加链接事件监听
                server.addConnectListener(new ConnectListener() {
                    @Override
                    public void onConnect(SocketIOClient client) {
                        String clientId = client.getHandshakeData().getSingleUrlParam("clientid");
                        SocketIOClient si = ChatServerPool.getSocketIOClientByClientID(clientId); //这个客户端有没有连接过
                        // 如果没有连接信息、则新建会话信息
                        if (si == null) {
                            //在线数加1
                            //将会话信息更新保存至集合中
                            ChatServerPool.addUser(clientId, client);
    
                        }
                        //在线数减1
                        System.out.println("socket 连接、sessionId:" + client.getSessionId() + "、clientId:" +
                                clientId+",当前人数:"+ChatServerPool.onLineCount.get() );
                    }
                });
                //添加销毁链接事件监听
                server.addDisconnectListener(new DisconnectListener() {
                    @Override
                    public void onDisconnect(SocketIOClient client) {
                        String clientId = client.getHandshakeData().getSingleUrlParam("clientid");
                        ChatServerPool.removeUser(clientId);
                        //在线数减1
                        System.out.println("socket 断开连接、sessionId:" + client.getSessionId() + "、clientId:" +
                                clientId+",当前人数:"+ChatServerPool.onLineCount.get() );
    
                    }
                });
                //添加发送消息事件监听
                server.addEventListener("message_event", MessageInfo.class, new DataListener<MessageInfo>() {
                    @Override
                    public void onData(SocketIOClient client, MessageInfo data, AckRequest ackSender) throws Exception {
                        MessageInfo sendData = new MessageInfo();
                        sendData.setSourceClientId(data.getSourceClientId());
                        sendData.setTargetClientId(data.getTargetClientId());
                        sendData.setMsg(data.getMsg());
                        // 向当前会话发送信息
                        ChatServerPool.sendMessageToUserBySocketClient(client,"message_event",sendData.getMsg().toString());
                        // 向目标会话发送信息
                        ChatServerPool.sendMessageToUser(data.getTargetClientId(),"message_event",sendData.getMsg().toString());
                    }
    
                });
                //需要执行的逻辑代码,当spring容器初始化完成后就会执行该方法。
                server.start();
                System.out.println("start****************************server***"+WSS_PORT+"***********************end");
    
            }
        }
    
    }
    

      3 

    ChatServerPool.java
      
    import com.corundumstudio.socketio.SocketIOClient;
    import java.util.Collection;
    import java.util.Set;
    import java.util.concurrent.ConcurrentSkipListMap;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * Static registry of connected socket.io clients keyed by client id, plus helpers for
     * sending events to a single client or to every registered client.
     *
     * <p>The backing map is a {@link ConcurrentSkipListMap}; each method below touches it with
     * a single map operation per decision so that a concurrent add/remove cannot slip in
     * between a check and the action that depends on it.
     */
    public class ChatServerPool {

        // Registered sessions, keyed by client id.
        private static final ConcurrentSkipListMap<String, SocketIOClient> webSocketMap = new ConcurrentSkipListMap<>();
        // Number of registered sessions; incremented by addUser and decremented by removeUser.
        public static AtomicInteger onLineCount = new AtomicInteger();

        /**
         * Looks up the session registered for a client id.
         *
         * @param clientID the client id; may be {@code null}
         * @return the registered client, or {@code null} when there is none
         */
        public static SocketIOClient getSocketIOClientByClientID(String clientID){
            // The map rejects null keys, so answer "not registered" instead of throwing.
            if (clientID == null) {
                return null;
            }
            return webSocketMap.get(clientID);
        }

        /**
         * Registers a session under the given client id. Calls with a {@code null} id or
         * {@code null} connection are ignored.
         *
         * @param clientID the client id to register under
         * @param conn the connected client
         */
        public static void addUser(String clientID, SocketIOClient conn){
            if (clientID == null || conn == null) {
                return;
            }
            // Count the user only when the id was not registered before; replacing an
            // existing entry must not inflate the online count.
            if (webSocketMap.put(clientID, conn) == null) {
                onLineCount.incrementAndGet();
            }
        }

        /**
         * Returns the ids of all registered clients.
         *
         * @return the key set of the session map (a live view, not a copy)
         */
        public static Collection<String> getOnlineUser(){
            return webSocketMap.keySet();
        }

        /**
         * Removes the session registered under the given client id and decrements the online
         * count when an entry was actually removed.
         *
         * @param clientID the client id to remove; may be {@code null}
         * @return {@code true} if an entry was removed, {@code false} otherwise
         */
        public static boolean removeUser(String clientID){
            if (clientID == null) {
                return false;
            }
            // remove() reports whether this call removed the entry, so the counter is
            // decremented exactly once per registered session.
            if (webSocketMap.remove(clientID) != null) {
                onLineCount.decrementAndGet();
                return true;
            }
            return false;
        }

        /**
         * Sends an event to the client registered under {@code clientId}, if any. Nothing is
         * sent when the id is {@code null}, unknown, or the message is the empty string.
         *
         * @param clientId the recipient's client id
         * @param event the event name
         * @param msg the event payload
         */
        public static void sendMessageToUser(String clientId,String event,String msg){
            if (clientId == null || "".equals(msg)) {
                return;
            }
            // Single lookup: the entry may be removed concurrently, so do not re-read it.
            SocketIOClient conn = webSocketMap.get(clientId);
            if (conn != null) {
                conn.sendEvent(event, msg);
            }
        }

        /**
         * Sends an event directly to the given client. Nothing is sent when the client is
         * {@code null} or the message is the empty string.
         *
         * @param conn the recipient
         * @param event the event name
         * @param msg the event payload
         */
        public static void sendMessageToUserBySocketClient(SocketIOClient conn,String event,String msg){
            if (conn == null || "".equals(msg)) {
                return;
            }
            conn.sendEvent(event, msg);
        }

        /**
         * Sends to every registered client. When {@code event} is non-empty the call is
         * {@code sendEvent(event, message)}; otherwise {@code message} is passed as the only
         * argument to {@code sendEvent}.
         *
         * @param event the event name; may be {@code null} or empty
         * @param message the text to send
         */
        public static void sendMessageAll(String event,String message){
            boolean hasEvent = event != null && !"".equals(event);
            // No lock is taken: nothing else synchronizes on the values view, and the map
            // cannot hold null values, so the loop needs neither a monitor nor a null check.
            for (SocketIOClient conn : webSocketMap.values()) {
                if (hasEvent) {
                    conn.sendEvent(event, message);
                } else {
                    conn.sendEvent(message);
                }
            }
        }

    }
    

      4 MessageInfo.java

    /**
     * Mutable bean describing one chat message: who sent it, who it is addressed to, and
     * its payload. All properties start out as {@code null}.
     */
    public class MessageInfo {

        /** Client id of the sender. */
        private String sourceClientId;

        /** Client id of the intended recipient. */
        private String targetClientId;

        /** Message payload; any object type is accepted. */
        private Object msg;

        /** @return the sender's client id, or {@code null} if unset */
        public String getSourceClientId() {
            return this.sourceClientId;
        }

        /** @param sourceClientId the sender's client id */
        public void setSourceClientId(String sourceClientId) {
            this.sourceClientId = sourceClientId;
        }

        /** @return the recipient's client id, or {@code null} if unset */
        public String getTargetClientId() {
            return this.targetClientId;
        }

        /** @param targetClientId the recipient's client id */
        public void setTargetClientId(String targetClientId) {
            this.targetClientId = targetClientId;
        }

        /** @return the message payload, or {@code null} if unset */
        public Object getMsg() {
            return this.msg;
        }

        /** @param msg the message payload */
        public void setMsg(Object msg) {
            this.msg = msg;
        }
    }
    

      

    5 script 
    <script>
        // Ids of this client and of the peer that outgoing messages are addressed to.
        var clientId = 'sys', targetId = 'sys001';
        // The client id is passed to the server as the "clientid" URL parameter.
        var socket = io.connect('http://localhost:9001?clientid=' + encodeURIComponent(clientId));

        socket.on('connect', function () {
            showMsg(':<span class="connect-msg">成功连接到服务器!</span>');
        });
        socket.on('message_event', function (data) {
            // The payload may arrive as a plain string (message text only) or as an object
            // with sourceClientId / msg fields; handle both shapes.
            var isObject = data !== null && typeof data === 'object';
            var sender = isObject ? data.sourceClientId : '';
            var text = isObject ? data.msg : data;
            // Both values come from another user, so escape them before building HTML.
            showMsg('<br /><span class="username-msg">' + escapeHtml(sender) + ':</span> ' + escapeHtml(text));
        });
        socket.on('disconnect', function () {
            showMsg(':<span class="disconnect-msg">服务已断开!</span>');
        });

        // Closes the socket connection.
        function sendDisconnect() {
            socket.disconnect();
        }

        // Reads the text from #msg, clears the input and emits it as a message_event.
        function sendMessage() {
            var message = $('#msg').val();
            $('#msg').val('');
            var jsonObject = {
                sourceClientId: clientId,
                targetClientId: targetId,
                msg: message
            };
            socket.emit('message_event', jsonObject);
        }

        // Returns the value as text with HTML special characters escaped; null/undefined
        // become the empty string.
        function escapeHtml(value) {
            var text = (value === null || value === undefined) ? '' : String(value);
            return $('<div>').text(text).html();
        }

        // Appends a timestamped line to #console. `message` is inserted as HTML, so callers
        // must escape any untrusted parts themselves.
        function showMsg(message) {
            var currentTime = "<span class='time'>" + escapeHtml(new Date().toLocaleString()) + "</span>";
            var element = $("<div>" + currentTime + "" + message + "</div>");
            $('#console').append(element);
        }
    
    
    </script>
    

      

  • 相关阅读:
    第四百零三节,python网站在线支付,支付宝接口集成与远程调试,
    第四百零二节,Django+Xadmin打造上线标准的在线教育平台—生产环境部署,uwsgi安装和启动,nginx的安装与启动,uwsgi与nginx的配置文件+虚拟主机配置
    第四百节,Django+Xadmin打造上线标准的在线教育平台—生产环境部署CentOS6.5安装python3.5.1
    第四百零一节,Django+Xadmin打造上线标准的在线教育平台—生产环境部署virtualenv虚拟环境安装,与Python虚拟环境批量安装模块
    中间件
    django自定义分页器组件
    Form组件
    前端之CSS重点知识
    group by 和where
    关于update和alter
  • 原文地址:https://www.cnblogs.com/syscn/p/11934929.html
Copyright © 2011-2022 走看看