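  • Persisting Sentinel dashboard monitoring data with InfluxDB

    According to the official wiki, the Sentinel dashboard by default retains only the last 5 minutes of real-time monitoring data. To persist the data, you have to implement the relevant interface yourself.

    https://github.com/alibaba/Sentinel/wiki/在生产环境中使用-Sentinel-控制台 outlines the steps:

    1. Implement the MetricsRepository interface yourself;

    2. Register the implementation as a Spring bean and select it by bean name with the @Qualifier annotation wherever it is injected.

    This article uses the time-series database InfluxDB for persistence and, starting from the download, builds an InfluxDB-backed store step by step.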

    -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

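    InfluxDB website: https://www.influxdata.com

    Key features:

    High-performance time-series database

    Written in Go, with no external dependencies

    Reads and writes over an HTTP API

    SQL-like query language

    Automatic cleanup of historical data through retention policies

    Archiving of data through continuous queries

    Latest version at the time of writing: 1.6.4

    Download

    Windows: wget https://dl.influxdata.com/influxdb/releases/influxdb-1.6.4_windows_amd64.zip

    Linux: wget https://dl.influxdata.com/influxdb/releases/influxdb-1.6.4_linux_amd64.tar.gz

    Note: wget for Windows can be downloaded from https://eternallybored.org/misc/wget/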

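    On Windows, unzip the archive into D:\influxdb\influxdb-1.6.4-1.

    Open a cmd window in D:\influxdb\influxdb-1.6.4-1 and start the InfluxDB server: influxd

    Open a second cmd window in the same directory and start the client: influx   // optionally add -precision rfc3339 to display timestamps in RFC 3339 format

    show databases lists only the built-in system databases, so create a new one named sentinel_db: create database sentinel_db

    use sentinel_db   switches to the sentinel_db database

    show measurements   lists the measurements (tables) in the database

    As you can see, these InfluxDB commands look a lot like MySQL.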

    ==============================================================

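    InfluxDB terminology:

    database: a database // same as a database in a relational database

    measurement: a table within a database // like a table in a relational database

    point: one row in a measurement // like a row in a relational database

    A point consists of 3 parts:

    time: the timestamp of the record, which is also the primary index the database generates automatically; // similar to a primary key

    fields: the recorded values; // not indexed

    tags: the indexed attributes // indexed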
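
    To make the three parts of a point concrete, here is a minimal sketch that formats one point as InfluxDB line protocol (measurement, tags, fields, timestamp). The class name LineProtocolSketch and the sample values are made up for illustration, and the sketch deliberately skips escaping and string fields, so treat it as a reading aid rather than a replacement for the client library.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

/** Formats one point as a line of InfluxDB line protocol (simplified: numeric fields only, no escaping). */
class LineProtocolSketch {

    static String toLine(String measurement, Map<String, String> tags,
                         Map<String, Object> fields, long timestampNanos) {
        StringBuilder sb = new StringBuilder(measurement);
        // Tags are the indexed key=value pairs; they are emitted sorted by key.
        for (Map.Entry<String, String> tag : new TreeMap<>(tags).entrySet()) {
            sb.append(',').append(tag.getKey()).append('=').append(tag.getValue());
        }
        sb.append(' ');
        // Fields carry the recorded values; integer fields take an "i" suffix.
        boolean first = true;
        for (Map.Entry<String, Object> field : fields.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            first = false;
            Object value = field.getValue();
            sb.append(field.getKey()).append('=').append(value);
            if (value instanceof Long || value instanceof Integer) {
                sb.append('i');
            }
        }
        // The timestamp comes last, in nanoseconds by default.
        return sb.append(' ').append(timestampNanos).toString();
    }

    public static void main(String[] args) {
        Map<String, String> tags = new TreeMap<>();
        tags.put("app", "demo-app");          // hypothetical application name
        tags.put("resource", "/hello");       // hypothetical resource name
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("passQps", 10L);
        fields.put("rt", 1.5);
        System.out.println(toLine("sentinel_metric", tags, fields, 1539907200000000000L));
    }
}
```

    The tags end up in the indexed part of the line (before the first space), while the fields and the timestamp follow it.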

    ==============================================================

    The official GitHub organization provides a Java client library:

    https://github.com/influxdata/influxdb-java

    Add the Maven dependency to the pom.xml of sentinel-dashboard:

    <dependency>
        <groupId>org.influxdb</groupId>
        <artifactId>influxdb-java</artifactId>
        <version>2.14</version>
    </dependency>

    Wrap the client in a utility class that holds the InfluxDB connection settings and simplifies calls:

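    package com.taobao.csp.sentinel.dashboard.util;
    
    import org.influxdb.InfluxDB;
    import org.influxdb.InfluxDBFactory;
    import org.influxdb.dto.BoundParameterQuery;
    import org.influxdb.dto.QueryResult;
    import org.influxdb.impl.InfluxDBResultMapper;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    
    /**
     * @author cdfive
     * @date 2018-10-19
     */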
    @Component
    public class InfluxDBUtils {
    
        private static Logger logger = LoggerFactory.getLogger(InfluxDBUtils.class);
    
        private static String url;
    
        private static String username;
    
        private static String password;
    
        private static InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
    
        @Value("${influxdb.url}")
        public void setUrl(String url) {
            InfluxDBUtils.url = url;
        }
    
        @Value("${influxdb.username}")
        public void setUsername(String username) {
            InfluxDBUtils.username = username;
        }
    
        @Value("${influxdb.password}")
        public void setPassword(String password) {
            InfluxDBUtils.password = password;
        }
    
        public static void init(String url, String username, String password) {
            InfluxDBUtils.url = url;
            InfluxDBUtils.username = username;
            InfluxDBUtils.password = password;
        }
    
        public static <T> T process(String database, InfluxDBCallback callback) {
            InfluxDB influxDB = null;
            T t = null;
            try {
                influxDB = InfluxDBFactory.connect(url, username, password);
                influxDB.setDatabase(database);
    
                t = callback.doCallBack(database, influxDB);
            } catch (Exception e) {
                logger.error("[process exception]", e);
            } finally {
                if (influxDB != null) {
                    try {
                        influxDB.close();
                    } catch (Exception e) {
                        logger.error("[influxDB.close exception]", e);
                    }
                }
            }
    
            return t;
        }
    
        public static void insert(String database, InfluxDBInsertCallback influxDBInsertCallback) {
            process(database, new InfluxDBCallback() {
                @Override
                public <T> T doCallBack(String database, InfluxDB influxDB) {
                    influxDBInsertCallback.doCallBack(database, influxDB);
                    return null;
                }
            });
    
        }
    
        public static QueryResult query(String database, InfluxDBQueryCallback influxDBQueryCallback) {
            return process(database, new InfluxDBCallback() {
                @Override
                public <T> T doCallBack(String database, InfluxDB influxDB) {
                    QueryResult queryResult = influxDBQueryCallback.doCallBack(database, influxDB);
                    return (T) queryResult;
                }
            });
        }
    
        public static <T> List<T> queryList(String database, String sql, Map<String, Object> paramMap, Class<T> clasz) {
            QueryResult queryResult = query(database, new InfluxDBQueryCallback() {
                @Override
                public QueryResult doCallBack(String database, InfluxDB influxDB) {
                    BoundParameterQuery.QueryBuilder queryBuilder = BoundParameterQuery.QueryBuilder.newQuery(sql);
                    queryBuilder.forDatabase(database);
    
                    if (paramMap != null && paramMap.size() > 0) {
                        Set<Map.Entry<String, Object>> entries = paramMap.entrySet();
                        for (Map.Entry<String, Object> entry : entries) {
                            queryBuilder.bind(entry.getKey(), entry.getValue());
                        }
                    }
    
                    return influxDB.query(queryBuilder.create());
                }
            });
    
            return resultMapper.toPOJO(queryResult, clasz);
        }
    
        public interface InfluxDBCallback {
            <T> T doCallBack(String database, InfluxDB influxDB);
        }
    
        public interface InfluxDBInsertCallback {
            void doCallBack(String database, InfluxDB influxDB);
        }
    
        public interface InfluxDBQueryCallback {
            QueryResult doCallBack(String database, InfluxDB influxDB);
        }
    }

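    In this class:

    url, username and password hold the InfluxDB connection URL, user name and password. They are static fields, so the @Value annotations are placed on the setter methods to read the values from the configuration file;

    resultMapper maps query results to entity classes;

    init sets url, username and password programmatically;

    process is the general-purpose method: it opens and closes the connection and invokes the InfluxDBCallback;

    insert writes data and works with the InfluxDBInsertCallback callback;

    query is the general query method; it works with the InfluxDBQueryCallback callback and returns a QueryResult;

    queryList queries a list: it calls query to obtain a QueryResult, then uses resultMapper to convert it into a List of entity objects;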
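
    The open / callback / close pattern behind process can also be written with a callback that is generic at the interface level, which avoids the unchecked (T) cast. The sketch below is only an illustration of that idea: ConnectionCallback and FakeConnection are hypothetical names, and it assumes the connection type implements AutoCloseable so that try-with-resources can close it; for a type that does not, a finally block as in the class above does the same job.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of the open / callback / close pattern with a generic callback. */
class CallbackTemplateSketch {

    /** Callback typed at the interface level, so no unchecked cast is needed. */
    interface ConnectionCallback<C, T> {
        T doCallBack(C connection) throws Exception;
    }

    /** Hypothetical stand-in for a real connection; it records what happens to it. */
    static class FakeConnection implements AutoCloseable {
        final List<String> log = new ArrayList<>();

        String query(String q) {
            log.add("query:" + q);
            return "result of " + q;
        }

        @Override
        public void close() {
            log.add("close");
        }
    }

    /** Runs the callback and always closes the connection afterwards. */
    static <C extends AutoCloseable, T> T process(C connection, ConnectionCallback<C, T> callback) {
        try (C c = connection) {
            return callback.doCallBack(c);
        } catch (Exception e) {
            // Same behaviour as the utility class above: report the error and return null.
            System.err.println("[process exception] " + e);
            return null;
        }
    }

    public static void main(String[] args) {
        FakeConnection conn = new FakeConnection();
        String result = process(conn, c -> c.query("show measurements"));
        System.out.println(result + " " + conn.log);
    }
}
```

    Because the return type is carried by the callback interface, insert and query style helpers can be expressed as plain lambdas on top of process.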

    Add the InfluxDB settings to the application.properties file in the resources directory:

    influxdb.url=${influxdb.url}
    influxdb.username=${influxdb.username}
    influxdb.password=${influxdb.password}

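    The ${xxx} placeholders let the actual values come from Maven profiles in pom.xml for each environment (development, test, production), or from a configuration center.

    Under the datasource.entity package, create an influxdb package, and in it the entity class MetricPO that maps to the sentinel_metric measurement: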

    package com.taobao.csp.sentinel.dashboard.datasource.entity.influxdb;
    
    import org.influxdb.annotation.Column;
    import org.influxdb.annotation.Measurement;
    
    import java.time.Instant;
    
    /**
     * @author cdfive
     * @date 2018-10-19
     */
    @Measurement(name = "sentinel_metric")
    public class MetricPO {
    
        @Column(name = "time")
        private Instant time;
    
        @Column(name = "id")
        private Long id;
    
        @Column(name = "gmtCreate")
        private Long gmtCreate;
    
        @Column(name = "gmtModified")
        private Long gmtModified;
    
        @Column(name = "app", tag = true)
        private String app;
    
        @Column(name = "resource", tag = true)
        private String resource;
    
        @Column(name = "passQps")
        private Long passQps;
    
        @Column(name = "successQps")
        private Long successQps;
    
        @Column(name = "blockQps")
        private Long blockQps;
    
        @Column(name = "exceptionQps")
        private Long exceptionQps;
    
        @Column(name = "rt")
        private double rt;
    
        @Column(name = "count")
        private int count;
    
        @Column(name = "resourceCode")
        private int resourceCode;
    
        // getters and setters omitted
    }

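    This class is modeled on MetricEntity, with the annotations from the influxdb-java library added. @Measurement(name = "sentinel_metric") sets the measurement name;

    time is the time column of the time-series database;

    app and resource are tag columns, marked with tag = true in the annotation;

    all other members are field columns;

    Next, in the repository.metric package where InMemoryMetricsRepository lives, create the InfluxDBMetricsRepository class, which implements the MetricsRepository<MetricEntity> interface: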

    package com.taobao.csp.sentinel.dashboard.repository.metric;
    
    import com.alibaba.csp.sentinel.util.StringUtil;
    import com.taobao.csp.sentinel.dashboard.datasource.entity.MetricEntity;
    import com.taobao.csp.sentinel.dashboard.datasource.entity.influxdb.MetricPO;
    import com.taobao.csp.sentinel.dashboard.util.InfluxDBUtils;
    import org.apache.commons.lang.time.DateFormatUtils;
    import org.apache.commons.lang.time.DateUtils;
    import org.influxdb.InfluxDB;
    import org.influxdb.dto.Point;
    import org.springframework.stereotype.Repository;
    import org.springframework.util.CollectionUtils;
    
    import java.util.*;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.Collectors;
    
    /**
     * InfluxDB-backed storage implementation for metrics data
     * @author cdfive
     * @date 2018-10-19
     */
    @Repository("influxDBMetricsRepository")
    public class InfluxDBMetricsRepository implements MetricsRepository<MetricEntity> {
    
        /** Date format pattern */
        private static final String DATE_FORMAT_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS";
    
        /** Database name */
        private static final String SENTINEL_DATABASE = "sentinel_db";
    
        /** Measurement name */
        private static final String METRIC_MEASUREMENT = "sentinel_metric";
    
        /** Beijing time is 8 hours ahead of UTC (Coordinated Universal Time) */
        private static final Integer UTC_8 = 8;
    
        @Override
        public void save(MetricEntity metric) {
            if (metric == null || StringUtil.isBlank(metric.getApp())) {
                return;
            }
    
            InfluxDBUtils.insert(SENTINEL_DATABASE, new InfluxDBUtils.InfluxDBInsertCallback() {
                @Override
                public void doCallBack(String database, InfluxDB influxDB) {
                    if (metric.getId() == null) {
                        metric.setId(System.currentTimeMillis());
                    }
                    doSave(influxDB, metric);
                }
            });
        }
    
        @Override
        public void saveAll(Iterable<MetricEntity> metrics) {
            if (metrics == null) {
                return;
            }
    
            Iterator<MetricEntity> iterator = metrics.iterator();
            boolean next = iterator.hasNext();
            if (!next) {
                return;
            }
    
            InfluxDBUtils.insert(SENTINEL_DATABASE, new InfluxDBUtils.InfluxDBInsertCallback() {
                @Override
                public void doCallBack(String database, InfluxDB influxDB) {
                    while (iterator.hasNext()) {
                        MetricEntity metric = iterator.next();
                        if (metric.getId() == null) {
                            metric.setId(System.currentTimeMillis());
                        }
                        doSave(influxDB, metric);
                    }
                }
            });
        }
    
        @Override
        public List<MetricEntity> queryByAppAndResourceBetween(String app, String resource, long startTime, long endTime) {
            List<MetricEntity> results = new ArrayList<MetricEntity>();
            if (StringUtil.isBlank(app)) {
                return results;
            }
    
            if (StringUtil.isBlank(resource)) {
                return results;
            }
    
            StringBuilder sql = new StringBuilder();
            sql.append("SELECT * FROM " + METRIC_MEASUREMENT);
            sql.append(" WHERE app=$app");
            sql.append(" AND resource=$resource");
            sql.append(" AND time>=$startTime");
            sql.append(" AND time<=$endTime");
    
            Map<String, Object> paramMap = new HashMap<String, Object>();
            paramMap.put("app", app);
            paramMap.put("resource", resource);
            paramMap.put("startTime", DateFormatUtils.format(new Date(startTime), DATE_FORMAT_PATTERN));
            paramMap.put("endTime", DateFormatUtils.format(new Date(endTime), DATE_FORMAT_PATTERN));
    
            List<MetricPO> metricPOS = InfluxDBUtils.queryList(SENTINEL_DATABASE, sql.toString(), paramMap, MetricPO.class);
    
            if (CollectionUtils.isEmpty(metricPOS)) {
                return results;
            }
    
            for (MetricPO metricPO : metricPOS) {
                results.add(convertToMetricEntity(metricPO));
            }
    
            return results;
        }
    
        @Override
        public List<String> listResourcesOfApp(String app) {
            List<String> results = new ArrayList<>();
            if (StringUtil.isBlank(app)) {
                return results;
            }
    
            StringBuilder sql = new StringBuilder();
            sql.append("SELECT * FROM " + METRIC_MEASUREMENT);
            sql.append(" WHERE app=$app");
            sql.append(" AND time>=$startTime");
    
            Map<String, Object> paramMap = new HashMap<String, Object>();
            long startTime = System.currentTimeMillis() - 1000 * 60;
            paramMap.put("app", app);
            paramMap.put("startTime", DateFormatUtils.format(new Date(startTime), DATE_FORMAT_PATTERN));
    
            List<MetricPO> metricPOS = InfluxDBUtils.queryList(SENTINEL_DATABASE, sql.toString(), paramMap, MetricPO.class);
    
            if (CollectionUtils.isEmpty(metricPOS)) {
                return results;
            }
    
            List<MetricEntity> metricEntities = new ArrayList<MetricEntity>();
            for (MetricPO metricPO : metricPOS) {
                metricEntities.add(convertToMetricEntity(metricPO));
            }
    
            Map<String, MetricEntity> resourceCount = new HashMap<>(32);
    
            for (MetricEntity metricEntity : metricEntities) {
                String resource = metricEntity.getResource();
                if (resourceCount.containsKey(resource)) {
                    MetricEntity oldEntity = resourceCount.get(resource);
                    oldEntity.addPassQps(metricEntity.getPassQps());
                    oldEntity.addRtAndSuccessQps(metricEntity.getRt(), metricEntity.getSuccessQps());
                    oldEntity.addBlockQps(metricEntity.getBlockQps());
                    oldEntity.addExceptionQps(metricEntity.getExceptionQps());
                    oldEntity.addCount(1);
                } else {
                    resourceCount.put(resource, MetricEntity.copyOf(metricEntity));
                }
            }
    
            // Order by last minute b_qps DESC.
            return resourceCount.entrySet()
                    .stream()
                    .sorted((o1, o2) -> {
                        MetricEntity e1 = o1.getValue();
                        MetricEntity e2 = o2.getValue();
                        int t = e2.getBlockQps().compareTo(e1.getBlockQps());
                        if (t != 0) {
                            return t;
                        }
                        return e2.getPassQps().compareTo(e1.getPassQps());
                    })
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toList());
        }
    
        private MetricEntity convertToMetricEntity(MetricPO metricPO) {
            MetricEntity metricEntity = new MetricEntity();
    
            metricEntity.setId(metricPO.getId());
            metricEntity.setGmtCreate(new Date(metricPO.getGmtCreate()));
            metricEntity.setGmtModified(new Date(metricPO.getGmtModified()));
            metricEntity.setApp(metricPO.getApp());
            metricEntity.setTimestamp(Date.from(metricPO.getTime().minusMillis(TimeUnit.HOURS.toMillis(UTC_8))));// subtract 8 hours when reading
            metricEntity.setResource(metricPO.getResource());
            metricEntity.setPassQps(metricPO.getPassQps());
            metricEntity.setSuccessQps(metricPO.getSuccessQps());
            metricEntity.setBlockQps(metricPO.getBlockQps());
            metricEntity.setExceptionQps(metricPO.getExceptionQps());
            metricEntity.setRt(metricPO.getRt());
            metricEntity.setCount(metricPO.getCount());
    
            return metricEntity;
        }
    
        private void doSave(InfluxDB influxDB, MetricEntity metric) {
            influxDB.write(Point.measurement(METRIC_MEASUREMENT)
                    .time(DateUtils.addHours(metric.getTimestamp(), UTC_8).getTime(), TimeUnit.MILLISECONDS)// InfluxDB uses UTC by default, so add 8 hours on write to store Beijing time
                    .tag("app", metric.getApp())
                    .tag("resource", metric.getResource())
                    .addField("id", metric.getId())
                    .addField("gmtCreate", metric.getGmtCreate().getTime())
                    .addField("gmtModified", metric.getGmtModified().getTime())
                    .addField("passQps", metric.getPassQps())
                    .addField("successQps", metric.getSuccessQps())
                    .addField("blockQps", metric.getBlockQps())
                    .addField("exceptionQps", metric.getExceptionQps())
                    .addField("rt", metric.getRt())
                    .addField("count", metric.getCount())
                    .addField("resourceCode", metric.getResourceCode())
                    .build());
        }
    }

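    In this class:

    save and saveAll call InfluxDBUtils.insert with an InfluxDBInsertCallback to write data to the sentinel_metric measurement in the sentinel_db database;

    saveAll does not call save in a loop; it iterates over the Iterable<MetricEntity> metrics inside the callback, so the InfluxDBFactory.connect connection is opened and closed only once;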

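    In doSave, the time column is set with .time(DateUtils.addHours(metric.getTimestamp(), UTC_8).getTime(), TimeUnit.MILLISECONDS)

    InfluxDB works in UTC and I have not yet found a way to change that, so the time column is shifted forward by 8 hours on write; convertToMetricEntity subtracts the same 8 hours when reading;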
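
    As a possible alternative to shifting by 8 hours (not what the code above does), the timestamps could be stored unshifted and converted only for display, since epoch milliseconds do not depend on a time zone. The sketch below uses only java.time; the class and method names are invented for this example, and it has not been wired into the dashboard.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Date;

/** Sketch: keep stored timestamps in UTC and convert only for display. */
class UtcTimestampSketch {

    static final ZoneId BEIJING = ZoneId.of("Asia/Shanghai");

    /** Epoch milliseconds are zone-independent, so they can be written as-is. */
    static long toEpochMillis(Date timestamp) {
        return timestamp.getTime();
    }

    /** Converts a stored instant to Beijing wall-clock time for display. */
    static ZonedDateTime toBeijingTime(Instant stored) {
        return stored.atZone(BEIJING);
    }

    /** Formats a query boundary as an RFC 3339 string in UTC. */
    static String toRfc3339Utc(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).toString();
    }

    public static void main(String[] args) {
        long millis = 1539907200000L; // 2018-10-19 00:00:00 UTC
        System.out.println(toRfc3339Utc(millis));
        System.out.println(toBeijingTime(Instant.ofEpochMilli(millis)));
    }
}
```

    With this approach the query bounds would be passed as RFC 3339 UTC strings, and in the command-line client the InfluxQL tz('Asia/Shanghai') clause can be appended to a SELECT to display local time.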

    The queries in queryByAppAndResourceBetween and listResourcesOfApp are written in InfluxDB's SQL-like query language.
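
    After the query, listResourcesOfApp orders the resources by blocked QPS and then by passed QPS, both descending. That ordering step can be sketched in isolation with Comparator; ResourceStat here is a hypothetical, trimmed-down stand-in for MetricEntity, so this is an illustration rather than a drop-in replacement.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/** Sketch of the "block QPS desc, then pass QPS desc" ordering. */
class ResourceOrderSketch {

    /** Hypothetical, trimmed-down stand-in for MetricEntity. */
    static class ResourceStat {
        final String resource;
        final long blockQps;
        final long passQps;

        ResourceStat(String resource, long blockQps, long passQps) {
            this.resource = resource;
            this.blockQps = blockQps;
            this.passQps = passQps;
        }
    }

    /** Returns resource names ordered by blockQps, then passQps, both descending. */
    static List<String> orderResources(List<ResourceStat> stats) {
        Comparator<ResourceStat> byBlockDesc =
                Comparator.comparingLong((ResourceStat s) -> s.blockQps).reversed();
        Comparator<ResourceStat> byPassDesc =
                Comparator.comparingLong((ResourceStat s) -> s.passQps).reversed();
        return stats.stream()
                .sorted(byBlockDesc.thenComparing(byPassDesc))
                .map(s -> s.resource)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<ResourceStat> stats = new ArrayList<>();
        stats.add(new ResourceStat("/a", 0, 50));
        stats.add(new ResourceStat("/b", 3, 1));
        stats.add(new ResourceStat("/c", 0, 80));
        System.out.println(orderResources(stats));
    }
}
```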

    The last step: in the MetricController and MetricFetcher classes, find the metricStore field and add the @Qualifier("influxDBMetricsRepository") annotation above its @Autowired annotation:

    @Qualifier("influxDBMetricsRepository")
    @Autowired
    private MetricsRepository<MetricEntity> metricStore;

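    Now to verify the result:

    Set the startup arguments of the sentinel-dashboard project: -Dserver.port=8080 -Dcsp.sentinel.dashboard.server=localhost:8080 -Dproject.name=sentinel-dashboard

    Start the project, open http://localhost:8080, and check that every page displays normally.

    In the InfluxDB command-line client, show measurements now lists the sentinel_metric measurement;

    Count the rows: select count(id) from sentinel_metric

    Fetch the latest 5 rows: select * from sentinel_metric order by time desc limit 5

    Note: statements in the command-line client do not need a trailing semicolon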
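
    The same checks can be run over the HTTP API instead of the command-line client. The sketch below only builds the request URL for the /query endpoint; it assumes InfluxDB is listening on its default HTTP port 8086 on localhost, and the class name QueryUrlSketch is made up for this example.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

/** Builds a URL for the InfluxDB 1.x /query HTTP endpoint. */
class QueryUrlSketch {

    static String queryUrl(String baseUrl, String database, String influxQl) {
        try {
            // Both the database name and the statement are passed as URL-encoded query parameters.
            return baseUrl + "/query?db=" + URLEncoder.encode(database, "UTF-8")
                    + "&q=" + URLEncoder.encode(influxQl, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on the JVM, so this should never happen.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Assumed address: a local InfluxDB on the default port.
        System.out.println(queryUrl("http://localhost:8086", "sentinel_db",
                "select count(id) from sentinel_metric"));
    }
}
```

    Opening the printed URL in a browser, or fetching it with any HTTP client, should return the result as JSON.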

    -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

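    Source code: https://github.com/cdfive/Sentinel/tree/winxuan_develop/sentinel-dashboard

    Possible extensions:

    1. Decide at what time granularity historical data should be archived;

    2. Use Grafana to aggregate and visualize the monitoring data along multiple dimensions.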
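
    For the first extension, one possible direction is to combine a retention policy with a continuous query, the two InfluxDB features listed at the top of this article. The sketch below only assembles the InfluxQL statements as strings; the policy names, the target measurement, the durations and the choice of mean() over passQps and blockQps are all assumptions for illustration and would need to be adapted before running the statements in the influx client.

```java
/** Assembles InfluxQL statements for archiving sentinel_metric data (illustrative names only). */
class ArchiveStatementsSketch {

    /** CREATE RETENTION POLICY statement; data older than the duration is dropped automatically. */
    static String createRetentionPolicy(String name, String database, String duration, boolean isDefault) {
        return "CREATE RETENTION POLICY \"" + name + "\" ON \"" + database + "\""
                + " DURATION " + duration + " REPLICATION 1" + (isDefault ? " DEFAULT" : "");
    }

    /** CREATE CONTINUOUS QUERY statement that downsamples into another retention policy. */
    static String createDownsamplingQuery(String name, String database, String targetPolicy,
                                          String targetMeasurement, String sourceMeasurement,
                                          String interval) {
        return "CREATE CONTINUOUS QUERY \"" + name + "\" ON \"" + database + "\" BEGIN"
                + " SELECT mean(\"passQps\") AS \"passQps\", mean(\"blockQps\") AS \"blockQps\""
                + " INTO \"" + targetPolicy + "\".\"" + targetMeasurement + "\""
                + " FROM \"" + sourceMeasurement + "\""
                + " GROUP BY time(" + interval + "), *"
                + " END";
    }

    public static void main(String[] args) {
        // Hypothetical setup: keep raw data 7 days, keep hourly averages 52 weeks.
        System.out.println(createRetentionPolicy("rp_7d", "sentinel_db", "7d", true));
        System.out.println(createRetentionPolicy("rp_52w", "sentinel_db", "52w", false));
        System.out.println(createDownsamplingQuery("cq_metric_1h", "sentinel_db", "rp_52w",
                "sentinel_metric_1h", "sentinel_metric", "1h"));
    }
}
```

    GROUP BY time(1h), * keeps the app and resource tags on the downsampled points, so they can still be filtered per application and resource.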

    -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

    References:

    Sentinel official documentation:

    https://github.com/alibaba/Sentinel/wiki/控制台

    https://github.com/alibaba/Sentinel/wiki/在生产环境中使用-Sentinel-控制台

    InfluxDB official documentation: https://docs.influxdata.com/influxdb/v1.6/introduction/getting-started/

    InfluxDB concise handbook: https://xtutu.gitbooks.io/influxdb-handbook/content/

  • Original article: https://www.cnblogs.com/cdfive2018/p/9914838.html