zoukankan      html  css  js  c++  java
  • sentinel控制台监控数据持久化【InfluxDB】

    根据官方wiki文档,sentinel控制台的实时监控数据,默认仅存储 5 分钟以内的数据。如需持久化,需要定制实现相关接口。

    https://github.com/alibaba/Sentinel/wiki/在生产环境中使用-Sentinel-控制台 也给出了指导步骤:

    1.自行扩展实现 MetricsRepository 接口;

    2.注册成 Spring Bean 并在相应位置通过 @Qualifier 注解指定对应的 bean name 即可。

    本文使用时序数据库InfluxDB来进行持久化,从下载开始,一步步编写一个基于InfluxDB的存储实现。

    -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

    InfluxDB官网:https://www.influxdata.com

    关键词:

    高性能时序数据库

    go语言编写没有外部依赖

    支持HTTP API读写

    支持类SQL查询语法

    通过数据保留策略(Retention Policies)支持自动清理历史数据

    通过连续查询(Continuous Queries)支持数据归档

    最新版本:1.6.4

    下载

    windows:wget https://dl.influxdata.com/influxdb/releases/influxdb-1.6.4_windows_amd64.zip

    linux:wget https://dl.influxdata.com/influxdb/releases/influxdb-1.6.4_linux_amd64.tar.gz

    注:windows下载安装wget  https://eternallybored.org/misc/wget/

    在windows环境,解压zip文件至D:influxdbinfluxdb-1.6.4-1目录:

    打开cmd命令行窗口,在D:influxdbinfluxdb-1.6.4-1执行命令启动influxdb服务端:influxd

     再打开一个cmd窗口,在目录下输入influx启动客户端: // 后面可以带上参数:-precision rfc3339 指定时间格式显示

    show databases发现只有系统的2个数据库,这里我们新建一个sentinel_db,输入命令:create database sentinel_db

    use sentinel_db  使用sentinel_db数据库

    show measurements  查看数据库中的数据表(measurement)

    可以看到,这几个InfluxDB命令跟MySQL很相似。

    ==============================================================

    InfluxDB名词概念:

    database:数据库 // 关系数据库的database

    measurement:数据库中的表 // 关系数据库中的table

    point:表里的一行数据 // 关系数据库中的row

    point由3部分组成:

    time:每条数据记录的时间,也是数据库自动生成的主索引;// 类似主键

    fields:各种记录的值;// 没有索引的字段

    tags:各种有索引的属性 // 有索引的字段

    ==============================================================

    在官方github上,有一个java的客户端库:

    https://github.com/influxdata/influxdb-java

    在sentinel-dashboard的pom.xml中,加入maven依赖:

    <dependency>
        <groupId>org.influxdb</groupId>
        <artifactId>influxdb-java</artifactId>
        <version>2.14</version>
    </dependency>

    封装一个工具类:存储InfluxDB连接信息以及方便调用

    /**
     * @author cdfive
     * @date 2018-10-19
     */
    @Component
    public class InfluxDBUtils {
    
        private static Logger logger = LoggerFactory.getLogger(InfluxDBUtils.class);
    
        private static String url;
    
        private static String username;
    
        private static String password;
    
        private static InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
    
        @Value("${influxdb.url}")
        public void setUrl(String url) {
            InfluxDBUtils.url = url;
        }
    
        @Value("${influxdb.username}")
        public void setUsername(String username) {
            InfluxDBUtils.username = username;
        }
    
        @Value("${influxdb.password}")
        public void setPassword(String password) {
            InfluxDBUtils.password = password;
        }
    
        public static void init(String url, String username, String password) {
            InfluxDBUtils.url = url;
            InfluxDBUtils.username = username;
            InfluxDBUtils.password = password;
        }
    
        public static <T> T process(String database, InfluxDBCallback callback) {
            InfluxDB influxDB = null;
            T t = null;
            try {
                influxDB = InfluxDBFactory.connect(url, username, password);
                influxDB.setDatabase(database);
    
                t = callback.doCallBack(database, influxDB);
            } catch (Exception e) {
                logger.error("[process exception]", e);
            } finally {
                if (influxDB != null) {
                    try {
                        influxDB.close();
                    } catch (Exception e) {
                        logger.error("[influxDB.close exception]", e);
                    }
                }
            }
    
            return t;
        }
    
        public static void insert(String database, InfluxDBInsertCallback influxDBInsertCallback) {
            process(database, new InfluxDBCallback() {
                @Override
                public <T> T doCallBack(String database, InfluxDB influxDB) {
                    influxDBInsertCallback.doCallBack(database, influxDB);
                    return null;
                }
            });
    
        }
    
        public static QueryResult query(String database, InfluxDBQueryCallback influxDBQueryCallback) {
            return process(database, new InfluxDBCallback() {
                @Override
                public <T> T doCallBack(String database, InfluxDB influxDB) {
                    QueryResult queryResult = influxDBQueryCallback.doCallBack(database, influxDB);
                    return (T) queryResult;
                }
            });
        }
    
        public static <T> List<T> queryList(String database, String sql, Map<String, Object> paramMap, Class<T> clasz) {
            QueryResult queryResult = query(database, new InfluxDBQueryCallback() {
                @Override
                public QueryResult doCallBack(String database, InfluxDB influxDB) {
                    BoundParameterQuery.QueryBuilder queryBuilder = BoundParameterQuery.QueryBuilder.newQuery(sql);
                    queryBuilder.forDatabase(database);
    
                    if (paramMap != null && paramMap.size() > 0) {
                        Set<Map.Entry<String, Object>> entries = paramMap.entrySet();
                        for (Map.Entry<String, Object> entry : entries) {
                            queryBuilder.bind(entry.getKey(), entry.getValue());
                        }
                    }
    
                    return influxDB.query(queryBuilder.create());
                }
            });
    
            return resultMapper.toPOJO(queryResult, clasz);
        }
    
        public interface InfluxDBCallback {
            <T> T doCallBack(String database, InfluxDB influxDB);
        }
    
        public interface InfluxDBInsertCallback {
            void doCallBack(String database, InfluxDB influxDB);
        }
    
        public interface InfluxDBQueryCallback {
            QueryResult doCallBack(String database, InfluxDB influxDB);
        }
    }

    其中:

    url、username、password用于存储InfluxDB的连接、用户名、密码信息,定义为static属性,因此在set方法上使用@Value注解从配置文件读取属性值;

    resultMapper用于查询结果到实体类的映射;

    init方法用于初始化url、username、password;

    process为通用的处理方法,负责打开关闭连接,并且调用InfluxDBCallback回调方法;

    insert为插入数据方法,配合InfluxDBInsertCallback回调使用;

    query为通用的查询方法,配合InfluxDBQueryCallback回调方法使用,返回QueryResult对象;

    queryList为查询列表方法,调用query得到QueryResult,再通过resultMapper转换为List<实体类>;

    在resources目录下的application.properties文件中,增加InfluxDB的配置: 

    influxdb.url=${influxdb.url}
    influxdb.username=${influxdb.username}
    influxdb.password=${influxdb.password}

    用${xxx}占位符,这样可以通过maven的pom.xml添加profile配置不同环境(开发、测试、生产) 或 从配置中心读取参数。

    在datasource.entity包下,新建influxdb包,下面新建sentinel_metric数据表(measurement)对应的实体类MetricPO:

    package com.taobao.csp.sentinel.dashboard.datasource.entity.influxdb;
    
    import org.influxdb.annotation.Column;
    import org.influxdb.annotation.Measurement;
    
    import java.time.Instant;
    
    /**
     * @author cdfive
     * @date 2018-10-19
     */
    @Measurement(name = "sentinel_metric")
    public class MetricPO {
    
        @Column(name = "time")
        private Instant time;
    
        @Column(name = "id")
        private Long id;
    
        @Column(name = "gmtCreate")
        private Long gmtCreate;
    
        @Column(name = "gmtModified")
        private Long gmtModified;
    
        @Column(name = "app", tag = true)
        private String app;
    
        @Column(name = "resource", tag = true)
        private String resource;
    
        @Column(name = "passQps")
        private Long passQps;
    
        @Column(name = "successQps")
        private Long successQps;
    
        @Column(name = "blockQps")
        private Long blockQps;
    
        @Column(name = "exceptionQps")
        private Long exceptionQps;
    
        @Column(name = "rt")
        private double rt;
    
        @Column(name = "count")
        private int count;
    
        @Column(name = "resourceCode")
        private int resourceCode;
    
        // getter setter省略
    }

    该类参考MetricEntity创建,加上influxdb-java包提供的注解,通过@Measurement(name = "sentinel_metric")指定数据表(measurement)名称,

    time作为时序数据库的时间列;

    app、resource设置为tag列,通过注解标识为tag=true;

    其它字段为filed列;

    接着在InMemoryMetricsRepository所在的repository.metric包下新建InfluxDBMetricsRepository类,实现MetricsRepository<MetricEntity>接口:

    package com.taobao.csp.sentinel.dashboard.repository.metric;
    
    import com.alibaba.csp.sentinel.util.StringUtil;
    import com.taobao.csp.sentinel.dashboard.datasource.entity.MetricEntity;
    import com.taobao.csp.sentinel.dashboard.datasource.entity.influxdb.MetricPO;
    import com.taobao.csp.sentinel.dashboard.util.InfluxDBUtils;
    import org.apache.commons.lang.time.DateFormatUtils;
    import org.apache.commons.lang.time.DateUtils;
    import org.influxdb.InfluxDB;
    import org.influxdb.dto.Point;
    import org.springframework.stereotype.Repository;
    import org.springframework.util.CollectionUtils;
    
    import java.util.*;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.Collectors;
    
    /**
     * metrics数据InfluxDB存储实现
     * @author cdfive
     * @date 2018-10-19
     */
    @Repository("influxDBMetricsRepository")
    public class InfluxDBMetricsRepository implements MetricsRepository<MetricEntity> {
    
        /**时间格式*/
        private static final String DATE_FORMAT_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS";
    
        /**数据库名称*/
        private static final String SENTINEL_DATABASE = "sentinel_db";
    
        /**数据表名称*/
        private static final String METRIC_MEASUREMENT = "sentinel_metric";
    
        /**北京时间领先UTC时间8小时 UTC: Universal Time Coordinated,世界统一时间*/
        private static final Integer UTC_8 = 8;
    
        @Override
        public void save(MetricEntity metric) {
            if (metric == null || StringUtil.isBlank(metric.getApp())) {
                return;
            }
    
            InfluxDBUtils.insert(SENTINEL_DATABASE, new InfluxDBUtils.InfluxDBInsertCallback() {
                @Override
                public void doCallBack(String database, InfluxDB influxDB) {
                    if (metric.getId() == null) {
                        metric.setId(System.currentTimeMillis());
                    }
                    doSave(influxDB, metric);
                }
            });
        }
    
        @Override
        public void saveAll(Iterable<MetricEntity> metrics) {
            if (metrics == null) {
                return;
            }
    
            Iterator<MetricEntity> iterator = metrics.iterator();
            boolean next = iterator.hasNext();
            if (!next) {
                return;
            }
    
            InfluxDBUtils.insert(SENTINEL_DATABASE, new InfluxDBUtils.InfluxDBInsertCallback() {
                @Override
                public void doCallBack(String database, InfluxDB influxDB) {
                    while (iterator.hasNext()) {
                        MetricEntity metric = iterator.next();
                        if (metric.getId() == null) {
                            metric.setId(System.currentTimeMillis());
                        }
                        doSave(influxDB, metric);
                    }
                }
            });
        }
    
        @Override
        public List<MetricEntity> queryByAppAndResourceBetween(String app, String resource, long startTime, long endTime) {
            List<MetricEntity> results = new ArrayList<MetricEntity>();
            if (StringUtil.isBlank(app)) {
                return results;
            }
    
            if (StringUtil.isBlank(resource)) {
                return results;
            }
    
            StringBuilder sql = new StringBuilder();
            sql.append("SELECT * FROM " + METRIC_MEASUREMENT);
            sql.append(" WHERE app=$app");
            sql.append(" AND resource=$resource");
            sql.append(" AND time>=$startTime");
            sql.append(" AND time<=$endTime");
    
            Map<String, Object> paramMap = new HashMap<String, Object>();
            paramMap.put("app", app);
            paramMap.put("resource", resource);
            paramMap.put("startTime", DateFormatUtils.format(new Date(startTime), DATE_FORMAT_PATTERN));
            paramMap.put("endTime", DateFormatUtils.format(new Date(endTime), DATE_FORMAT_PATTERN));
    
            List<MetricPO> metricPOS = InfluxDBUtils.queryList(SENTINEL_DATABASE, sql.toString(), paramMap, MetricPO.class);
    
            if (CollectionUtils.isEmpty(metricPOS)) {
                return results;
            }
    
            for (MetricPO metricPO : metricPOS) {
                results.add(convertToMetricEntity(metricPO));
            }
    
            return results;
        }
    
        @Override
        public List<String> listResourcesOfApp(String app) {
            List<String> results = new ArrayList<>();
            if (StringUtil.isBlank(app)) {
                return results;
            }
    
            StringBuilder sql = new StringBuilder();
            sql.append("SELECT * FROM " + METRIC_MEASUREMENT);
            sql.append(" WHERE app=$app");
            sql.append(" AND time>=$startTime");
    
            Map<String, Object> paramMap = new HashMap<String, Object>();
            long startTime = System.currentTimeMillis() - 1000 * 60;
            paramMap.put("app", app);
            paramMap.put("startTime", DateFormatUtils.format(new Date(startTime), DATE_FORMAT_PATTERN));
    
            List<MetricPO> metricPOS = InfluxDBUtils.queryList(SENTINEL_DATABASE, sql.toString(), paramMap, MetricPO.class);
    
            if (CollectionUtils.isEmpty(metricPOS)) {
                return results;
            }
    
            List<MetricEntity> metricEntities = new ArrayList<MetricEntity>();
            for (MetricPO metricPO : metricPOS) {
                metricEntities.add(convertToMetricEntity(metricPO));
            }
    
            Map<String, MetricEntity> resourceCount = new HashMap<>(32);
    
            for (MetricEntity metricEntity : metricEntities) {
                String resource = metricEntity.getResource();
                if (resourceCount.containsKey(resource)) {
                    MetricEntity oldEntity = resourceCount.get(resource);
                    oldEntity.addPassQps(metricEntity.getPassQps());
                    oldEntity.addRtAndSuccessQps(metricEntity.getRt(), metricEntity.getSuccessQps());
                    oldEntity.addBlockQps(metricEntity.getBlockQps());
                    oldEntity.addExceptionQps(metricEntity.getExceptionQps());
                    oldEntity.addCount(1);
                } else {
                    resourceCount.put(resource, MetricEntity.copyOf(metricEntity));
                }
            }
    
            // Order by last minute b_qps DESC.
            return resourceCount.entrySet()
                    .stream()
                    .sorted((o1, o2) -> {
                        MetricEntity e1 = o1.getValue();
                        MetricEntity e2 = o2.getValue();
                        int t = e2.getBlockQps().compareTo(e1.getBlockQps());
                        if (t != 0) {
                            return t;
                        }
                        return e2.getPassQps().compareTo(e1.getPassQps());
                    })
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toList());
        }
    
        private MetricEntity convertToMetricEntity(MetricPO metricPO) {
            MetricEntity metricEntity = new MetricEntity();
    
            metricEntity.setId(metricPO.getId());
            metricEntity.setGmtCreate(new Date(metricPO.getGmtCreate()));
            metricEntity.setGmtModified(new Date(metricPO.getGmtModified()));
            metricEntity.setApp(metricPO.getApp());
            metricEntity.setTimestamp(Date.from(metricPO.getTime().minusMillis(TimeUnit.HOURS.toMillis(UTC_8))));// 查询数据减8小时
            metricEntity.setResource(metricPO.getResource());
            metricEntity.setPassQps(metricPO.getPassQps());
            metricEntity.setSuccessQps(metricPO.getSuccessQps());
            metricEntity.setBlockQps(metricPO.getBlockQps());
            metricEntity.setExceptionQps(metricPO.getExceptionQps());
            metricEntity.setRt(metricPO.getRt());
            metricEntity.setCount(metricPO.getCount());
    
            return metricEntity;
        }
    
        private void doSave(InfluxDB influxDB, MetricEntity metric) {
            influxDB.write(Point.measurement(METRIC_MEASUREMENT)
                    .time(DateUtils.addHours(metric.getTimestamp(), UTC_8).getTime(), TimeUnit.MILLISECONDS)// 因InfluxDB默认UTC时间,按北京时间算写入数据加8小时
                    .tag("app", metric.getApp())
                    .tag("resource", metric.getResource())
                    .addField("id", metric.getId())
                    .addField("gmtCreate", metric.getGmtCreate().getTime())
                    .addField("gmtModified", metric.getGmtModified().getTime())
                    .addField("passQps", metric.getPassQps())
                    .addField("successQps", metric.getSuccessQps())
                    .addField("blockQps", metric.getBlockQps())
                    .addField("exceptionQps", metric.getExceptionQps())
                    .addField("rt", metric.getRt())
                    .addField("count", metric.getCount())
                    .addField("resourceCode", metric.getResourceCode())
                    .build());
        }
    }

    其中:

    save、saveAll方法通过调用InfluxDBUtils.insert和InfluxDBInsertCallback回调方法,往sentinel_db库的sentinel_metric数据表写数据;

    saveAll方法不是循环调用save方法,而是在回调内部循环Iterable<MetricEntity> metrics处理,这样InfluxDBFactory.connect连接只打开关闭一次;

    doSave方法中,.time(DateUtils.addHours(metric.getTimestamp(), 8).getTime(), TimeUnit.MILLISECONDS)

    因InfluxDB的UTC时间暂时没找到修改方法,所以这里time时间列加了8个小时时差;

    queryByAppAndResourceBetween、listResourcesOfApp里面的查询方法,使用InfluxDB提供的类sql语法,编写查询语句即可。

    最后一步,在MetricController、MetricFetcher两个类,找到metricStore属性,在@Autowired注解上面加上@Qualifier("jpaMetricsRepository")注解:

    @Qualifier("influxDBMetricsRepository")
    @Autowired
    private MetricsRepository<MetricEntity> metricStore;

    来验证下成果:

    设置sentinel-dashboard工程启动参数:-Dserver.port=8080 -Dcsp.sentinel.dashboard.server=localhost:8080 -Dproject.name=sentinel-dashboard

    启动工程,打开http://localhost:8080,查看各页面均显示正常,

    在命令行通过InfluxDB客户端命令,show measurements,可以看到已经生成了sentinel_metric数据表(measurement);

    查询总数:select count(id) from sentinel_metric

    查询最新5行数据:select * from sentinel_metric order by time desc limit 5

    注:命令行语句结束不用加分号

    -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

    代码参考:https://github.com/cdfive/Sentinel/tree/winxuan_develop/sentinel-dashboard

    扩展:

    1.考虑以什么时间维度归档历史数据;

    2.结合grafana将监控数据进行多维度的统计和呈现。

    -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

    参考:

    Sentinel官方文档:

    https://github.com/alibaba/Sentinel/wiki/控制台

    https://github.com/alibaba/Sentinel/wiki/在生产环境中使用-Sentinel-控制台

    InfluxDB官网文档 https://docs.influxdata.com/influxdb/v1.6/introduction/getting-started/

    InfluxDB简明手册 https://xtutu.gitbooks.io/influxdb-handbook/content/

  • 相关阅读:
    Spring Boot (20) 拦截器
    Spring Boot (19) servlet、filter、listener
    Spring Boot (18) @Async异步
    Spring Boot (17) 发送邮件
    Spring Boot (16) logback和access日志
    Spring Boot (15) pom.xml设置
    Spring Boot (14) 数据源配置原理
    Spring Boot (13) druid监控
    Spring boot (12) tomcat jdbc连接池
    Spring Boot (11) mybatis 关联映射
  • 原文地址:https://www.cnblogs.com/cdfive2018/p/9914838.html
Copyright © 2011-2022 走看看