一、环境
- Idea 2020.1
- JDK 1.8
- maven
二、目的
Spring Boot 整合 InfluxDB,实现时序数据的写入与查询
gitHub地址: https://github.com/ouyushan/ouyushan-spring-boot-samples
三、步骤
3.1、点击File -> New Project -> Spring Initializr,点击next
3.2、在对应地方修改自己的项目信息
3.3、选择Web依赖,选中Spring Web、Spring Boot Actuator。可以选择Spring Boot版本,本次默认为2.3.0,点击Next
3.4、项目结构
四、添加文件
pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Maven build descriptor for the spring-boot-data-influxdb sample module. -->
<modelVersion>4.0.0</modelVersion>
<!-- Inherits from the ouyushan-spring-boot-samples parent project. -->
<parent>
<groupId>org.ouyushan</groupId>
<artifactId>ouyushan-spring-boot-samples</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<groupId>org.ouyushan</groupId>
<artifactId>spring-boot-data-influxdb</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-boot-data-influxdb</name>
<description>InfluxDB project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- Spring Boot Actuator starter. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- Spring Web starter. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Micrometer registry for InfluxDB; only needed at runtime. -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-influx</artifactId>
<scope>runtime</scope>
</dependency>
<!-- InfluxDB Java client (org.influxdb.*) used by InfluxDBConnect. -->
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
<version>2.19</version>
</dependency>
<!-- Lombok (@Data, @Slf4j); marked optional so it is not passed on to dependents. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
<optional>true</optional>
</dependency>
<!-- Spring Boot test starter, with the JUnit vintage engine excluded. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Hibernate Validator; supplies the @URL constraint imported by InfluxDBProperties. -->
<dependency>
<groupId>org.hibernate.validator</groupId>
<artifactId>hibernate-validator</artifactId>
<version>6.1.5.Final</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Spring Boot Maven plugin. -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml文件
# Connection settings bound to InfluxDBProperties (prefix "spring.influx").
# NOTE(review): credentials are stored in plain text; acceptable for a local sample only.
spring:
  influx:
    url: http://localhost:8086
    user: root
    password: ouyushan
    database: test
    # Underscore notation; relaxed binding maps these to
    # retentionPolicy / retentionPolicyTime.
    retention_policy: default
    retention_policy_time: 30d

# Micrometer metrics export to InfluxDB.
management:
  metrics:
    export:
      influx:
        enabled: true
        db: ouyushan
        uri: http://localhost:8086
        user: root
        password: ouyushan
        connect-timeout: 1s
        read-timeout: 10s
        auto-create-db: true
        step: 1m
        consistency: one
        compressed: true
        batch-size: 10000
InfluxDBProperties.java
package org.ouyushan.springboot.data.influxdb.config;
import lombok.Data;
import org.hibernate.validator.constraints.URL;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import javax.validation.constraints.NotBlank;
/**
* @Description:
* @Author: ouyushan
* @Email: ouyushan@hotmail.com
* @Date: 2020/6/3 16:00
*/
@Configuration
@ConfigurationProperties(prefix = "spring.influx")
@Data
public class InfluxDBProperties {
@URL
private String url;
@NotBlank
private String user;
@NotBlank
private String password;
@NotBlank
private String database;
@NotBlank
private String retentionPolicy;
@NotBlank
private String retentionPolicyTime;
}
InfluxDBConfig.java
package org.ouyushan.springboot.data.influxdb.config;
import lombok.extern.slf4j.Slf4j;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that exposes a ready-to-use {@link InfluxDBConnect} bean
 * built from {@link InfluxDBProperties}.
 *
 * @Author: ouyushan
 * @Email: ouyushan@hotmail.com
 * @Date: 2020/6/3 15:15
 */
@Configuration
@Slf4j
public class InfluxDBConfig {

    @Autowired
    private InfluxDBProperties influxDBProperties;

