  • Example: Determining Whether a Device Is Online by Monitoring RocketMQ Consumer Status

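    ==Background==

    In an IoT scenario, I wrote a small API service program that runs on the device. The program does two things:

    1. Reports device data to the platform

    2. Creates a consumer client that listens for downlink commands from the platform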

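    ==Problem==

    The platform layer needs to know each device's status: online or offline. I could think of two approaches:

    1. The device reports heartbeat data, and the platform uses the heartbeat to decide whether the device is online.

    2. RocketMQ should have a command for monitoring consumer status; perhaps that command can be used instead.

    Approach 1 certainly works, but its drawback is that state-management code has to be written on the platform, which is tedious and may also introduce latency.

    So I wanted to find out whether approach 2 is feasible.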

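    ==Investigation==

    First, I looked at rocketmq-console (RocketMQ's web UI, which has to be deployed separately) and found that consumer status can be viewed in the web UI, as shown in the screenshot:

    The browser's console log shows that the page calls the consumerConnection.query endpoint.

    Good. Could I borrow this idea to monitor consumer status?

    Following that thought, I looked up the source code on GitHub: https://github.com/apache/rocketmq-externals

    Reading that source, I learned that RocketMQ already provides an API for viewing consumer connection information.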

    ==API Example==

    Two more dependencies, rocketmq-tools and rocketmq-common, need to be added to the pom. After adding them, the complete dependency list is:

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-store</artifactId>
        <version>4.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-acl</artifactId>
        <version>4.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-tools</artifactId>
        <version>4.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-common</artifactId>
        <version>4.5.0</version>
    </dependency>

    Java code example

    package admin;
    
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
    
    public class AdminExtSample {
        public static void main(String[] args)
            throws MQClientException, InterruptedException, MQBrokerException, RemotingException {
            DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
            defaultMQAdminExt.setNamesrvAddr("101.132.242.90:9876;47.116.50.192:9876");
            defaultMQAdminExt.start();
    
            ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo("device_cg_notice_down");
            System.out.println(cc.toString());
    
            defaultMQAdminExt.shutdown();
    
        }
    } 

    This retrieves all of the information shown on the web page above.

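    ==Practical Application==

    In the actual product, the status of the device-side consumers has to be monitored and turned into online/offline result data for each device.

    I plan to write a Flink streaming job that monitors consumer status and writes the results to InfluxDB.

    1. Design:

    → Write a separate streaming job (kept apart from the data-processing stream)

    → In the Source, check the consumer status (ConsumerConnection) at a fixed interval

    → In the Proc, read the status stored in InfluxDB (table: device_status) and compare it with the data produced by the Source to generate online/offline status-change records

    → In the Sink, write the status-change records to InfluxDB (table: device_status)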
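The comparison step in this design can be sketched in plain Java, independent of Flink and InfluxDB. The class and method names (StatusDiffSketch, diff) are hypothetical, and the input shapes (a map of the last stored status per device, plus the set of device names that currently have a consumer connection) are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: derive online/offline change records from two snapshots.
class StatusDiffSketch {

    // lastStatus: device name -> "online"/"offline" as last stored
    // connected:  device names that currently have a consumer connection
    static List<String> diff(Map<String, String> lastStatus, Set<String> connected) {
        List<String> changes = new ArrayList<>();

        // Known devices: emit a record only when the status flips
        for (Map.Entry<String, String> e : lastStatus.entrySet()) {
            String now = connected.contains(e.getKey()) ? "online" : "offline";
            if (!now.equals(e.getValue())) {
                changes.add(e.getKey() + "=" + now);
            }
        }

        // Devices seen for the first time are reported as online
        for (String device : new TreeSet<>(connected)) {
            if (!lastStatus.containsKey(device)) {
                changes.add(device + "=online");
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        Map<String, String> last = new LinkedHashMap<>();
        last.put("deviceA", "online");
        last.put("deviceB", "offline");
        last.put("deviceC", "online");

        Set<String> connected = new TreeSet<>();
        connected.add("deviceB");
        connected.add("deviceC");
        connected.add("deviceD");

        // Prints: [deviceA=offline, deviceB=online, deviceD=online]
        System.out.println(diff(last, connected));
    }
}
```

Only changes are emitted, so a device whose status stays the same between two polls produces no new record.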

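    2. Code

    Class: SourceConsumerConnection

    Purpose: RocketMQ Source that periodically checks the consumer status and produces a stream of status data

    Tips: When no consumer in the group is online, an MQBrokerException with code 206 is thrown; this exception has to be handled explicitly.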

    package com.rexel.stream.flink.source;
    
    import com.alibaba.fastjson.JSONArray;
    import com.alibaba.fastjson.JSONObject;
    import com.rexel.stream.utils.RocketUtils;
    import java.util.HashSet;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.protocol.body.Connection;
    import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
    
    /**
     * @ClassName: SourceConsumerConnection
     * @Description: Source that periodically polls the consumer connection state
     * @Author: chunhui.qu@rexel.com.cn
     * @Date: 2020/7/28
     */
    @Slf4j
    public class SourceConsumerConnection extends RichParallelSourceFunction<String> {
        private RocketUtils rocketUtils = RocketUtils.getInstance();
        private DefaultMQAdminExt adminExt;
        private String namesrvAddr;
        private String consumerGroup;
        private String accessKey;
        private String secretKey;
    
        public SourceConsumerConnection(
            String namesrvAddr, String consumerGroup, String accessKey, String secretKey) {
            this.namesrvAddr = namesrvAddr;
            this.consumerGroup = consumerGroup;
            this.accessKey = accessKey;
            this.secretKey = secretKey;
        }
    
        @Override
        public void run(SourceContext<String> ctx) {
            adminExt = rocketUtils.createAdminExt(namesrvAddr, accessKey, secretKey);
    
            while (true) {
                ConsumerConnection cc;
    
                try {
                    cc = adminExt.examineConsumerConnectionInfo(consumerGroup);
                } catch (InterruptedException | RemotingException | MQClientException e) {
                    e.printStackTrace();
                    return;
                } catch (MQBrokerException e) {
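                    // Response code 206: no consumer in this group is online
                    if (e.getResponseCode() == 206) {
                        ctx.collect(new JSONArray().toString());
                        // Wait before the next poll; without this the loop spins with no delay
                        sleep();
                        continue;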
                    } else {
                        e.printStackTrace();
                        return;
                    }
                }
    
                HashSet<Connection> set = cc.getConnectionSet();
                JSONArray jsonArray = new JSONArray();
                for(Connection connection : set) {
                    JSONObject jsonObject = (JSONObject)JSONObject.toJSON(connection);
                    jsonArray.add(jsonObject);
                }
                ctx.collect(jsonArray.toString());
    
                sleep();
            }
        }
    
        @Override
        public void close() {
            shutdown();
        }
    
        @Override
        public void cancel() {
            shutdown();
        }
    
        private void sleep() {
            try {
                Thread.sleep(3000L);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing the interruption
                Thread.currentThread().interrupt();
            }
        }
    
        private void shutdown() {
            if (adminExt != null) {
                adminExt.shutdown();
            }
            adminExt = null;
        }
    }
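Flink calls cancel() on a source from a different thread and expects run() to return afterwards; the usual pattern for this is a volatile running flag that the loop checks on every iteration. The sketch below shows that pattern in plain Java. PollingLoopSketch, its methods, and the Supplier that stands in for the broker query are hypothetical names, not part of Flink or RocketMQ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical sketch of a polling loop that can be stopped from another thread.
class PollingLoopSketch {
    // volatile so that the write in cancel() is visible to the polling thread
    private volatile boolean running = true;

    // Polls `query` until cancel() is called, handing each result to `sink`.
    void run(Supplier<String> query, Consumer<String> sink, long intervalMillis) {
        while (running) {
            sink.accept(query.get());
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                // Preserve the interrupt and leave the loop
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void cancel() {
        running = false;
    }

    public static void main(String[] args) {
        PollingLoopSketch loop = new PollingLoopSketch();
        List<String> collected = new ArrayList<>();

        // Stand-in for the real broker query; the loop is cancelled after three polls
        loop.run(() -> "[]", value -> {
            collected.add(value);
            if (collected.size() == 3) {
                loop.cancel();
            }
        }, 10L);

        System.out.println(collected.size()); // prints 3
    }
}
```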

    Class: CheckDeviceStatus

    Purpose: Receives the Source data, reads the InfluxDB data, and compares the two to produce device status-change records

    package com.rexel.stream.flink.proc;
    
    import com.alibaba.fastjson.JSONArray;
    import com.alibaba.fastjson.JSONObject;
    import com.rexel.stream.cons.Constants;
    import com.rexel.stream.utils.CommonUtils;
    import com.rexel.stream.utils.InfluxUtils;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.influxdb.InfluxDB;
    
    /**
     * @ClassName: CheckDeviceStatus
     * @Description: Device status check
     * @Author: chunhui.qu@rexel.com.cn
     * @Date: 2020/7/28
     */
    @Slf4j
    public class CheckDeviceStatus extends ProcessFunction<String, String> {
        private InfluxUtils influxUtils = InfluxUtils.getInstance();
        private InfluxDB influxdb = null;
        private String database;
        private String url;
        private String username;
        private String password;
    
        public CheckDeviceStatus(String database, String url, String username, String password) {
            this.database = database;
            this.url = url;
            this.username = username;
            this.password = password;
        }
    
        @Override
        public void open(Configuration parameters) throws Exception {
            this.influxdb = this.influxUtils.connect(url, username, password);
            this.influxdb.setDatabase(database);
        }
    
        @Override
        public void processElement(String value, Context ctx, Collector<String> out) {
            /*
             * Input 1: consumer connection state
             * [
             *     {
             *         "clientAddr":"223.72.217.178:20020",
             *         "clientId":"RexelLabDevice1@4e9c376c-eb28-4086-86e1-be6555d23430",
             *         "language":"JAVA",
             *         "version":313
             *     }
             * ]
             */
            JSONArray consumerArray = JSONArray.parseArray(value);
    
            /*
             * Input 2: last recorded status of each device
             * [
             *     {
             *         "time":"2020-07-21 16:08:14",
             *         "productKey":"a1B6t6ZG6oR",
             *         "deviceName":"QCHTestDevice1",
             *         "status":"offline"
             *     }
             * ]
             */
            JSONArray deviceArray = influxUtils.queryDeviceStatus(influxdb, database);
    
            // Check devices that have connected before
            for (int i = 0; i < deviceArray.size(); i++) {
                JSONObject deviceStatusJson = deviceArray.getJSONObject(i);
                String deviceName = deviceStatusJson.getString("deviceName");
                String productKey = deviceStatusJson.getString("productKey");
                String statusOld = deviceStatusJson.getString("status");
    
                // Look up the consumer that belongs to this device
                JSONObject deviceConsumerJson = getDeviceConsumer(deviceName, consumerArray);
    
                // Work out the current device status
                String statusNew = deviceConsumerJson == null ? Constants.OFFLINE : Constants.ONLINE;
    
                // Status has not changed
                if (statusOld.equals(statusNew)) {
                    continue;
                }
    
                // Build the device status JSON
                JSONObject resultJson = new JSONObject();
                resultJson.put("status", statusNew);
                resultJson.put("productKey", productKey);
                resultJson.put("deviceName", deviceName);
                resultJson.put("time", CommonUtils.timeLongToStr(System.currentTimeMillis()));
                out.collect(resultJson.toString());
            }
    
            // Check newly created devices
            for (int i = 0; i < consumerArray.size(); i++) {
                JSONObject consumerJson = consumerArray.getJSONObject(i);
                String clientId = consumerJson.getString("clientId");
                String deviceName = clientId.split("@")[0];
    
                // Skip devices that already have a status record
                if (!isNewDevice(deviceName, deviceArray)) {
                    continue;
                }
    
                // Build the device status JSON
                JSONObject resultJson = new JSONObject();
                resultJson.put("status", Constants.ONLINE);
                resultJson.put("productKey", "DView");
                resultJson.put("deviceName", deviceName);
                resultJson.put("time", CommonUtils.timeLongToStr(System.currentTimeMillis()));
                out.collect(resultJson.toString());
            }
        }
    
        private boolean isNewDevice(String deviceName, JSONArray deviceArray) {
            for (int i = 0; i < deviceArray.size(); i++) {
                JSONObject deviceStatusJson = deviceArray.getJSONObject(i);
                String deviceNameTemp = deviceStatusJson.getString("deviceName");
                if (deviceName.equals(deviceNameTemp)) {
                    return false;
                }
            }
            return true;
        }
    
        private JSONObject getDeviceConsumer(String deviceName, JSONArray consumerArray) {
            for (int i = 0; i < consumerArray.size(); i++) {
                JSONObject consumerJson = consumerArray.getJSONObject(i);
                String clientId = consumerJson.getString("clientId");
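                // Compare the full device name (the part before '@'); a prefix match
                // would also accept e.g. "Device10@..." when looking for "Device1"
                if (clientId != null && clientId.split("@")[0].equals(deviceName)) {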
                    return consumerJson;
                }
            }
            return null;
        }
    }
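Matching a device to its consumer depends on the clientId format seen in the sample data, `deviceName@suffix`. The plain-Java sketch below extracts the device name and compares it exactly; ClientIdSketch and its methods are hypothetical helpers. It also illustrates why a prefix comparison such as startsWith is risky when one device name is a prefix of another.

```java
// Hypothetical helpers for clientIds of the form "deviceName@suffix".
class ClientIdSketch {

    // Returns the part before the first '@', or the whole string if there is none.
    static String deviceNameOf(String clientId) {
        int at = clientId.indexOf('@');
        return at < 0 ? clientId : clientId.substring(0, at);
    }

    // Exact comparison on the extracted device name.
    static boolean belongsTo(String clientId, String deviceName) {
        return clientId != null && deviceNameOf(clientId).equals(deviceName);
    }

    public static void main(String[] args) {
        String clientId = "Device10@4e9c376c";

        System.out.println(deviceNameOf(clientId));           // Device10
        System.out.println(clientId.startsWith("Device1"));   // true: the prefix match is misled
        System.out.println(belongsTo(clientId, "Device1"));   // false
        System.out.println(belongsTo(clientId, "Device10"));  // true
    }
}
```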

    Class: SinkToInfluxStatus

    Purpose: Writes the data to InfluxDB

    package com.rexel.stream.flink.sink;
    
    import com.alibaba.fastjson.JSON;
    import com.rexel.stream.pojo.DeviceStatus;
    import com.rexel.stream.utils.InfluxUtils;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.influxdb.InfluxDB;
    
    /**
     * @ClassName: SinkToInfluxStatus
     * @Description: Sink that writes device status data to InfluxDB
     * @Author: chunhui.qu@rexel.com.cn
     * @Date: 2020/7/28
     */
    @Slf4j
    public class SinkToInfluxStatus extends RichSinkFunction<String> {
        private InfluxUtils influxUtils = InfluxUtils.getInstance();
        private InfluxDB influxDb = null;
        private String database;
        private String url;
        private String username;
        private String password;
    
        public SinkToInfluxStatus(String url, String username, String password, String database) {
            this.url = url;
            this.username = username;
            this.password = password;
            this.database = database;
        }
    
        @Override
        public void open(Configuration parameters) throws Exception {
            this.influxDb = this.influxUtils.connect(url, username, password);
            this.influxDb.setDatabase(database);
        }
    
        @Override
        public void invoke(String value, Context context) {
            // Parse into a DeviceStatus object
            DeviceStatus deviceStatus = JSON.parseObject(value, DeviceStatus.class);
    
            // Write to InfluxDB
            this.influxUtils.write(this.influxDb, this.database, deviceStatus);
            log.debug("[------]deviceStatus=" + deviceStatus.toString());
        }
    
        @Override
        public void close() {
            if (this.influxDb != null) {
                this.influxDb.close();
            }
            this.influxDb = null;
        }
    }

    Class: StreamDeviceStatus

    Purpose: Flink job entry class

    Tips: The job must be started with an --input argument that points to a configuration file

    package com.rexel.stream.flink.job;
    
    import com.alibaba.fastjson.JSONObject;
    import com.rexel.stream.common.CheckPoint;
    import com.rexel.stream.cons.Constants;
    import com.rexel.stream.flink.proc.CheckDeviceStatus;
    import com.rexel.stream.flink.sink.SinkToInfluxStatus;
    import com.rexel.stream.flink.source.SourceConsumerConnection;
    import com.rexel.stream.utils.CommonUtils;
    import java.nio.charset.StandardCharsets;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
    
    /**
     * @ClassName: StreamDeviceStatus
     * @Description: Streaming job that monitors device status
     * @Author: chunhui.qu@rexel.com.cn
     * @Date: 2020/7/28
     */
    @Slf4j
    public class StreamDeviceStatus extends AbstractStream {
    
        /**
         * Job entry point
         *
         * @param args startup arguments
         * @throws Exception e
         */
        public static void main(String[] args) throws Exception {
            // Validate the arguments
            final ParameterTool params = ParameterTool.fromArgs(args);
            if (!params.has(Constants.INPUT_CONF)) {
                log.error("[------]parameter error.");
                return;
            }
    
            // Read the configuration file
            String confStr = CommonUtils.readFile(params.get(Constants.INPUT_CONF), StandardCharsets.UTF_8);
            JSONObject confJson = JSONObject.parseObject(confStr);
            if (confJson == null) {
                log.error("[------]convert to json error.");
                return;
            }
    
            // Start the streaming job
            StreamDeviceStatus streamDeviceStatus = new StreamDeviceStatus(confJson);
            streamDeviceStatus.execute();
        }
    
        /**
         * Constructor
         *
         * @param confJson configuration file as JSON
         */
        private StreamDeviceStatus(JSONObject confJson) {
            super(confJson);
        }
    
        /**
         * Main logic of the streaming job
         *
         * @throws Exception e
         */
        private void execute() throws Exception {
            // Create the stream execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // Enable checkpointing if it is configured
            if (confJson.getBoolean(Constants.JOB_CHECK_POINT)) {
                CheckPoint.setCheckPoint(env);
            }
    
            // Define the source
            SingleOutputStreamOperator<String> rocketSource =
                env.addSource(getRocketSource()).name("rmq_consumer").setParallelism(1);
    
            // Define the processing step
            SingleOutputStreamOperator<String> statusStream =
                rocketSource.process(getCheckDeviceStatus()).name("check_status").setParallelism(1);
    
            // Define the sink
            statusStream.addSink(getSinkInfluxStatus()).name("to_influx").setParallelism(1);
    
            // Start the job
            JobExecutionResult result = env.execute("check_device_status");
            log.info("[------]execute. result=" + result.toString());
        }
    
        /**
         * Create the RocketMQ source (consumer connection state)
         *
         * @return RichParallelSourceFunction
         */
        private RichParallelSourceFunction<String> getRocketSource() {
            String namesrvAddr = confJson.getString(Constants.RMQ_NAMESRV_ADDR);
            String accessKey = confJson.getString(Constants.RMQ_ACCESS_KEY);
            String secretKey = confJson.getString(Constants.RMQ_SECRET_KEY);
    
            String consumerGroup = confJson.getString(Constants.RMQ_CG_DEVICE_NOTICE_DOWN);
            return new SourceConsumerConnection(namesrvAddr, consumerGroup, accessKey, secretKey);
        }
    
        /**
         * Create the processing step (device status check)
         *
         * @return ProcessFunction
         */
        private ProcessFunction<String, String> getCheckDeviceStatus() {
            String url = confJson.getString(Constants.INFLUX_URL);
            String username = confJson.getString(Constants.INFLUX_USERNAME);
            String password = confJson.getString(Constants.INFLUX_PASSWORD);
            String database = confJson.getString(Constants.INFLUX_DATABASE);
            return new CheckDeviceStatus(database, url, username, password);
        }
    
        /**
         * Create the InfluxDB sink for device status
         *
         * @return RichSinkFunction
         */
        private RichSinkFunction<String> getSinkInfluxStatus() {
            String url = confJson.getString(Constants.INFLUX_URL);
            String username = confJson.getString(Constants.INFLUX_USERNAME);
            String password = confJson.getString(Constants.INFLUX_PASSWORD);
            String database = confJson.getString(Constants.INFLUX_DATABASE);
            return new SinkToInfluxStatus(url, username, password, database);
        }
    }

    Class: DeviceStatus

    Purpose: Java mapping class for the InfluxDB table device_status

    package com.rexel.stream.pojo;
    
    import com.rexel.stream.cons.Constants;
    import com.rexel.stream.utils.CommonUtils;
    import lombok.EqualsAndHashCode;
    import lombok.Setter;
    import org.influxdb.dto.Point;
    
    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @ClassName: DeviceStatus
     * @Description: Device online/offline status
     * @Author: chunhui.qu@rexel.com.cn
     * @Date: 2020/2/27
     */
    @EqualsAndHashCode(callSuper = true)
    @Setter
    public class DeviceStatus extends AbstractDeviceBase implements Serializable {
    
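        /**
         * Device status.
         * online: the device is online.
         * offline: the device is offline.
         */
        private String status;
        /**
         * Unique identifier of the product the device belongs to.
         */
        private String productKey;
        /**
         * Device name.
         */
        private String deviceName;
        /**
         * Time at which the notification was sent.
         */
        private String time;
        /**
         * UTC time at which the notification was sent.
         */
        private String utcTime;
        /**
         * Time of the last communication before the status change.
         * To avoid problems caused by out-of-order messages, it is recommended to maintain the final device status based on lastTime.
         */
        private String lastTime;
        /**
         * UTC time of the last communication before the status change.
         */
        private String utcLastTime;
        /**
         * Public egress IP of the device.
         */
        private String clientIp;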
    
        /**
         * Constructor; initializes the measurement name
         */
        public DeviceStatus() {
            setMeasurement(Constants.DEVICE_STATUS);
        }
    
        /**
         * Build the list of Points
         *
         * @return list of Points
         */
        @Override
        public List<Point> getPointList() {
            List<Point> pointList = new ArrayList<>();
    
            long longTime = CommonUtils.timeStrToLong(String.valueOf(time));
            Point.Builder builder = Point.measurement(getMeasurement());
            builder.time(longTime, TimeUnit.MILLISECONDS);
            addTag(builder, "productKey", productKey);
            addTag(builder, "deviceName", deviceName);
            addField(builder, "status", status);
            addField(builder, "utcTime", utcTime);
            addField(builder, "lastTime", lastTime);
            addField(builder, "utcLastTime", utcLastTime);
            addField(builder, "clientIp", clientIp);
            pointList.add(builder.build());
    
            return pointList;
        }
    }

    Class: AbstractDeviceBase

    Purpose: Abstract parent class of DeviceStatus that handles some common processing (the real project has several more table-mapping classes)

    package com.rexel.stream.pojo;
    
    import com.rexel.stream.cons.Constants;
    import com.rexel.stream.utils.CommonUtils;
    import lombok.Data;
    import org.influxdb.InfluxDB.ConsistencyLevel;
    import org.influxdb.dto.BatchPoints;
    import org.influxdb.dto.Point;
    
    import java.io.Serializable;
    import java.util.List;
    
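    /**
     * @ClassName: AbstractDeviceBase
     * @Description: Parent class for IoT platform data formats
     * @Author: chunhui.qu@rexel.com.cn
     * @Date: 2020/2/27
     * <p>
     * Some guidelines for using InfluxDB, for reference.
     * ■ Recommended:
     * If a value is metadata that is frequently used as a query condition, make it a tag;
     * If a value is frequently used as a GROUP BY argument, make it a tag;
     * If a value needs to be passed to InfluxQL functions, make it a field;
     * If for some reason a value should not be stored as a string, use a field, because tags are always stored as strings.
     * ■ To avoid:
     * Do not create too many series
     * Do not encode data in measurement names
     * Do not put more than one piece of information in a single tag
     * Do not use InfluxQL keywords as identifier names
     */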
    @Data
    abstract public class AbstractDeviceBase implements Serializable {
        private String measurement = Constants.DEVICE_OTHER;
    
        /**
         * Get the list of Points
         *
         * @return list of Points
         */
        abstract public List<Point> getPointList();
    
        /**
         * Build the BatchPoints
         *
         * @param dbName database
         * @return BatchPoints
         */
        public BatchPoints getBatchPoints(String dbName) {
            BatchPoints batchPoints = BatchPoints.database(
                    dbName).retentionPolicy(null).consistency(ConsistencyLevel.ALL).build();
    
            List<Point> pointList = getPointList();
            if (pointList == null || pointList.size() <= 0) {
                return null;
            }
            pointList.forEach(batchPoints::point);
    
            return batchPoints;
        }
    
        /**
         * Add a tag
         *
         * @param builder Builder
         * @param name tag name
         * @param value tag value
         */
        void addTag(Point.Builder builder, String name, String value) {
            if (value != null) {
                builder.tag(name, value);
            }
        }
    
        /**
         * Add a field
         *
         * @param builder Builder
         * @param name field name
         * @param value field value
         */
        void addField(Point.Builder builder, String name, Object value) {
            if (value == null) {
                return;
            }
    
            if (CommonUtils.isNumber(value)) {
                builder.addField(name, CommonUtils.formatDouble(value));
            } else {
                builder.addField(name, String.valueOf(value));
            }
        }
    }
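To make the tag/field distinction concrete, the sketch below renders one device_status point in InfluxDB line protocol: the measurement, comma-separated tags, a space, the fields, a space, and the timestamp. LineProtocolSketch and its methods are hypothetical and exist only for illustration; the classes in this post rely on Point.Builder from influxdb-java instead. Escaping is simplified (measurement names are not escaped), and the unit of the timestamp depends on the precision used when writing.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical sketch: format a point with string-valued fields as line protocol.
class LineProtocolSketch {

    // Tag and field keys, and tag values: escape commas, equals signs and spaces
    static String escapeKey(String s) {
        return s.replace(",", "\\,").replace("=", "\\=").replace(" ", "\\ ");
    }

    // String field values are double-quoted; escape backslashes and quotes
    static String quoteField(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    static String toLine(String measurement, Map<String, String> tags,
                         Map<String, String> stringFields, long timestamp) {
        StringBuilder sb = new StringBuilder(measurement);
        // Tags are indexed metadata and are always stored as strings
        for (Map.Entry<String, String> t : tags.entrySet()) {
            sb.append(',').append(escapeKey(t.getKey()))
              .append('=').append(escapeKey(t.getValue()));
        }
        // Fields carry the actual values
        StringJoiner fields = new StringJoiner(",");
        for (Map.Entry<String, String> f : stringFields.entrySet()) {
            fields.add(escapeKey(f.getKey()) + "=" + quoteField(f.getValue()));
        }
        return sb.append(' ').append(fields.toString())
                 .append(' ').append(timestamp).toString();
    }

    public static void main(String[] args) {
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("productKey", "a1B6t6ZG6oR");
        tags.put("deviceName", "QCHTestDevice1");

        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("status", "online");

        // Prints: device_status,productKey=a1B6t6ZG6oR,deviceName=QCHTestDevice1 status="online" 1595318894000
        System.out.println(toLine("device_status", tags, fields, 1595318894000L));
    }
}
```

Here productKey and deviceName are tags because they are used for filtering and grouping, while status is a field because it is the value being recorded.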

    Class: InfluxUtils

    Purpose: Utility class for InfluxDB

    package com.rexel.stream.utils;
    
    import com.alibaba.fastjson.JSONArray;
    import com.alibaba.fastjson.JSONObject;
    import com.rexel.stream.pojo.AbstractDeviceBase;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    import lombok.extern.slf4j.Slf4j;
    import okhttp3.OkHttpClient;
    import org.influxdb.InfluxDB;
    import org.influxdb.InfluxDBFactory;
    import org.influxdb.dto.BatchPoints;
    
    import java.io.Serializable;
    import org.influxdb.dto.Query;
    import org.influxdb.dto.QueryResult;
    import org.influxdb.dto.QueryResult.Result;
    import org.influxdb.dto.QueryResult.Series;
    
    /**
     * @ClassName: InfluxUtils
     * @Description: InfluxDB utility class
     * @Author: chunhui.qu@rexel.com.cn
     * @Date: 2020/2/27
     */
    @Slf4j
    public class InfluxUtils implements Serializable {
        /**
         * Constructor
         */
        private InfluxUtils() {
        }
    
        /**
         * Singleton holder
         */
        private static class SingletonInstance {
            private static final InfluxUtils INSTANCE = new InfluxUtils();
        }
    
        /**
         * Get the singleton instance
         */
        public static InfluxUtils getInstance() {
            return SingletonInstance.INSTANCE;
        }
    
        /**
         * Connect to InfluxDB
         *
         * @param url URL
         * @param userName user name
         * @param password password
         * @return InfluxDB handle
         * @throws Exception e
         */
        public InfluxDB connect(String url, String userName, String password) throws Exception {
            OkHttpClient.Builder client =
                new OkHttpClient.Builder().readTimeout(60, TimeUnit.SECONDS);
            InfluxDB influxDb = InfluxDBFactory.connect(url, userName, password, client);
            if (influxDb == null) {
                throw new Exception("[---InfluxDBSink---]influxDB == null");
            }
            return influxDb;
        }
    
        /**
         * Write to InfluxDB
         *
         * @param influxDb InfluxDB handle
         * @param dbName database
         * @param abstractDeviceBase object to write
         */
        public void write(InfluxDB influxDb, String dbName, AbstractDeviceBase abstractDeviceBase) {
            BatchPoints batchPoints = abstractDeviceBase.getBatchPoints(dbName);
            if (batchPoints == null) {
                log.error("[------]batchPoints == null");
                return;
            }
    
            if (batchPoints.getPoints().size() <= 0) {
                log.error("[------]point size == 0");
                return;
            }
    
            influxDb.write(batchPoints);
            log.debug("[------]batchPoints=" + batchPoints.toString());
        }
    
        /**
         * Query the device status
         *
         * @param influxDb InfluxDB handle
         * @param database database
         * @return JSONArray
         */
        public JSONArray queryDeviceStatus(InfluxDB influxDb, String database) {
            String sql = " select "
                + " last(status) as status, "
                + " productKey, "
                + " deviceName "
                + " from device_status "
                + " group by productKey, deviceName "
                + " tz('Asia/Shanghai') ";
    
            QueryResult queryResult = influxDb.query(new Query(sql, database));
            return convert(queryResult);
        }
    
        /**
         * Convert a QueryResult
         *
         * @param queryResult QueryResult
         * @return JSONArray
         */
        private JSONArray convert(QueryResult queryResult) {
            JSONArray jsonArray = new JSONArray();
            List<Result> results = queryResult.getResults();
            for (Result result : results) {
                List<Series> seriesList = result.getSeries();
                if (seriesList == null) {
                    continue;
                }
                for (Series series : seriesList) {
                    List<List<Object>> valuesList = series.getValues();
                    if (valuesList == null) {
                        continue;
                    }
                    for (List<Object> values : valuesList) {
                        List<String> columns = series.getColumns();
                        JSONObject jsonObject = new JSONObject();
                        for (int i = 0; i < columns.size(); i++) {
                            String column = columns.get(i);
                            Object value = values.get(i);
                            if (value != null) {
                                jsonObject.put(column, value);
                            }
                        }
                        jsonArray.add(jsonObject);
                    }
                }
            }
            return jsonArray;
        }
    }

    Class: RocketUtils

    Purpose: Utility class for RocketMQ

    package com.rexel.stream.utils;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    import java.io.Serializable;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;
    import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
    
    /**
     * @ClassName: RocketUtils
     * @Description: RocketMQ utility class
     * @Author: chunhui.qu@rexel.com.cn
     * @Date: 2020/2/27
     */
    @Slf4j
    public class RocketUtils implements Serializable {
        private static Map<String, DefaultMQProducer> producerMap = new HashMap<>();
    
        /**
         * Constructor
         */
        private RocketUtils() {
        }
    
        /**
         * Singleton holder
         */
        private static class SingletonInstance {
            private static final RocketUtils INSTANCE = new RocketUtils();
        }
    
        /**
         * Get the singleton instance
         */
        public static RocketUtils getInstance() {
            return SingletonInstance.INSTANCE;
        }
    
        /**
         * Create a DefaultMQAdminExt
         *
         * @param namesrvAddr namesrvAddr
         * @param accessKey accessKey
         * @param secretKey secretKey
         * @return DefaultMQAdminExt
         */
        public DefaultMQAdminExt createAdminExt(String namesrvAddr, String accessKey, String secretKey) {
            RPCHook rpcHook = getAclRpcHook(accessKey, secretKey);
            DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook, 300000);
            adminExt.setNamesrvAddr(namesrvAddr);
            adminExt.setAdminExtGroup(UUID.randomUUID().toString());
            try {
                adminExt.start();
            } catch (MQClientException e) {
                e.printStackTrace();
                return null;
            }
            return adminExt;
        }
    
        /**
         * Creates a push consumer subscribed to the given topic.
         * The consumer is NOT started here: the caller must register a
         * message listener and then call start().
         *
         * @param namesrvAddr name server address
         * @param topic topic to subscribe to
         * @param group consumer group
         * @param accessKey ACL access key
         * @param secretKey ACL secret key
         * @return the consumer, or null if the subscription failed
         */
        public DefaultMQPushConsumer createConsumer(
            String namesrvAddr, String topic, String group, String accessKey, String secretKey) {
            RPCHook rpcHook = getAclRpcHook(accessKey, secretKey);
            DefaultMQPushConsumer consumer =
                new DefaultMQPushConsumer(group, rpcHook, new AllocateMessageQueueAveragely());
            consumer.setNamesrvAddr(namesrvAddr);
            consumer.setInstanceName(UUID.randomUUID().toString());
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.setMessageModel(MessageModel.CLUSTERING);
            consumer.setVipChannelEnabled(false);
            consumer.setConsumeTimeout(180000);
            // One consume thread and a batch size of 1: messages are handled one at a time.
            consumer.setConsumeThreadMax(1);
            consumer.setConsumeThreadMin(1);
            consumer.setConsumeMessageBatchMaxSize(1);
            try {
                // "*" subscribes to every tag of the topic.
                consumer.subscribe(topic, "*");
            } catch (MQClientException e) {
                log.error("Failed to subscribe to topic {}, group={}", topic, group, e);
                return null;
            }
            return consumer;
        }
    
        /**
         * Creates and starts a producer. Producers are cached, so a second call
         * with the same name server address and group returns the existing one.
         *
         * @param nameSrvAddr name server address
         * @param group producer group
         * @param accessKey ACL access key
         * @param secretKey ACL secret key
         * @return the started producer, or null if it failed to start
         */
        public DefaultMQProducer createProducer(
            String nameSrvAddr, String group, String accessKey, String secretKey) {
            String key = nameSrvAddr + group;
            if (producerMap.containsKey(key)) {
                return producerMap.get(key);
            }
    
            RPCHook rpcHook = getAclRpcHook(accessKey, secretKey);
            DefaultMQProducer producer = new DefaultMQProducer(group, rpcHook);
            producer.setNamesrvAddr(nameSrvAddr);
            producer.setSendMessageWithVIPChannel(false);
            producer.setSendMsgTimeout(5000);
            producer.setInstanceName(UUID.randomUUID().toString());
            try {
                producer.start();
            } catch (MQClientException e) {
                log.error("Failed to start producer, group={}", group, e);
                return null;
            }
    
            // Cache only producers that started successfully.
            producerMap.put(key, producer);
            return producer;
        }
    
        /**
         * Sends a message asynchronously.
         *
         * @param producer producer to send with
         * @param msg message
         * @param callback callback invoked with the send result or the failure
         */
        public void sendAsync(DefaultMQProducer producer, Message msg, SendCallback callback) {
            try {
                producer.send(msg, callback);
            } catch (MQClientException | RemotingException | InterruptedException e) {
                log.error("Failed to send message asynchronously, topic={}", msg.getTopic(), e);
            }
        }
    
        /**
         * Sends a message synchronously.
         *
         * @param producer producer to send with
         * @param msg message
         * @return the send result, or null if sending failed
         */
        public SendResult send(DefaultMQProducer producer, Message msg) {
            try {
                return producer.send(msg);
            } catch (MQClientException | RemotingException | InterruptedException | MQBrokerException e) {
                log.error("Failed to send message, topic={}", msg.getTopic(), e);
                return null;
            }
        }
    
        /**
         * Builds the RPC hook that carries the ACL credentials.
         *
         * @param accessKey ACL access key
         * @param secretKey ACL secret key
         * @return RPCHook
         */
        private RPCHook getAclRpcHook(String accessKey, String secretKey) {
            return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
        }
    }
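
    The producer cache in createProducer checks the map and then inserts into it in two separate steps on a plain HashMap, so two threads calling it at the same time could each start their own producer. One possible way to tighten this, shown here only as a sketch, is ConcurrentHashMap.computeIfAbsent. The class name ClientCache, the generic type T standing in for DefaultMQProducer, and the "|" separator in the key are assumptions made for illustration and are not part of the original code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/**
 * Hypothetical helper (not part of RocketUtils): caches one client per
 * (name server address, group) pair. T stands in for DefaultMQProducer.
 */
final class ClientCache<T> {
    private final Map<String, T> cache = new ConcurrentHashMap<>();

    /**
     * Returns the cached client for the key, creating it with the factory
     * if absent. If the factory returns null (for example because start()
     * failed), nothing is cached and null is returned, so a later call
     * tries again.
     */
    T getOrCreate(String nameSrvAddr, String group, Supplier<T> factory) {
        // A separator keeps ("a", "bc") and ("ab", "c") from sharing a key.
        String key = nameSrvAddr + "|" + group;
        return cache.computeIfAbsent(key, k -> factory.get());
    }

    /** Number of clients currently cached. */
    int size() {
        return cache.size();
    }
}
```

    With this shape, the body of createProducer after the cache lookup would become the factory passed to getOrCreate. Note that computeIfAbsent runs the factory while holding a lock on part of the map, so a slow producer.start() delays other callers that hash to the same bin.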

    Startup configuration file:

    {
      "job.name": "xxx",
      "job.checkpoint": false,
      "influxdb.url": "http://xxx:8100",
      "influxdb.username": "xxx",
      "influxdb.password": "xxx",
      "influxdb.database": "xxx",
      "rocketmq.namesrvAddr": "xxx",
      "rocketmq.acl.accessKey": "xxx",
      "rocketmq.acl.secretKey": "xxx",
      "rocketmq.device.consumerGroup.notice.down": "xxx"
    }
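
    The article does not show how this file is loaded. Assuming it has already been parsed into a Map of dotted keys to values (by whatever JSON library the job uses), a small check before creating any RocketMQ client can catch a missing or blank entry early. The class name StartupConfigCheck and the choice of which keys count as required are assumptions for illustration; the key names themselves are taken from the file above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: reports required RocketMQ settings that are missing or blank. */
final class StartupConfigCheck {
    // Assumed to be required; key names are copied from the startup file.
    static final String[] REQUIRED_KEYS = {
        "rocketmq.namesrvAddr",
        "rocketmq.acl.accessKey",
        "rocketmq.acl.secretKey",
        "rocketmq.device.consumerGroup.notice.down"
    };

    private StartupConfigCheck() {
    }

    /** Returns the required keys that are absent, null, or blank, in declaration order. */
    static List<String> missingKeys(Map<String, ?> config) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            Object value = config.get(key);
            if (value == null || value.toString().trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

    A caller could fail fast when the returned list is not empty, rather than passing a null address or key into createAdminExt or createConsumer.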

    3. Results

    The resulting data in InfluxDB is shown below.

    --END--

  • Original article: https://www.cnblogs.com/quchunhui/p/13371274.html