zoukankan      html  css  js  c++  java
  • 基于Socket.IO的Client封装

    有了WebSocket的经验,这次写Socket.IOClient顺利了很多,参考之前的文章:socket接口开发和测试初探IntelliJ中基于文本的HTTP客户端基于WebSocket的client封装。之前的代码有更新,主要修复了一些BUG以及增加了一些功能方便在实际功能测试中使用,关于性能测试的,接下来还会在继续优化和多线程Socket接口的测试实践。

    本次与WebSocket区别在于多记录了一些监听event的名称,不知道会有啥用,我猜将来用于做收到消息的响应业务的话,应该会用到,所以用看了一个public ConcurrentSet<String> events = new ConcurrentSet<>();记录。

    关于send()方法,我并没有进行多个重载,测试代码中大家可以看到,我直接用的String类型的请求参数,然后转成JSON,打算后期直接把各种消息封装成不同的对象,所以只保留了一个send()方法。

        /**
         * 发送消息,暂不重载
         *
         * @param event
         * @param objects
         */
        public void send(String event, Object... objects) {
            events.add(event);
            this.socket.emit(event, objects);
        }
    

    依赖

    • Gradle
    // https://mvnrepository.com/artifact/io.socket/socket.io-client
    compile group: 'io.socket', name: 'socket.io-client', version: '1.0.0'
    
    
    • Maven
    <!-- https://mvnrepository.com/artifact/io.socket/socket.io-client -->
    <dependency>
        <groupId>io.socket</groupId>
        <artifactId>socket.io-client</artifactId>
        <version>1.0.0</version>
    </dependency>
    
    

    ScoketIOFunClient

    package com.fun.frame.socket;
    
    import com.fun.base.exception.FailException;
    import com.fun.config.SocketConstant;
    import com.fun.frame.SourceCode;
    import com.fun.utils.RString;
    import io.netty.util.internal.ConcurrentSet;
    import io.socket.client.IO;
    import io.socket.client.Socket;
    import io.socket.emitter.Emitter;
    import org.apache.commons.lang3.ArrayUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.net.URISyntaxException;
    import java.util.Arrays;
    import java.util.LinkedList;
    import java.util.Vector;
    
    /**
     * 基于Socket.IO的Client封装对象
     */
    public class ScoketIOFunClient extends SourceCode {
    
        private static Logger logger = LoggerFactory.getLogger(ScoketIOFunClient.class);
    
        public static IO.Options options = initOptions();
    
        public static Vector<ScoketIOFunClient> clients = new Vector<>();
    
        public LinkedList<String> msgs = new LinkedList<>();
    
        private String cname;
    
        private String url;
    
        public Socket socket;
    
        /**
         * 监听事件记录
         */
        public ConcurrentSet<String> events = new ConcurrentSet<>();
    
    
        private ScoketIOFunClient(String url, Socket socket) {
            this.url = url;
            this.socket = socket;
            clients.add(this);
        }
    
        /**
         * 获取socketClient实例
         *
         * @param url
         * @param cname
         * @return
         */
        public static ScoketIOFunClient getInstance(String url, String cname) {
            ScoketIOFunClient client = null;
            try {
                client = new ScoketIOFunClient(url, IO.socket(url, options));
                client.setCname(cname);
            } catch (URISyntaxException e) {
                FailException.fail();
            }
            return client;
        }
    
    
        /**
         * 初始化连接选项的方法,默认采取重置
         *
         * @return
         */
        public static IO.Options initOptions() {
            IO.Options options = new IO.Options();
            options.transports = SocketConstant.transports;
            //失败重试次数
            options.reconnectionAttempts = SocketConstant.MAX_RETRY;
            //失败重连的时间间隔
            options.reconnectionDelay = SocketConstant.RETRY_DELAY;
            //连接超时时间(ms)
            options.timeout = SocketConstant.TIMEOUT;
            return options;
        }
    
        /**
         * 注册通用的事件监听
         * {@link io.socket.client.Socket}
         */
        public void init() {
            this.socket.on(Socket.EVENT_CONNECTING, objects -> {
                logger.info("{} 正在连接...信息:{}", cname, initMsg(objects));
            });
            events.add(Socket.EVENT_CONNECTING);
            this.socket.on(Socket.EVENT_ERROR, objects -> {
                logger.info("{} 收到错误信息:{}", cname, initMsg(objects));
            });
            events.add(Socket.EVENT_ERROR);
            this.socket.on(Socket.EVENT_CONNECT_TIMEOUT, objects -> {
                logger.info("{} 连接超时!,url:{},信息:{}", cname, url, initMsg(objects));
            });
            events.add(Socket.EVENT_CONNECT_TIMEOUT);
            this.socket.on(Socket.EVENT_CONNECT_ERROR, objects -> {
                logger.info("{} 连接错误,信息:{}", cname, initMsg(objects));
            });
            events.add(Socket.EVENT_CONNECT_ERROR);
            /*此处统一的message做记录*/
            this.socket.on(Socket.EVENT_MESSAGE, objects -> {
                String msg = initMsg(objects);
                saveMsg(msg);
                logger.info("{} 收到消息事件,信息:{}", cname, msg);
            });
        }
    
        /**
         * 开始建立socket连接
         */
        public void connect() {
            this.socket.connect();
            logger.info("{} 开始连接...", cname);
            this.socket.connect();
            int a = 0;
            while (true) {
                if (this.socket.connected()) break;
                if ((a++ > SocketConstant.MAX_RETRY)) FailException.fail(cname + "连接重试失败!");
                SourceCode.sleep(SocketConstant.WAIT_INTERVAL);
            }
            logger.info("{} 连接成功!", cname);
        }
    
        /**
         * 添加监听事件
         *
         * @param event
         * @param fn
         */
        public void addEventListener(String event, Emitter.Listener fn) {
            events.add(event);
            this.socket.on(event, fn);
        }
    
        /**
         * 发送消息,暂不重载
         *
         * @param event
         * @param objects
         */
        public void send(String event, Object... objects) {
            events.add(event);
            this.socket.emit(event, objects);
        }
    
        /**
         * 关闭SocketClient
         */
        public void close() {
            logger.info("{} socket链接关闭!", cname);
            this.socket.close();
        }
    
        /**
         * 初始化收到的信息
         *
         * @param objects
         * @return
         */
        public static String initMsg(Object... objects) {
            if (ArrayUtils.isEmpty(objects)) return EMPTY;
            return Arrays.toString(objects);
        }
    
        /**
         * 该方法用于性能测试中,clone多线程对象
         *
         * @return
         */
        @Override
        public ScoketIOFunClient clone() {
            return getInstance(this.url, this.cname + RString.getString(4));
        }
    
        /**
         * 设置cname,多用于性能测试clone()之后
         *
         * @param cname
         */
        public void setCname(String cname) {
            this.cname = cname;
        }
    
        public String getCname() {
            return cname;
        }
    
        public String getUrl() {
            return url;
        }
    
        public void setUrl(String url) {
            this.url = url;
        }
    
        /**
         * 保存收到的信息,只保留最近的{@link SocketConstant}条
         *
         * @param msg
         */
        public void saveMsg(String msg) {
            synchronized (msgs) {
                if (msgs.size() > SocketConstant.MAX_MSG_SIZE) msgs.remove();
                msgs.add(msg);
            }
        }
    
        /**
         * 关闭所有socketclient
         */
        public static void closeAll() {
            clients.forEach(x ->
                    {
                        if (x != null && x.socket.connected()) x.close();
                    }
            );
            clients.clear();
            logger.info("关闭所有Socket客户端!");
        }
    
    
    }
    
    

    测试Demo

    这次我学乖了,先用Java语言趟一趟浑水。

    package com.fun.ztest.java;
    
    import com.alibaba.fastjson.JSON;
    import com.fun.frame.SourceCode;
    import com.fun.frame.socket.ScoketIOFunClient;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class Tdd extends SourceCode {
    
        private static Logger logger = LoggerFactory.getLogger(Tdd.class);
    
        public static void main(String[] args) throws InterruptedException {
            ScoketIOFunClient instance = ScoketIOFunClient.getInstance("http://ailearn-instruction-stress.xk12.cn:38899/?systemId=61951375269&loginType=3&token=4f99f5313c464070a40c709f72e8f72c&userType=1", DEFAULT_STRING);
    
            instance.connect();
            instance.addEventListener("my_response", objects -> {
                String s = ScoketIOFunClient.initMsg(objects);
                logger.info("{}收到my_response消息:{}", instance.getCname(), s);
            });
    
            String rege = "{"cmd": "register", "userId": 61951375269, "role": "T", "deviceVersion": "1.0", "s_sid": 123, "token": "4f99f5313c464070a40c709f72e8f72c"}";
            instance.send("my_event", JSON.parseObject(rege));
    //        instance.send("my_event", JSON.parseObject(rege));
            String ss = "{"cmd": "joinRoom", "roomId": 8888}";
            instance.send("my_event", JSON.parseObject(ss));
    
            sleep(10);
            instance.close();
        }
    
    
    }
    
    

    控制台输出

    INFO-> 当前用户:fv,IP:10.60.192.21,工作目录:/Users/fv/Documents/workspace/fun/,系统编码格式:UTF-8,系统Mac OS X版本:10.15.7
    INFO-> FunTester 开始连接...
    INFO-> FunTester 连接成功!
    INFO-> FunTester收到my_response消息:[{"msg":"","code":0,"data":{"role":"T","s_sid":123,"deviceVersion":"1.0","userId":61951375269,"token":"4f99f5313c464070a40c709f72e8f72c"},"cmd":"registerResponse"}]
    INFO-> FunTester收到my_response消息:[{"msg":"","code":0,"data":{"roomId":8888},"cmd":"joinRoomResponse"}]
    INFO-> FunTester socket链接关闭!
    
    Process finished with exit code 0
    
    

    FunTester腾讯云年度作者,优秀讲师 | 腾讯云+社区权威认证,非著名测试开发,欢迎关注。

    FunTester热文精选

  • 相关阅读:
    PowerDesigner如何设置字段为自增长
    Tab标签
    过滤数据集DataTable方法
    时间复杂度计算方法
    Oracle字符函数
    ASP.NET 应用程序生命周期概述
    在同一个DataSet中添加多个DataTable
    谈SQL SERVER数据库中的索引
    Abstract 与 Vitrual 用法
    活动图与流程图的区别
  • 原文地址:https://www.cnblogs.com/FunTester/p/14381608.html
Copyright © 2011-2022 走看看