    /**
     * Creates the {@link InfluxDBConnect} bean and registers its retention policy.
     *
     * <p>The {@code InfluxDBConnect} constructor already opens the connection, so
     * no separate {@code influxDbBuild()} call is needed here.
     *
     * @return the connected wrapper
     */
    @Bean
    public InfluxDBConnect getInfluxDBConnect() {
        InfluxDBConnect connect = new InfluxDBConnect(
                influxDBProperties.getUser(),
                influxDBProperties.getPassword(),
                influxDBProperties.getUrl(),
                influxDBProperties.getDatabase(),
                influxDBProperties.getRetentionPolicy(),
                influxDBProperties.getRetentionPolicyTime());
        connect.createRetentionPolicy();
        // Log individual non-secret settings only: logging the properties object
        // itself would print the password via its generated toString().
        log.info("init influxdb:: url=[{}], database=[{}], retentionPolicy=[{}], retentionPolicyTime=[{}]",
                influxDBProperties.getUrl(),
                influxDBProperties.getDatabase(),
                influxDBProperties.getRetentionPolicy(),
                influxDBProperties.getRetentionPolicyTime());
        return connect;
    }
}
InfluxDBConnect.java
package org.ouyushan.springboot.data.influxdb.config;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* @Description:
* @Author: ouyushan
* @Email: ouyushan@hotmail.com
* @Date: 2020/6/3 16:09
*/
@Data
@Slf4j
public class InfluxDBConnect {
/**
* 用户名
*/
private String user;
private String password;
private String url;
public String database;
/**
* 数据保存策略
*/
private String retentionPolicy;
/**
* 数据保存策略中数据保存时间
*/
private String retentionPolicyTime;
/**
* InfluxDB实例
*/
private InfluxDB influxDB;
public InfluxDBConnect(String user, String password, String url, String database, String retentionPolicy, String retentionPolicyTime) {
this.user = user;
this.password = password;
this.url = url;
this.database = database;
// autogen默认的数据保存策略
this.retentionPolicy = retentionPolicy == null || "".equals(retentionPolicy) ? "autogen" : retentionPolicy;
this.retentionPolicyTime = retentionPolicyTime == null || "".equals(retentionPolicy) ? "30d" : retentionPolicyTime;
this.influxDB = influxDbBuild();
}
/**
* 连接时序数据库;获得InfluxDB
**/
public InfluxDB influxDbBuild() {
if (influxDB == null) {
influxDB = InfluxDBFactory.connect(url, user, password);
influxDB.query(new Query("CREATE DATABASE " + database));
influxDB.setDatabase(database);
}
return influxDB;
}
/**
* 设置数据保存策略 defalut 策略名 /database 数据库名/ 30d 数据保存时限30天/ 1 副本个数为1/ 结尾DEFAULT
* 表示 设为默认的策略
*/
public void createRetentionPolicy() {
String command = String.format("CREATE RETENTION POLICY "%s" ON "%s" DURATION %s REPLICATION %s DEFAULT",
retentionPolicy, database, retentionPolicyTime, 1);
this.query(command);
}
/**
* 查询
*
* @param command 查询语句
* @return
*/
public QueryResult query(String command) {
return influxDB.query(new Query(command, database));
}
/**
* 插入
*
* @param measurement 表
* @param tags 标签
* @param fields 字段
*/
public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields) {
Point.Builder builder = Point.measurement(measurement);
// 纳秒时会出现异常信息:partial write: points beyond retention policy dropped=1
// builder.time(System.nanoTime(), TimeUnit.NANOSECONDS);
builder.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
builder.tag(tags);
builder.fields(fields);
log.info("influxDB insert data:[{}]", builder.build().toString());
influxDB.write(database, "", builder.build());
}
}
五、测试
SpringBootDataInfluxdbApplicationTests.java
package org.ouyushan.springboot.data.influxdb;
import org.influxdb.dto.QueryResult;
import org.junit.jupiter.api.Test;
import org.ouyushan.springboot.data.influxdb.config.InfluxDBConnect;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
/**
 * Integration tests that write sample points to InfluxDB and read them back
 * through the injected {@link InfluxDBConnect}.
 */
@SpringBootTest
class SpringBootDataInfluxdbApplicationTests {

    /** Number of sample points written by {@link #testInsert()}. */
    private static final int INSERT_COUNT = 100;

    @Resource
    InfluxDBConnect influxDBConnect;

    /**
     * Writes {@link #INSERT_COUNT} points into the {@code usage} measurement,
     * pausing 100 ms between writes. The loop is bounded so the test terminates.
     *
     * @throws InterruptedException if the pause between writes is interrupted
     */
    @Test
    public void testInsert() throws InterruptedException {
        Map<String, String> tagsMap = new HashMap<>();
        Map<String, Object> fieldsMap = new HashMap<>();
        System.out.println("influxDB start time :" + System.currentTimeMillis());
        for (int i = 0; i < INSERT_COUNT; i++) {
            // Space the writes out so each point gets a distinct millisecond timestamp.
            Thread.sleep(100);
            tagsMap.put("user_id", String.valueOf(i % 10));
            tagsMap.put("url", "http://www.baidu.com");
            tagsMap.put("service_method", "testInsert" + (i % 5));
            fieldsMap.put("count", i % 5);
            influxDBConnect.insert("usage", tagsMap, fieldsMap);
        }
    }

    /** Queries every point in the {@code usage} measurement and prints each result's series. */
    @Test
    public void testQuery() {
        QueryResult queryResult = influxDBConnect.query("select * from usage");
        queryResult.getResults().forEach(result -> System.out.println(result.getSeries()));
    }
}