zoukankan      html  css  js  c++  java
  • prometheus远程读接口实现对接clickhouse

    实现prom2click的类似功能,使用java实现。

    先调查prometheus查询请求参数等。

    http://49.4.48.241:32007/api/v1/query?query=go_memstats_frees_total%20[40d]&time=1580524260.813

    http://49.4.48.241:32007/api/v1/query?query=go_memstats_frees_total{instance="pushgateway",job="pushgateway"}[40d]&time=1580524260.813

    打印出请求头和参数:

    header:host==192.168.2.133:31636

    header:user-agent==Prometheus/2.14.0
    header:content-length==107
    header:accept-encoding==snappy
    header:content-encoding==snappy
    header:content-type==application/x-protobuf
    header:x-prometheus-remote-read-version==0.1.0
    url:http://192.168.2.133:31636/read
    ip:172.16.0.252
    startTime == 1577068260813
    endTime == 1580524260813
    matcher value==instance
    matcher type value==0
    matcher type==EQ
    matcher value==pushgateway
    matcher value==job
    matcher type value==0
    matcher type==EQ
    matcher value==pushgateway
    matcher value==__name__
    matcher type value==0
    matcher type==EQ
    matcher value==go_memstats_frees_total
    matcher value==instance
    matcher type value==0
    matcher type==EQ
    matcher value==pushgateway
    matcher value==job
    matcher type value==0
    matcher type==EQ
    matcher value==pushgateway
    matcher value==__name__
    matcher type value==0
    matcher type==EQ
    matcher value==go_memstats_frees_total

    使用了snappy压缩和protobuf序列化。所以需要使用proto生成java文件:

    下载protoc-3.5.1-win32.zip
    使用proto生成java文件:
    protoc --java_out=. remote.proto
    protoc --java_out=. rpc.proto
    protoc --java_out=. types.proto
    依赖以下文件:
    protoc --java_out=. gogoprotogogo.proto
    protoc --java_out=. googleprotobuf imestamp.proto
    protoc --java_out=. googleapiannotations.proto
    protoc --java_out=. googleapihttp.proto
    默认生成了prometheus下Remote.java,Rpc.java,Types.java
    comgoogleprotobufGoGoProtos.java
    comgoogleapiAnnotationsProto.java
    comgoogleapiCustomHttpPattern.java
    comgoogleapiCustomHttpPatternOrBuilder.java
    comgoogleapiHttp.java
    comgoogleapiHttpOrBuilder.java
    comgoogleapiHttpProto.java
    comgoogleapiHttpRule.java
    comgoogleapiHttpRuleOrBuilder.java

    修改prometheus配置文件:

    remote_write:
      - url: "http://localhost:9201/write"
    remote_read:
    #  - url: "http://localhost:9201/read"
      - url: "http://192.168.2.133:31636/read"
    

      

    实现代码如下:

    package com.chinasoft;
    
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.sql.Array;
    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Enumeration;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Map.Entry;
    
    import javax.servlet.ServletOutputStream;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    
    import org.springframework.stereotype.Controller;
    import org.springframework.util.StringUtils;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.xerial.snappy.Snappy;
    import org.xerial.snappy.SnappyInputStream;
    
    import prometheus.Remote;
    import prometheus.Remote.Query;
    import prometheus.Remote.QueryResult;
    import prometheus.Remote.ReadRequest;
    import prometheus.Remote.ReadResponse;
    import prometheus.Types.Label;
    import prometheus.Types.LabelMatcher;
    import prometheus.Types.Sample;
    import prometheus.Types.TimeSeries;
    import ru.yandex.clickhouse.ClickHouseDataSource;
    import ru.yandex.clickhouse.settings.ClickHouseProperties;
    
    @Controller
    public class DBController {
    
    	@RequestMapping("/read")
    	public void readDB(HttpServletRequest request, HttpServletResponse response) {
    		printHeaders(request, response);
    		try {
    			ReadRequest readRequest = readParam(request);
    			List<Query> queryList = readRequest.getQueriesList();
    			for (Query query : queryList) {
    				System.out.println("startTime == " + query.getStartTimestampMs());
    				System.out.println("endTime == " + query.getEndTimestampMs());
    				String name = "";
    				for (LabelMatcher labelMatcher : query.getMatchersList()) {
    					name = labelMatcher.getValue();
    					System.out.println("matcher value==" + labelMatcher.getName());
    					System.out.println("matcher type value==" + labelMatcher.getTypeValue());
    					System.out.println("matcher type==" + labelMatcher.getType());
    					System.out.println("matcher value==" + labelMatcher.getValue());
    				}
    //				ReadResponse readResponse = queryBySql(name, query.getStartTimestampMs(), query.getEndTimestampMs());
    				ReadResponse readResponse = queryBySqlNew(query);
    				writeResponse(readResponse, response);
    			}
    
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    
    	private void writeResponse(ReadResponse readResponse, HttpServletResponse response) throws IOException {
    		byte[] datas = readResponse.toByteArray();
    		ServletOutputStream outStream = response.getOutputStream();
    		datas = Snappy.compress(datas);
    		response.setHeader("content-length", "" + datas.length);
    		outStream.write(datas);
    		outStream.flush();
    		outStream.close();
    	}
    
    	private ReadRequest readParam(HttpServletRequest request) throws IOException {
    		SnappyInputStream stream = new SnappyInputStream(request.getInputStream());
    		ByteArrayOutputStream baos = new ByteArrayOutputStream();
    		byte[] data = new byte[1024];
    		int len = 0;
    		while ((len = stream.read(data)) > -1) {
    			baos.write(data, 0, len);
    		}
    		stream.close();
    		ReadRequest readRequest = Remote.ReadRequest.parseFrom(baos.toByteArray());
    		return readRequest;
    	}
    
    	private void printHeaders(HttpServletRequest request, HttpServletResponse response) {
    		Enumeration<String> headers = request.getHeaderNames();
    		for (; headers.hasMoreElements();) {
    			String header = headers.nextElement();
    			System.out.println("header:" + header + "==" + request.getHeader(header));
    			response.setHeader(header, request.getHeader(header));
    		}
    //		Enumeration<String> attributeNames = request.getAttributeNames();
    //		for (; attributeNames.hasMoreElements();) {
    //			String attributeName = attributeNames.nextElement();
    //			System.out.println("attributeName:" + attributeName + "==" + request.getAttribute(attributeName));
    //		}
    		Map<String, String[]> parameterMap = request.getParameterMap();
    		for (Entry<String, String[]> entry : parameterMap.entrySet()) {
    			System.out.println("Parameters:" + entry.getKey() + ",value==" + Arrays.asList(entry.getValue()));
    
    		}
    		System.out.println("url:" + request.getRequestURL().toString());
    		System.out.println("ip:" + request.getRemoteHost());
    	}
    
    	private ReadResponse queryBySqlNew(Query query) {
    		ReadResponse readResponse = Remote.ReadResponse.newBuilder().build();
    		QueryResult queryResult = Remote.QueryResult.newBuilder().build();
    		readResponse.setResultsList(Arrays.asList(queryResult));
    		String sql = buildSql(query);
    		try {
    			Connection connection = getConn();
    			Statement statement = connection.createStatement();
    			System.out.println(sql);
    			ResultSet rs = statement.executeQuery(sql);
    			Map<String, TimeSeries> timeSerieMap = new HashMap<String, TimeSeries>();
    			while (rs.next()) {
    				String tagStrkey = rs.getString("tags");
    				TimeSeries timeSeries = null;
    				if (!timeSerieMap.containsKey(tagStrkey)) {
    					timeSeries = TimeSeries.newBuilder().build();
    					timeSerieMap.put(tagStrkey, timeSeries);
    					List<Sample> sampleList = new ArrayList<Sample>();
    					List<Label> labelList = new ArrayList<Label>();
    					Array array = rs.getArray("tags");
    					setLabelList(array, labelList);
    					timeSeries.setLabelsList(labelList);
    					timeSeries.setSamplesList(sampleList);
    				} else {
    					timeSeries = timeSerieMap.get(tagStrkey);
    				}
    				System.out.println(rs.getTimestamp(2).getTime());
    				System.out.println(rs.getDouble(1));
    				Sample sample = Sample.newBuilder().build();
    				timeSeries.getSamplesList()
    						.add(sample.setTimestamp(rs.getTimestamp("ts").getTime()).setValue(rs.getDouble("avg")));
    			}
    
    			queryResult.setTimeseriesList(new ArrayList<TimeSeries>(timeSerieMap.values()));
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    		return readResponse;
    	}
    
    	private void setLabelList(Array array, List<Label> labelList) {
    		try {
    			String[] arrays = (String[]) array.getArray();
    			for (int j = 1; j < arrays.length; j++) {
    				String str = arrays[j];
    				String[] strs = str.split("=");
    				labelList.add(Label.newBuilder().build().setName(strs[0]).setValue(strs[1]));
    			}
    			String[] first = arrays[0].split("=");
    			labelList.add(Label.newBuilder().build().setName(first[0]).setValue(first[1]));
    		} catch (SQLException e) {
    			e.printStackTrace();
    		}
    	}
    
    	private String buildSql(Query query) {
    		StringBuilder sb = new StringBuilder("select avg,ts,tags from metrics.samplesnew where");
    		sb.append(" ts>=toDateTime(");
    		sb.append(query.getStartTimestampMs() / 1000);
    		sb.append(") and ts<=toDateTime(");
    		sb.append(query.getEndTimestampMs() / 1000);
    		sb.append(")");
    		for (LabelMatcher labelMatcher : query.getMatchersList()) {
    			System.out.println("matcher value==" + labelMatcher.getName());
    			System.out.println("matcher type value==" + labelMatcher.getTypeValue());
    			System.out.println("matcher type==" + labelMatcher.getType());
    			System.out.println("matcher value==" + labelMatcher.getValue());
    			if ("__name__".equals(labelMatcher.getName())) {
    				appendNameWhere(labelMatcher, sb);
    			} else {
    				appendTagsWhere(labelMatcher, sb);
    			}
    		}
    		sb.append(" order by ts");
    		return sb.toString();
    	}
    
    	private void appendNameWhere(LabelMatcher labelMatcher, StringBuilder sb) {
    		String tagV = labelMatcher.getValue();
    		tagV = tagV.replaceAll("'", "\'");
    		if (StringUtils.isEmpty(tagV)) {
    			return;
    		}
    		switch (labelMatcher.getTypeValue()) {
    		case LabelMatcher.Type.EQ_VALUE:
    			sb.append(" and name='");
    			sb.append(tagV);
    			sb.append("'");
    			break;
    		case LabelMatcher.Type.NEQ_VALUE:
    			sb.append(" and name!='");
    			sb.append(tagV);
    			sb.append("'");
    			break;
    		case LabelMatcher.Type.RE_VALUE:
    			sb.append(" and match(name, '");
    			sb.append(tagV.replaceAll("/", "\/"));
    			sb.append("') = 1");
    			break;
    		case LabelMatcher.Type.NRE_VALUE:
    			sb.append(" and match(name, '");
    			sb.append(tagV.replaceAll("/", "\/"));
    			sb.append("') = 0");
    			break;
    		default:
    			sb.append(" and name='");
    			sb.append(tagV);
    			sb.append("'");
    		}
    	}
    
    	private void appendTagsWhere(LabelMatcher labelMatcher, StringBuilder sb) {
    		String tagK = labelMatcher.getName();
    		String tagV = labelMatcher.getValue();
    		tagV = tagV.replaceAll("'", "\'");
    		if (StringUtils.isEmpty(tagV)) {
    			return;
    		}
    		// 等于 arrayExists(x -> x IN (%s), tags) = 1
    		// 正则匹配 arrayExists(x -> 1 == match(x, '^%s=%s'),tags) = 1
    		switch (labelMatcher.getTypeValue()) {
    		case LabelMatcher.Type.EQ_VALUE:
    			sb.append(" and arrayExists(x -> x IN ('");
    			sb.append(tagK);
    			sb.append("=");
    			sb.append(tagV);
    			sb.append("'), tags) = 1");
    			break;
    		case LabelMatcher.Type.NEQ_VALUE:
    			sb.append(" and arrayExists(x -> x IN ('");
    			sb.append(tagK);
    			sb.append("=");
    			sb.append(tagV);
    			sb.append("'), tags) = 0");
    			break;
    		case LabelMatcher.Type.RE_VALUE:
    			sb.append(" and arrayExists(x -> 1 == match(x, '^");
    			sb.append(tagK);
    			sb.append("=");
    			sb.append(tagV.replaceAll("/", "\/"));
    			sb.append("'),tags) = 1");
    			break;
    		case LabelMatcher.Type.NRE_VALUE:
    			sb.append(" and arrayExists(x -> 1 == match(x, '^");
    			sb.append(tagK);
    			sb.append("=");
    			sb.append(tagV.replaceAll("/", "\/"));
    			sb.append("'),tags) = 0");
    			break;
    		default:
    			sb.append(" and arrayExists(x -> x IN ('");
    			sb.append(tagK);
    			sb.append("=");
    			sb.append(tagV);
    			sb.append("'), tags) = 1");
    		}
    	}
    
    	private ReadResponse queryBySql(String name, long start, long end) {
    		try {
    			Connection connection = getConn();
    			Statement statement = connection.createStatement();
    			String sql = "select avg,ts,tags from metrics.samplesnew where name='" + name
    					+ "' and toUnixTimestamp(ts)>=" + start / 1000 + " and toUnixTimestamp(ts)<=" + end / 1000
    					+ " order by ts";
    			ResultSet rs = statement.executeQuery(sql);
    			System.out.println(sql);
    			QueryResult queryResult = Remote.QueryResult.newBuilder().build();
    			TimeSeries timeSeries = TimeSeries.newBuilder().build();
    			queryResult.setTimeseriesList(Arrays.asList(timeSeries));
    			ReadResponse readResponse = Remote.ReadResponse.newBuilder().build();
    			readResponse.setResultsList(Arrays.asList(queryResult));
    			List<Sample> sampleList = new ArrayList<Sample>();
    			List<Label> labelList = new ArrayList<Label>();
    			timeSeries.setLabelsList(labelList);
    			timeSeries.setSamplesList(sampleList);
    			Array array = null;
    			while (rs.next()) {
    				System.out.println(rs.getTimestamp(2).getTime());
    				System.out.println(rs.getDouble(1));
    				Sample sample = Sample.newBuilder().build();
    				sampleList.add(sample.setTimestamp(rs.getTimestamp("ts").getTime()).setValue(rs.getDouble("avg")));
    				array = rs.getArray(3);
    			}
    			if (array != null) {
    				String[] arrays = (String[]) array.getArray();
    				int i = 0;
    				for (String str : arrays) {
    					if (i == 0) {
    						i++;
    						continue;
    					}
    					String[] strs = str.split("=");
    					labelList.add(Label.newBuilder().build().setName(strs[0]).setValue(strs[1]));
    				}
    				String[] first = arrays[0].split("=");
    				labelList.add(Label.newBuilder().build().setName(first[0]).setValue(first[1]));
    			}
    			return readResponse;
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    		return null;
    	}
    
    	private static String clickhouseAddress = "jdbc:clickhouse://117.78.23.187:32123";
    
    	private static String clickhouseUsername = "root";
    
    	private static String clickhousePassword = "123456";
    
    	private static String clickhouseDB = "metrics";
    
    	private static Integer clickhouseSocketTimeout = 600000;
    
    	private ClickHouseDataSource clickHouseDataSource;
    
    	{
    
    		ClickHouseProperties properties = new ClickHouseProperties();
    		properties.setUser(clickhouseUsername);
    		properties.setPassword(clickhousePassword);
    		properties.setDatabase(clickhouseDB);
    		properties.setSocketTimeout(clickhouseSocketTimeout);
    		properties.setConnectionTimeout(60000);
    		clickHouseDataSource = new ClickHouseDataSource(clickhouseAddress, properties);
    	}
    
    	private Connection getConn() {
    		try {
    			return clickHouseDataSource.getConnection();
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    		return null;
    	}
    
    }
    

    执行命令在环境上运行:

    /CloudResetPwdUpdateAgent/depend/jre1.8.0_131/bin/java -jar agent-0.0.1-SNAPSHOT.jar

    nohup方式
    nohup /CloudResetPwdUpdateAgent/depend/jre1.8.0_131/bin/java -jar agent-0.0.1-SNAPSHOT.jar > agent.log 2>&1 &

    在日志中打印出执行的sql语句:

    select avg,ts,tags from metrics.samplesnew where ts>=toDateTime(1577068260) and ts<=toDateTime(1580524260) and arrayExists(x -> x IN ('instance=pushgateway'), tags) = 1 and arrayExists(x -> x IN ('job=pushgateway'), tags) = 1 and name='go_memstats_frees_total' order by ts

    工程路径:

    https://files.cnblogs.com/files/yaoyu1983/agent.zip

  • 相关阅读:
    拾遗:systemctl --user
    拾遗:~/.zshrc 配置
    拾遗:YouCompleteMe 前传——编译安装 llvm + clang
    洛谷P1546 最短网络 Agri-Net(最小生成树,Kruskal)
    洛谷P1462 通往奥格瑞玛的道路(二分+spfa,二分+Dijkstra)
    HDU6669 Game(思维,贪心)
    HDU6672 Seq(找规律)
    HDU6668 Polynomial(模拟)
    洛谷P1378 油滴扩展(搜索)
    机器学习数学基础(四)
  • 原文地址:https://www.cnblogs.com/yaoyu1983/p/12334842.html
Copyright © 2011-2022 走看看