zoukankan      html  css  js  c++  java
  • influxDB 增加

    1、pom文件添加

    <parent>
    <artifactId>ma-base</artifactId>
    <groupId>com...</groupId>
    <version>5.0.0-SNAPSHOT</version>
    </parent>
    添加依赖
    <dependency>
    <groupId>org.influxdb</groupId>
    <artifactId>influxdb-java</artifactId>
    <version>2.15</version>
    </dependency>

    2、influxDB工具类
    package com.meicloud.mp.mas.core.influx;

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;

    /**
     * Spring component that creates the shared {@link InfluxDBConnect} during bean
     * initialisation and hands it out through {@link #getConnect()}.
     */
    @Slf4j
    @Component
    public class InfluxDBUtils implements InitializingBean {

        /**
         * Connection shared by all callers. Declared volatile because {@link #getConnect()}
         * reads it outside the lock before deciding whether initialisation is needed.
         */
        private static volatile InfluxDBConnect influxDB;

        // NOTE(review): the four placeholders below carry no ":default" part, so the field
        // initialisers presumably only matter when this class is created outside Spring;
        // inside Spring a missing property is expected to fail placeholder resolution. Confirm.
        @Value("${common.mas.influx.url}")
        private String openurl = "http://127.0.0.1:8086"; // connection URL

        @Value("${common.mas.influx.username}")
        private String username = "root"; // user name

        @Value("${common.mas.influx.password}")
        private String password = "root"; // password

        @Value("${common.mas.influx.database}")
        private String database = "polutondb"; // database name

        /** Retention value handed to {@code createRetentionPolicy}; 1 when the property is absent. */
        @Value("${common.mas.influx.data.keepTime:1}")
        private int dataKeepTime;

        /**
         * Builds a new connection, applies the retention policy and only then publishes it in the
         * static field, so a failed initialisation never leaves a half-built connection behind.
         *
         * @throws RuntimeException if the client cannot be built
         */
        private void setUp() {
            InfluxDBConnect connect = new InfluxDBConnect(username, password, openurl, database);

            try {
                connect.influxDbBuild();
            } catch (Exception e) {
                throw new RuntimeException("init influxDB failed!", e);
            }

            connect.createRetentionPolicy(dataKeepTime);

            // Publish last: readers must never observe a connection that failed to initialise.
            influxDB = connect;
            log.info("init influxDB successfully....");
        }

        /**
         * Returns the shared connection, initialising it under the class lock if it has not been
         * created yet.
         *
         * @return the shared connection
         * @throws RuntimeException if initialisation is needed and fails
         */
        public InfluxDBConnect getConnect() {
            InfluxDBConnect current = influxDB;
            if (current == null) {
                synchronized (InfluxDBUtils.class) {
                    if (influxDB == null) {
                        setUp();
                    }
                    current = influxDB;
                }
            }
            return current;
        }

        /** Creates the connection once Spring has injected the configuration values. */
        @Override
        public void afterPropertiesSet() throws Exception {
            setUp();
        }
    }

    ps:连接地址、用户名、密码、数据库名等基本配置可以写在配置文件中(common.mas.influx.*),通过 @Value 注入

    3、InfluxDBConnect 连接类
    package com.meicloud.mp.mas.core.influx;

    import org.influxdb.InfluxDB;
    import org.influxdb.InfluxDBFactory;
    import org.influxdb.dto.Point;
    import org.influxdb.dto.Point.Builder;
    import org.influxdb.dto.Query;
    import org.influxdb.dto.QueryResult;

    import java.util.Map;

    /**
    * 时序数据库 InfluxDB 连接
    */
    public class InfluxDBConnect {

    private String username;// 用户名
    private String password;// 密码
    private String openurl;// 连接地址
    private String database;// 数据库

    private InfluxDB influxDB;

    public InfluxDBConnect(String username, String password, String openurl,
    String database) {
    this.username = username;
    this.password = password;
    this.openurl = openurl;
    this.database = database;
    }

    /** 连接时序数据库;获得InfluxDB **/
    public InfluxDB influxDbBuild() {
    if (influxDB == null) {
    influxDB = InfluxDBFactory.connect(openurl, username, password);
    //influxDB.createDatabase(database);

    }
    return influxDB;
    }

    /**
    * 设置数据保存策略 defalut 策略名 /database 数据库名/ 30d 数据保存时限30天/ 1 副本个数为1/ 结尾DEFAULT
    * 表示 设为默认的策略
    */
    public void createRetentionPolicy() {
    String command = String
    .format("CREATE RETENTION POLICY "%s" ON "%s" DURATION %s REPLICATION %s DEFAULT",
    "defalut", database, "30d", 1);
    this.query(command);
    }

    /**
    * 查询
    *
    * @param command
    * 查询语句
    * @return
    */
    public QueryResult query(String command) {
    return influxDB.query(new Query(command, database));
    }

    /**
    * 插入
    *
    * @param measurement
    * 表
    * @param tags
    * 标签
    * @param fields
    * 字段
    */
    public void insert(String measurement, Map<String, String> tags,
    Map<String, Object> fields) {
    Builder builder = Point.measurement(measurement);
    builder.tag(tags);
    builder.fields(fields);

    influxDB.write(database, "", builder.build());
    }

    /**
    * 删除
    *
    * @param command
    * 删除语句
    * @return 返回错误信息
    */
    public String deleteMeasurementData(String command) {
    QueryResult result = influxDB.query(new Query(command, database));
    return result.getError();
    }

    /**
    * 创建数据库
    *
    * @param dbName
    */
    public void createDB(String dbName) {
    influxDB.createDatabase(dbName);
    }

    /**
    * 删除数据库
    *
    * @param dbName
    */
    public void deleteDB(String dbName) {
    influxDB.deleteDatabase(dbName);
    }

    public String getUsername() {
    return username;
    }

    public void setUsername(String username) {
    this.username = username;
    }

    public String getPassword() {
    return password;
    }

    public void setPassword(String password) {
    this.password = password;
    }

    public String getOpenurl() {
    return openurl;
    }

    public void setOpenurl(String openurl) {
    this.openurl = openurl;
    }

    public void setDatabase(String database) {
    this.database = database;
    }
    }
    4、InfluxCallErrorInfoService 接口类
    /**
     * Storage operations for call-error records kept in InfluxDB.
     */
    public interface InfluxCallErrorInfoService {

    /**
     * Writes every entry of the list as one point.
     *
     * @param data records to write
     * @return false when no connection could be obtained
     */
    boolean insert(List<CallErrorInfo> data);

    /**
     * Writes a single error log entry as one point.
     *
     * @param errorServerLog entry to write
     * @return true when the point was written, false otherwise
     */
    boolean insertOne(ErrorServerLog errorServerLog);

    /**
     * Returns every stored record without any filter.
     *
     * @return the records; empty when nothing is found
     */
    List<CallErrorInfo> testQuery();

    /**
     * Returns the records of one interface code, optionally limited to a time window.
     *
     * @param alias interface code to match
     * @param begin exclusive lower time bound in epoch milliseconds, or null for no lower bound
     * @param end   exclusive upper time bound in epoch milliseconds, or null for no upper bound
     * @return the matching records; empty when nothing is found
     */
    List<CallErrorInfo> query(String alias, Long begin, Long end);

    /**
     * Counts the records of one interface code whose failed-time field lies within the bounds.
     *
     * @param alias interface code to match
     * @param begin inclusive lower bound of the failed-time field
     * @param end   inclusive upper bound of the failed-time field
     * @return the count, 0 when there is no result, -1 when no connection could be obtained
     */
    int count(String alias, Long begin, Long end);

    /**
     * Deletes the records of one interface code, optionally limited to a time window.
     *
     * @param alias interface code to match
     * @param begin exclusive lower time bound in epoch milliseconds, or null for no lower bound
     * @param end   exclusive upper time bound in epoch milliseconds, or null for no upper bound
     * @return 0 once the delete statement has been issued, -1 when it could not be performed
     */
    int delete(String alias, Long begin, Long end);
    }

    5、InfluxCallErrorInfoServiceImpl 实现类
    package com.meicloud.mp.mas.core.influx.service.impl;

    import com.meicloud.mp.mas.core.influx.InfluxDBConnect;
    import com.meicloud.mp.mas.core.influx.InfluxDBUtils;
    import com.meicloud.mp.mas.core.influx.bo.CallErrorInfo;
    import com.meicloud.mp.mas.core.influx.bo.ErrorServerLog;
    import com.meicloud.mp.mas.core.influx.bo.InfluxDbQueryResult;
    import com.meicloud.mp.mas.core.influx.service.InfluxCallErrorInfoService;
    import org.influxdb.dto.Point;
    import org.influxdb.dto.Query;
    import org.influxdb.dto.QueryResult;
    import org.influxdb.impl.InfluxDBResultMapper;
    import org.influxdb.querybuilder.BuiltQuery;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.util.CollectionUtils;

    import java.text.SimpleDateFormat;
    import java.util.*;

    /**
     * InfluxDB-backed implementation of {@link InfluxCallErrorInfoService}. Every operation
     * obtains the shared connection from {@link InfluxDBUtils}.
     */
    @Service("influxCallErrorInfoService")
    public class InfluxCallErrorInfoServiceImpl implements InfluxCallErrorInfoService {
        private final static Logger logger = LoggerFactory.getLogger(InfluxCallErrorInfoServiceImpl.class);

        /** Timestamp layout used in InfluxQL time comparisons; the trailing Z denotes UTC. */
        public static final String DEFAULT_DATETIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'";

        @Autowired
        InfluxDBUtils influxUtil;

        /**
         * Writes every entry of the list as one point tagged with its interface code and system
         * code. A failing entry is logged and does not stop the remaining entries.
         *
         * @param list records to write; null or empty writes nothing
         * @return true when every entry was written, false when the connection is unavailable or
         *         at least one write failed
         */
        @Override
        public boolean insert(List<CallErrorInfo> list) {
            InfluxDBConnect influxDBConnect = influxUtil.getConnect();
            if (influxDBConnect == null) {
                logger.error("influxDBConnect get null, failed");
                return false;
            }
            if (CollectionUtils.isEmpty(list)) {
                return true;
            }

            boolean allInserted = true;
            for (CallErrorInfo info : list) {
                // Fresh maps per point so no value can leak from one entry into the next.
                Map<String, String> tags = new HashMap<>();
                tags.put(CallErrorInfo.Tag_InterfaceCode, info.getInterfacecode());
                tags.put(CallErrorInfo.Tag_Syscode, info.getSyscode());

                Map<String, Object> fields = new HashMap<>();
                fields.put(CallErrorInfo.Field_Syscode, info.getSyscode());
                fields.put(CallErrorInfo.Field_InterfaceCode, info.getInterfacecode());
                // NOTE(review): appending "i" makes this a string field, so numeric comparisons
                // such as the ones in count() presumably never match it. Left unchanged because
                // switching the field type could conflict with data already stored.
                fields.put(CallErrorInfo.Field_FailedTimeMill, (info.getFailedtimemill() + "i"));
                try {
                    influxDBConnect.insert(CallErrorInfo.measurement, tags, fields);
                } catch (Exception e) {
                    logger.error("insert into influxDB failed!", e);
                    allInserted = false;
                }
            }

            return allInserted;
        }

        /**
         * Converts the annotated POJO into a point and writes it.
         *
         * @param errorServerLog entry to write
         * @return true when the point was written; false when the connection is unavailable, the
         *         entry is null, or building or writing the point failed
         */
        @Override
        public boolean insertOne(ErrorServerLog errorServerLog) {
            InfluxDBConnect influxDBConnect = influxUtil.getConnect();
            if (influxDBConnect == null) {
                logger.error("influxDBConnect get null, failed");
                return false;
            }
            if (errorServerLog == null) {
                logger.error("errorServerLog is null, nothing to insert");
                return false;
            }
            try {
                Point point = Point.measurementByPOJO(errorServerLog.getClass())
                        .addFieldsFromPOJO(errorServerLog)
                        .build();
                influxDBConnect.insert(point);
                return true;
            } catch (Exception e) {
                logger.error("insert into influxDB failed!", e);
            }
            return false;
        }

        /**
         * Returns every record of the measurement without any filter.
         *
         * @return the records; empty when the connection is unavailable or nothing is found
         */
        @Override
        public List<CallErrorInfo> testQuery() {
            InfluxDBConnect influxDBConnect = influxUtil.getConnect();
            if (influxDBConnect == null) {
                logger.error("influxDBConnect get null, failed");
                return new ArrayList<>();
            }

            String command = "select * from " + CallErrorInfo.measurement;
            return toCallErrorInfos(influxDBConnect.query(command));
        }

        /**
         * Returns the records of one interface code, optionally limited to a time window.
         *
         * @param alias interface code to match
         * @param begin exclusive lower time bound in epoch milliseconds, or null for no lower bound
         * @param end   exclusive upper time bound in epoch milliseconds, or null for no upper bound
         * @return the records; empty when the connection is unavailable or nothing is found
         */
        @Override
        public List<CallErrorInfo> query(String alias, Long begin, Long end) {
            InfluxDBConnect influxDBConnect = influxUtil.getConnect();
            if (influxDBConnect == null) {
                logger.error("influxDBConnect get null, failed");
                return new ArrayList<>();
            }

            String command = "select * from " + CallErrorInfo.measurement + whereClause(alias, begin, end);
            return toCallErrorInfos(influxDBConnect.query(command));
        }

        /**
         * Counts the records of one interface code whose failed-time field lies within the bounds.
         *
         * <p>NOTE(review): unlike query() and delete(), this filters on the failed-time field
         * rather than on {@code time}, names the database "mas" explicitly, and passes null
         * bounds straight into the query builder; confirm that all three are intended.
         *
         * @param alias interface code to match
         * @param begin inclusive lower bound of the failed-time field
         * @param end   inclusive upper bound of the failed-time field
         * @return the count, 0 when there is no result, -1 when the connection is unavailable
         */
        @Override
        public int count(String alias, Long begin, Long end) {
            InfluxDBConnect influxDBConnect = influxUtil.getConnect();
            if (influxDBConnect == null) {
                logger.error("influxDBConnect get null, failed");
                return -1;
            }
            Query query = BuiltQuery.QueryBuilder.select().count(CallErrorInfo.Field_InterfaceCode).as("count")
                    .from("mas", CallErrorInfo.measurement)
                    .where(BuiltQuery.QueryBuilder.eq(CallErrorInfo.Tag_InterfaceCode, alias))
                    .and(BuiltQuery.QueryBuilder.gte(CallErrorInfo.Field_FailedTimeMill, begin))
                    .and(BuiltQuery.QueryBuilder.lte(CallErrorInfo.Field_FailedTimeMill, end));
            QueryResult results = influxDBConnect.query(query);

            // An empty result list would otherwise make get(0) throw.
            if (CollectionUtils.isEmpty(results.getResults())) {
                return 0;
            }
            QueryResult.Result result = results.getResults().get(0);
            if (result == null || CollectionUtils.isEmpty(result.getSeries())) {
                return 0;
            }
            QueryResult.Series series = result.getSeries().get(0);
            if (series == null) {
                return 0;
            }
            List<String> columns = series.getColumns();
            List<List<Object>> values = series.getValues();
            if (CollectionUtils.isEmpty(columns) || CollectionUtils.isEmpty(values)) {
                return 0;
            }
            for (List<Object> row : values) {
                for (int i = 0; i < columns.size(); i++) {
                    if (!"count".equalsIgnoreCase(columns.get(i))) {
                        continue;
                    }
                    Object cell = row.get(i);
                    if (cell == null) {
                        return 0;
                    }
                    if (cell instanceof Number) {
                        return ((Number) cell).intValue();
                    }
                    return (int) Double.parseDouble(cell.toString());
                }
            }
            return 0;
        }

        /**
         * Deletes the records of one interface code, optionally limited to a time window.
         *
         * @param alias interface code to match
         * @param begin exclusive lower time bound in epoch milliseconds, or null for no lower bound
         * @param end   exclusive upper time bound in epoch milliseconds, or null for no upper bound
         * @return 0 when the statement ran without a reported error, -1 when the connection is
         *         unavailable or InfluxDB reported an error
         */
        @Override
        public int delete(String alias, Long begin, Long end) {
            InfluxDBConnect influxDBConnect = influxUtil.getConnect();
            if (influxDBConnect == null) {
                logger.error("influxDBConnect get null, failed");
                return -1;
            }

            String command = "delete from " + CallErrorInfo.measurement + whereClause(alias, begin, end);
            String err = influxDBConnect.deleteMeasurementData(command);
            if (err != null) {
                logger.error("delete failed: {}", err);
                return -1;
            }
            return 0;
        }

        /** Builds the shared " where ..." suffix used by query() and delete(). */
        private static String whereClause(String alias, Long begin, Long end) {
            StringBuilder where = new StringBuilder(" where ")
                    .append(CallErrorInfo.Tag_InterfaceCode)
                    .append("='").append(escapeStringLiteral(alias)).append("'");
            if (begin != null) {
                where.append(" and time>'").append(timestampToTimez(begin)).append("'");
            }
            if (end != null) {
                where.append(" and time<'").append(timestampToTimez(end)).append("'");
            }
            return where.toString();
        }

        /**
         * Escapes backslashes and single quotes so the value cannot terminate the surrounding
         * single-quoted InfluxQL string literal. A null value becomes the text "null", as plain
         * string concatenation would produce.
         */
        private static String escapeStringLiteral(String value) {
            return String.valueOf(value).replace("\\", "\\\\").replace("'", "\\'");
        }

        /** Flattens every series of every result into a list of records. */
        private static List<CallErrorInfo> toCallErrorInfos(QueryResult results) {
            List<CallErrorInfo> lists = new ArrayList<>();
            if (results == null || results.getResults() == null) {
                return lists;
            }
            for (QueryResult.Result result : results.getResults()) {
                List<QueryResult.Series> series = result.getSeries();
                if (series == null) {
                    continue;
                }
                for (QueryResult.Series serie : series) {
                    lists.addAll(CallErrorInfo.getQueryData(serie.getColumns(), serie.getValues()));
                }
            }
            return lists;
        }

        /**
         * Converts an epoch-millisecond timestamp into the UTC time string used in InfluxQL time
         * comparisons. The pattern has second precision, so milliseconds are dropped.
         *
         * @param timestamp epoch milliseconds
         * @return the timestamp formatted with {@link #DEFAULT_DATETIME_FORMAT} in UTC
         */
        static String timestampToTimez(Long timestamp)
        {
            // A new formatter per call: SimpleDateFormat is not safe to share between threads.
            SimpleDateFormat format = new SimpleDateFormat(DEFAULT_DATETIME_FORMAT, Locale.ROOT);
            // The pattern ends in a literal 'Z', so the value has to be rendered in UTC rather
            // than in the JVM default time zone.
            format.setTimeZone(TimeZone.getTimeZone("UTC"));
            return format.format(new Date(timestamp));
        }

        /** Manual check of timestampToTimez with a millisecond and a second based value. */
        public static void main(String[] argv) {

            Long timestamp = 1561289474000L;
            String sTime = timestampToTimez(timestamp);

            System.out.println(sTime);

            // Seconds instead of milliseconds: prints a date in January 1970.
            timestamp = 1561289474L;
            sTime = timestampToTimez(timestamp);

            System.out.println(sTime);
        }
    }
  • 相关阅读:
    第二次结对编程作业
    团队项目-需求分析报告
    团队项目-选题报告
    Git安装
    VI编辑,backspace无法删除解决方法
    VM虚拟机,Linux系统安装tools过程遇到 what is the location of the “ifconfig” program
    Ubuntu安装mysql
    Linux配置Tomcat
    Linux配置JDK
    鸡汤
  • 原文地址:https://www.cnblogs.com/fuqiang-terry/p/11120443.html
Copyright © 2011-2022 走看看