zoukankan      html  css  js  c++  java
  • influxDB 增加

    1、pom文件添加

    <parent>
    <artifactId>ma-base</artifactId>
    <groupId>com...</groupId>
    <version>5.0.0-SNAPSHOT</version>
    </parent>
    添加依赖
    <dependency>
    <groupId>org.influxdb</groupId>
    <artifactId>influxdb-java</artifactId>
    <version>2.15</version>
    </dependency>

    2、influxDB工具类
    package com.meicloud.mp.mas.core.influx;

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;

    /**
     * Spring component that creates and hands out the shared {@link InfluxDBConnect}.
     *
     * <p>The connection is created once in {@link #afterPropertiesSet()} and stored in a
     * static field. {@link #getConnect()} re-runs the initialisation under a class-level
     * lock if the field is still {@code null}.
     *
     * <p>NOTE(review): the {@code @Value} placeholders for url/username/password/database
     * carry no default, while the fields have initialisers. Confirm whether placeholder
     * defaults (as used for {@code keepTime}) were intended.
     */
    @Slf4j
    @Component
    public class InfluxDBUtils implements InitializingBean {

        /**
         * Shared connection. Declared volatile because {@link #getConnect()} reads it
         * outside the lock before deciding whether to initialise.
         */
        private static volatile InfluxDBConnect influxDB;

        @Value("${common.mas.influx.url}")
        private String openurl = "http://127.0.0.1:8086"; // connection URL

        @Value("${common.mas.influx.username}")
        private String username = "root"; // user name

        @Value("${common.mas.influx.password}")
        private String password = "root"; // password

        @Value("${common.mas.influx.database}")
        private String database = "polutondb"; // database name

        /** Value passed to {@code createRetentionPolicy}; the placeholder defaults to 1. */
        @Value("${common.mas.influx.data.keepTime:1}")
        private int dataKeepTime;

        /**
         * Builds a new connection, applies the retention policy and only then publishes it.
         *
         * <p>Publishing last means a failed initialisation leaves the static field untouched,
         * so a later {@link #getConnect()} call retries instead of returning a connection
         * whose client was never built.
         *
         * @throws RuntimeException if building the client fails
         */
        private void setUp() {
            InfluxDBConnect connect = new InfluxDBConnect(username, password, openurl, database);

            try {
                connect.influxDbBuild();
            } catch (Exception e) {
                throw new RuntimeException("init influxDB failed!", e);
            }

            connect.createRetentionPolicy(dataKeepTime);

            influxDB = connect;
            log.info("init influxDB successfully....");
        }

        /**
         * Returns the shared connection, initialising it first if that has not happened yet.
         *
         * @return the shared connection
         * @throws RuntimeException if a required initialisation fails
         */
        public InfluxDBConnect getConnect() {
            InfluxDBConnect current = influxDB;
            if (current == null) {
                synchronized (InfluxDBUtils.class) {
                    // Re-check under the lock: another thread may have finished setUp() meanwhile.
                    if (influxDB == null) {
                        setUp();
                    }
                    current = influxDB;
                }
            }
            return current;
        }

        /** Initialises the connection once the configuration properties have been injected. */
        @Override
        public void afterPropertiesSet() throws Exception {
            setUp();
        }
    }

    ps:基本配置可以走配置文件

    3、InfluxDBConnect 连接类
    package com.meicloud.mp.mas.core.influx;

    import org.influxdb.InfluxDB;
    import org.influxdb.InfluxDBFactory;
    import org.influxdb.dto.Point;
    import org.influxdb.dto.Point.Builder;
    import org.influxdb.dto.Query;
    import org.influxdb.dto.QueryResult;

    import java.util.Map;

    /**
    * 时序数据库 InfluxDB 连接
    */
    public class InfluxDBConnect {

    private String username;// 用户名
    private String password;// 密码
    private String openurl;// 连接地址
    private String database;// 数据库

    private InfluxDB influxDB;

    public InfluxDBConnect(String username, String password, String openurl,
    String database) {
    this.username = username;
    this.password = password;
    this.openurl = openurl;
    this.database = database;
    }

    /** 连接时序数据库;获得InfluxDB **/
    public InfluxDB influxDbBuild() {
    if (influxDB == null) {
    influxDB = InfluxDBFactory.connect(openurl, username, password);
    //influxDB.createDatabase(database);

    }
    return influxDB;
    }

    /**
    * 设置数据保存策略 defalut 策略名 /database 数据库名/ 30d 数据保存时限30天/ 1 副本个数为1/ 结尾DEFAULT
    * 表示 设为默认的策略
    */
    public void createRetentionPolicy() {
    String command = String
    .format("CREATE RETENTION POLICY "%s" ON "%s" DURATION %s REPLICATION %s DEFAULT",
    "defalut", database, "30d", 1);
    this.query(command);
    }

    /**
    * 查询
    *
    * @param command
    * 查询语句
    * @return
    */
    public QueryResult query(String command) {
    return influxDB.query(new Query(command, database));
    }

    /**
    * 插入
    *
    * @param measurement
    * 表
    * @param tags
    * 标签
    * @param fields
    * 字段
    */
    public void insert(String measurement, Map<String, String> tags,
    Map<String, Object> fields) {
    Builder builder = Point.measurement(measurement);
    builder.tag(tags);
    builder.fields(fields);

    influxDB.write(database, "", builder.build());
    }

    /**
    * 删除
    *
    * @param command
    * 删除语句
    * @return 返回错误信息
    */
    public String deleteMeasurementData(String command) {
    QueryResult result = influxDB.query(new Query(command, database));
    return result.getError();
    }

    /**
    * 创建数据库
    *
    * @param dbName
    */
    public void createDB(String dbName) {
    influxDB.createDatabase(dbName);
    }

    /**
    * 删除数据库
    *
    * @param dbName
    */
    public void deleteDB(String dbName) {
    influxDB.deleteDatabase(dbName);
    }

    public String getUsername() {
    return username;
    }

    public void setUsername(String username) {
    this.username = username;
    }

    public String getPassword() {
    return password;
    }

    public void setPassword(String password) {
    this.password = password;
    }

    public String getOpenurl() {
    return openurl;
    }

    public void setOpenurl(String openurl) {
    this.openurl = openurl;
    }

    public void setDatabase(String database) {
    this.database = database;
    }
    }
    4、InfluxCallErrorInfoService 接口类
    /**
     * Operations for writing, reading, counting and deleting call-error records.
     */
    public interface InfluxCallErrorInfoService {

    /**
     * Writes a batch of call-error records.
     *
     * @param data records to write
     * @return success flag as defined by the implementation
     */
    boolean insert(List<CallErrorInfo> data);

    /**
     * Writes a single error log entry.
     *
     * @param errorServerLog entry to write
     * @return success flag as defined by the implementation
     */
    boolean insertOne(ErrorServerLog errorServerLog);

    /**
     * Test query that takes no filter arguments.
     *
     * @return the records found
     */
    List<CallErrorInfo> testQuery();

    /**
     * Queries records for one alias within a time range.
     *
     * @param alias alias to filter on
     * @param begin range start (boxed, so it may be null; handling is up to the implementation)
     * @param end range end (boxed, so it may be null; handling is up to the implementation)
     * @return the matching records
     */
    List<CallErrorInfo> query(String alias, Long begin, Long end);

    /**
     * Counts records for one alias within a range.
     *
     * @param alias alias to filter on
     * @param begin range start
     * @param end range end
     * @return the count as defined by the implementation
     */
    int count(String alias, Long begin, Long end);

    /**
     * Deletes records for one alias within a time range.
     *
     * @param alias alias to filter on
     * @param begin range start (boxed, so it may be null; handling is up to the implementation)
     * @param end range end (boxed, so it may be null; handling is up to the implementation)
     * @return status code as defined by the implementation
     */
    int delete(String alias, Long begin, Long end);
    }

    5、InfluxCallErrorInfoServiceImpl 实现类
    package com.meicloud.mp.mas.core.influx.service.impl;

    import com.meicloud.mp.mas.core.influx.InfluxDBConnect;
    import com.meicloud.mp.mas.core.influx.InfluxDBUtils;
    import com.meicloud.mp.mas.core.influx.bo.CallErrorInfo;
    import com.meicloud.mp.mas.core.influx.bo.ErrorServerLog;
    import com.meicloud.mp.mas.core.influx.bo.InfluxDbQueryResult;
    import com.meicloud.mp.mas.core.influx.service.InfluxCallErrorInfoService;
    import org.influxdb.dto.Point;
    import org.influxdb.dto.Query;
    import org.influxdb.dto.QueryResult;
    import org.influxdb.impl.InfluxDBResultMapper;
    import org.influxdb.querybuilder.BuiltQuery;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.util.CollectionUtils;

    import java.text.SimpleDateFormat;
    import java.util.*;

    /**
     * Implementation of {@link InfluxCallErrorInfoService} that reads and writes through the
     * {@link InfluxDBConnect} obtained from {@link InfluxDBUtils}.
     *
     * <p>Return conventions used throughout: {@code false} / {@code -1} signal that no
     * connection was available or that the operation reported a failure.
     */
    @Service("influxCallErrorInfoService")
    public class InfluxCallErrorInfoServiceImpl implements InfluxCallErrorInfoService {
        private final static Logger logger = LoggerFactory.getLogger(InfluxCallErrorInfoServiceImpl.class);

        /** Pattern used for time literals in statements: second precision, literal {@code Z} suffix. */
        public static final String DEFAULT_DATETIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'";

        @Autowired
        InfluxDBUtils influxUtil;

        /**
         * Writes every record of the list as one point.
         *
         * <p>A failure on one record is logged and does not stop the remaining records
         * from being written.
         *
         * @param list records to write; {@code null} or empty writes nothing
         * @return {@code false} if no connection was available or at least one write failed,
         *         {@code true} otherwise
         */
        @Override
        public boolean insert(List<CallErrorInfo> list) {
            InfluxDBConnect influxDBConnect = influxUtil.getConnect();
            if (influxDBConnect == null) {
                logger.error("influxDBConnect get null, failed");
                return false;
            }
            if (list == null || list.isEmpty()) {
                return true;
            }

            boolean allWritten = true;
            for (CallErrorInfo info : list) {
                if (info == null) {
                    continue;
                }
                // Fresh maps per record so nothing can leak from one point into the next.
                Map<String, String> tags = new HashMap<>();
                Map<String, Object> fields = new HashMap<>();

                tags.put(CallErrorInfo.Tag_InterfaceCode, info.getInterfacecode());
                tags.put(CallErrorInfo.Tag_Syscode, info.getSyscode());

                fields.put(CallErrorInfo.Field_Syscode, info.getSyscode());
                fields.put(CallErrorInfo.Field_InterfaceCode, info.getInterfacecode());
                // NOTE(review): concatenating "i" puts a String into the field map. Confirm a
                // string field is intended, since count() compares this field name against
                // numeric bounds. Left unchanged here.
                fields.put(CallErrorInfo.Field_FailedTimeMill, (info.getFailedtimemill() + "i"));
                try {
                    influxDBConnect.insert(CallErrorInfo.measurement, tags, fields);
                } catch (Exception e) {
                    logger.error("insert into influxDB failed!", e);
                    allWritten = false;
                }
            }

            return allWritten;
        }

        /**
         * Writes a single annotated POJO as one point.
         *
         * @param errorServerLog entry to write
         * @return {@code true} if the write call completed, {@code false} if the entry was
         *         {@code null}, no connection was available, or building/writing the point threw
         */
        @Override
        public boolean insertOne(ErrorServerLog errorServerLog) {
            if (errorServerLog == null) {
                logger.error("errorServerLog is null, nothing to insert");
                return false;
            }

            InfluxDBConnect influxDBConnect = influxUtil.getConnect();
            if (influxDBConnect == null) {
                logger.error("influxDBConnect get null, failed");
                return false;
            }
            try {
                // Built inside the try so a failure while mapping the POJO is reported the
                // same way as a failed write.
                Point point = Point.measurementByPOJO(errorServerLog.getClass())
                        .addFieldsFromPOJO(errorServerLog)
                        .build();
                influxDBConnect.insert(point);
                return true;
            } catch (Exception e) {
                logger.error("insert into influxDB failed!", e);
            }
            return false;
        }

        /**
         * Reads every record of the measurement without any filter.
         *
         * @return the records found; empty if no connection was available or nothing matched
         */
        @Override
        public List<CallErrorInfo> testQuery() {
            InfluxDBConnect influxDBConnect = influxUtil.getConnect();
            if (influxDBConnect == null) {
                logger.error("influxDBConnect get null, failed");
                return new ArrayList<>();
            }

            String command = "select * from " + CallErrorInfo.measurement;
            return toCallErrorInfos(influxDBConnect.query(command));
        }

        /**
         * Reads the records whose interface-code tag equals {@code alias}, optionally limited
         * to an exclusive time range.
         *
         * @param alias tag value to match
         * @param begin lower bound in epoch milliseconds, or {@code null} for no lower bound
         * @param end upper bound in epoch milliseconds, or {@code null} for no upper bound
         * @return the records found; empty if no connection was available or nothing matched
         */
        @Override
        public List<CallErrorInfo> query(String alias, Long begin, Long end) {
            InfluxDBConnect influxDBConnect = influxUtil.getConnect();
            if (influxDBConnect == null) {
                logger.error("influxDBConnect get null, failed");
                return new ArrayList<>();
            }

            String command = "select * from " + CallErrorInfo.measurement + buildFilter(alias, begin, end);
            return toCallErrorInfos(influxDBConnect.query(command));
        }

        /**
         * Counts records whose interface-code tag equals {@code alias} and whose failed-time
         * field lies within {@code [begin, end]}.
         *
         * <p>NOTE(review): the bounds are handed to the query builder as-is; unlike
         * {@link #query} and {@link #delete}, {@code null} bounds are not skipped here.
         * Confirm callers always supply both. The first argument of {@code from} is the
         * literal {@code "mas"} rather than the connection's configured database. Confirm
         * that this is intended.
         *
         * @param alias tag value to match
         * @param begin lower bound, inclusive
         * @param end upper bound, inclusive
         * @return the count, {@code 0} if the result carries no count column, or {@code -1}
         *         if no connection was available
         */
        @Override
        public int count(String alias, Long begin, Long end) {
            InfluxDBConnect influxDBConnect = influxUtil.getConnect();
            if (influxDBConnect == null) {
                logger.error("influxDBConnect get null, failed");
                return -1;
            }
            Query query = BuiltQuery.QueryBuilder.select().count(CallErrorInfo.Field_InterfaceCode).as("count")
                    .from("mas", CallErrorInfo.measurement)
                    .where(BuiltQuery.QueryBuilder.eq(CallErrorInfo.Tag_InterfaceCode, alias))
                    .and(BuiltQuery.QueryBuilder.gte(CallErrorInfo.Field_FailedTimeMill, begin))
                    .and(BuiltQuery.QueryBuilder.lte(CallErrorInfo.Field_FailedTimeMill, end));
            QueryResult results = influxDBConnect.query(query);

            // An empty result list would make get(0) throw, so treat it like "no data".
            if (results == null || CollectionUtils.isEmpty(results.getResults())) {
                return 0;
            }
            QueryResult.Result result = results.getResults().get(0);
            if (result == null || CollectionUtils.isEmpty(result.getSeries())) {
                return 0;
            }
            QueryResult.Series series = result.getSeries().get(0);
            if (series == null) {
                return 0;
            }
            List<String> columns = series.getColumns();
            List<List<Object>> values = series.getValues();
            if (CollectionUtils.isEmpty(columns) || CollectionUtils.isEmpty(values)) {
                return 0;
            }

            int countColumn = -1;
            for (int i = 0; i < columns.size(); i++) {
                if ("count".equalsIgnoreCase(columns.get(i))) {
                    countColumn = i;
                    break;
                }
            }
            if (countColumn < 0) {
                return 0;
            }
            // Only the first row is consulted, matching the previous early return.
            List<Object> firstRow = values.get(0);
            if (firstRow == null || countColumn >= firstRow.size() || firstRow.get(countColumn) == null) {
                return 0;
            }
            Double ct = Double.parseDouble(firstRow.get(countColumn).toString());
            return ct.intValue();
        }

        /**
         * Deletes the records whose interface-code tag equals {@code alias}, optionally limited
         * to an exclusive time range.
         *
         * @param alias tag value to match
         * @param begin lower bound in epoch milliseconds, or {@code null} for no lower bound
         * @param end upper bound in epoch milliseconds, or {@code null} for no upper bound
         * @return {@code 0} if the statement reported no error, {@code -1} if no connection was
         *         available or the statement reported an error
         */
        @Override
        public int delete(String alias, Long begin, Long end) {
            InfluxDBConnect influxDBConnect = influxUtil.getConnect();
            if (influxDBConnect == null) {
                logger.error("influxDBConnect get null, failed");
                return -1;
            }

            String command = "delete from " + CallErrorInfo.measurement + buildFilter(alias, begin, end);
            String err = influxDBConnect.deleteMeasurementData(command);
            if (err != null) {
                logger.error("delete :" + err);
                return -1;
            }
            logger.info("delete :" + err);

            return 0;
        }

        /**
         * Builds the {@code where} clause shared by {@link #query} and {@link #delete}.
         * The alias is escaped so that quotes inside it cannot terminate the string literal.
         */
        private static String buildFilter(String alias, Long begin, Long end) {
            StringBuilder filter = new StringBuilder(" where ")
                    .append(CallErrorInfo.Tag_InterfaceCode)
                    .append("='")
                    .append(escapeStringLiteral(String.valueOf(alias)))
                    .append("'");
            if (begin != null) {
                filter.append(" and time>'").append(timestampToTimez(begin)).append("'");
            }
            if (end != null) {
                filter.append(" and time<'").append(timestampToTimez(end)).append("'");
            }
            return filter.toString();
        }

        /** Escapes backslashes and single quotes for use inside a single-quoted literal. */
        private static String escapeStringLiteral(String value) {
            return value.replace("\\", "\\\\").replace("'", "\\'");
        }

        /** Flattens every series of every result into one list of records. */
        private static List<CallErrorInfo> toCallErrorInfos(QueryResult results) {
            List<CallErrorInfo> rows = new ArrayList<>();
            if (results == null || results.getResults() == null) {
                return rows;
            }
            for (QueryResult.Result result : results.getResults()) {
                if (result == null || result.getSeries() == null) {
                    continue;
                }
                for (QueryResult.Series serie : result.getSeries()) {
                    rows.addAll(CallErrorInfo.getQueryData(serie.getColumns(), serie.getValues()));
                }
            }
            return rows;
        }

        /**
         * Converts an epoch-millisecond timestamp to the time literal format used in statements.
         *
         * <p>The pattern ends in a literal {@code Z}, so the formatter is pinned to UTC;
         * otherwise the JVM's default zone would be rendered and mislabelled as UTC. The
         * pattern has no fractional seconds, so the result is truncated to whole seconds.
         *
         * @param timestamp epoch milliseconds
         * @return the formatted time, e.g. {@code 2019-06-23T11:31:14Z}
         */
        static String timestampToTimez(Long timestamp) {
            // SimpleDateFormat is not thread-safe, so a new instance is created per call.
            SimpleDateFormat format = new SimpleDateFormat(DEFAULT_DATETIME_FORMAT);
            format.setTimeZone(TimeZone.getTimeZone("UTC"));
            return format.format(new Date(timestamp));
        }

        /**
         * Manual check of {@link #timestampToTimez(Long)}: prints the same number once as
         * milliseconds and once with the last three digits dropped. The input is always
         * interpreted as milliseconds.
         */
        public static void main(String[] argv) {

            Long timestamp = 1561289474000L;
            String sTime = timestampToTimez(timestamp);

            System.out.println(sTime);

            timestamp = 1561289474L;
            sTime = timestampToTimez(timestamp);

            System.out.println(sTime);
        }
    }
  • 相关阅读:
    Can't remove netstandard folder from output path (.net standard)
    website项目的reference问题
    The type exists in both DLLs
    git常用配置
    Map dependencies with code maps
    How to check HTML version of any website
    Bootstrap UI 编辑器
    网上职位要求对照
    Use of implicitly declared global variable
    ResolveUrl in external JavaScript file in asp.net project
  • 原文地址:https://www.cnblogs.com/fuqiang-terry/p/11120443.html
Copyright © 2011-2022 走看看