zoukankan      html  css  js  c++  java
  • java influx DB工具类

    配置

    application-properties:

    spring.influxdb.url=${influxdb_host:127.0.0.1}
    spring.influxdb.port=${influxdb_port:8086}
    spring.influxdb.username=${influxdb_username:root}
    spring.influxdb.password=${influxdb_password:root}
    spring.influxdb.database=${influxdb_database:test}

    InfluxDBConfig

    import com....utils.InfluxDBConnect;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    
    @ConfigurationProperties("application.properties")
    @Configuration
    public class InfluxDBConfig {
    
        @Value("${spring.influxdb.username}")
        private String username;
        @Value("${spring.influxdb.password}")
        private String password;
        @Value("${spring.influxdb.url}")
        private String url;
        @Value("${spring.influxdb.port}")
        private String port;
        @Value("${spring.influxdb.database}")
        private String database;
    
        @Bean
        public InfluxDBConnect getInfluxDBConnect(){
            InfluxDBConnect influxDB = new InfluxDBConnect(username, password, "http://"+url+":"+port, database);
    
            influxDB.influxDbBuild();
    
            influxDB.createRetentionPolicy();
            return influxDB;
        }
    
    }

    工具类

    influxDBConnect类:

    import lombok.Data;
    import org.influxdb.InfluxDB;
    import org.influxdb.InfluxDBFactory;
    import org.influxdb.dto.BatchPoints;
    import org.influxdb.dto.Point;
    import org.influxdb.dto.Query;
    import org.influxdb.dto.QueryResult;
    
    import java.util.Map;
    import java.util.concurrent.TimeUnit;
    
    
    
    @Data
    public class InfluxDBConnect {
        private String username;// 用户名
        private String password;// 密码
        private String openurl;// 连接地址
        private String database;// 数据库
    
        private InfluxDB influxDB;
    
    
    
        public InfluxDBConnect(String username, String password, String openurl, String database) {
            this.username = username;
            this.password = password;
            this.openurl = openurl;
            this.database = database;
        }
    
        /** 连接时序数据库;获得InfluxDB **/
        public InfluxDB influxDbBuild() {
            if (influxDB == null) {
                influxDB = InfluxDBFactory.connect(openurl, username, password);
                influxDB.createDatabase(database);
    
            }
            return influxDB;
        }
    
        /**
         * 设置数据保存策略 defalut 策略名 /database 数据库名/ 30d 数据保存时限30天/ 1 副本个数为1/ 结尾DEFAULT
         * 表示 设为默认的策略
         */
        public void createRetentionPolicy() {
            String command = String.format("CREATE RETENTION POLICY "%s" ON "%s" DURATION %s REPLICATION %s DEFAULT",
                    "defalut", database, "7200d", 1);
            this.query(command);
        }
    
        /**
         * 查询
         *
         * @param command 查询语句
         * @return
         */
        public QueryResult query(String command) {
            return influxDB.query(new Query(command, database));
        }
        public QueryResult query(String command, TimeUnit unit) {
            return influxDB.query(new Query(command, database),unit);
        }
    
        /**
         * 插入
         * @param measurement 表
         * @param tags 标签
         * @param fields 字段
         */
        public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields, long time) {
            Point.Builder builder = Point.measurement(measurement);
            builder.time(time, TimeUnit.MILLISECONDS);
            builder.tag(tags);
            builder.fields(fields);
            influxDB.write(database, "", builder.build());
        }
    
        /**
         * 插入
          * @param measurement 表
           * @param tags 标签
           * @param fields 字段
            */
         public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields){
             Point.Builder builder = Point.measurement(measurement);
             builder.tag(tags);
             builder.fields(fields);
    
             influxDB.write(database, "", builder.build());
         }
    
    
        /**
         * 插入
         * @param batchPoints 批量插入
         */
        public void batchInsert(BatchPoints batchPoints){
            influxDB.write(batchPoints);
        }
    
        /**
         * 删除
         * @param command 删除语句
         * @return 返回错误信息
         */
        public String deleteMeasurementData(String command) {
            QueryResult result = influxDB.query(new Query(command, database));
            return result.getError();
        }
    
        /**
         * 创建数据库
         * @param dbName
         */
        public void createDB(String dbName) {
            influxDB.createDatabase(dbName);
        }
    
        /**
         * 删除数据库
         *
         * @param dbName
         */
        public void deleteDB(String dbName) {
            influxDB.deleteDatabase(dbName);
        }
    
    }

     查询返回值包的层数很多,可以预处理

    import lombok.extern.slf4j.Slf4j;
    import org.influxdb.dto.QueryResult;
    
    import java.math.BigDecimal;
    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;
    
    
    
    @Slf4j
    public class InfluxdbUtil {
    
        public static List<Object> queryInfluxdb(String query_sql, String table_type, InfluxDBConnect influxDB) {
            long starttime = System.currentTimeMillis();
            QueryResult result = influxDB.query(query_sql.toString(), TimeUnit.MILLISECONDS);
            long endtime = System.currentTimeMillis();
    
            List<Object> influx_data_list = getInfluxData(result.getResults().get(0), table_type);
    
            return influx_data_list;
    
        }
        public static List<Object> getInfluxData(QueryResult.Result result, String table_type) {
            List<Object> influx_data_list = null;
            if (null != result) {
                influx_data_list = new ArrayList<>();
    
                List<QueryResult.Series> series = result.getSeries();
                if (null != series && series.size() > 0) {
                    for (QueryResult.Series serie : series) {
                        List<List<Object>> values = serie.getValues();
    
                        List<Object> result_list = getInfluxDataAndBuild(values, serie, table_type);
    
                        influx_data_list.addAll(result_list);
                    }
                }
            }
    
            return influx_data_list;
        }
    
        public static  List<Object> getInfluxDataAndBuild(List<List<Object>> values, QueryResult.Series
                serie, String table_type) {
            List<Object> influx_data_list = new ArrayList<>();
            List<String> influx_cloumns = serie.getColumns();
            Map<String, String> influx_tags = serie.getTags();
    
            Map<String, Object> build_maps;
    
            if (values.size() > 0) {
    
                for (int i = 0; i < values.size(); i++) {
                    build_maps = new LinkedHashMap<>();
                    if (null != influx_tags) {
    
                        for (Map.Entry<String, String> entry : influx_tags.entrySet()) {
                            String entry_key = entry.getKey();
                            if (entry_key.contains("tag_")) {
                                entry_key = entry_key.substring(4);
                            }
                            build_maps.put(entry_key, entry.getValue());
                        }
                    }
                    Object[] influx_Obj = values.get(i).toArray();
    
                    for (int j = 0; j < influx_Obj.length; j++) {
                        String build_maps_key = influx_cloumns.get(j);
                        if (build_maps_key.equals("time")) {
                            if (table_type.equals("normal")) {
                                BigDecimal bd = new BigDecimal(String.valueOf(influx_Obj[j]));
                                build_maps.put("timestamp",  bd.toPlainString());
                            } else if (table_type.equals("polymerize")) {
                                //  build_maps.put("timestamp", DateUtil.dateToStampAndAddMinute(String.valueOf(influx_Obj[j])));
                            }
                        } else {
                            if (build_maps_key.contains("tag_")) {
                                build_maps_key = build_maps_key.substring(4);
                            }
    
                            build_maps.put(build_maps_key, influx_Obj[j]);
                        }
                    }
    
                    influx_data_list.add(build_maps);
                }
            }
            return influx_data_list;
        }
    }
  • 相关阅读:
    性能优化
    几种跨域处理
    重温前端基础之-数组去重
    移动端常遇到的问题
    WPF 应用
    WPF 应用
    WPF 应用
    C# 应用
    WPF 应用
    C# 应用
  • 原文地址:https://www.cnblogs.com/zhzhlong/p/11434716.html
Copyright © 2011-2022 走看看