zoukankan      html  css  js  c++  java
  • skywalking-自定义kafka组件

    1.自定义kafka组件,进行kafka初始化,以及消息处理。

    /**
    * @author Ly
    * @create 2020/11/9 15:40
    * @desc
    **/
    @Slf4j
    public class ArgInterceptorAspect {

    private KafkaTemplate kafkaTemplate;

    public ArgInterceptorAspect(Class clazz) throws Exception{
    //初始化kafkaTemplate
    this.kafkaTemplate = new KafkaTemplate(new DefaultKafkaProducerFactory<String,String>(Config.producerProps(clazz)));
    }

    /**
    * 处理消息
    * @param args
    * @throws Throwable
    */
    public void handle(String args) throws Throwable{
    kafkaTemplate.send(Constants.KAFKA_SKYWALKING_ARG_TOPIC,args);
    }


    =======================================================================================================================================================
     

    /**
    * @author Ly
    * @create 2020/11/24 10:49
    * @desc
    **/
    @Slf4j
    public class Config {

    private final static Yaml yaml = new Yaml();

    private final static Properties p = new Properties();

    private final static Map<String,String> PROPERTIES_MAPS = Maps.newConcurrentMap();

    public static Map<String,String> getPropertiesMaps(){
    return PROPERTIES_MAPS;
    }

    public static Map<String,String> getConfigs(Class clazz) throws Exception{
    return Config.initConfigurations(clazz);
    }

    private static Map<String,String> initConfigurations(Class clazz) throws IOException {
    Reader reader = null;
    Map<String,String> results = Maps.newHashMap();
    Map resultMaps = Maps.newHashMap();
    reader = Config.readFile(ArgInterceptorAspect.class,Constants.FILE_NAME_DEFAULT);
    //读取配置文件信息 顺序:默认配置文件-> application.yml -> application.properties 相同的配置会进行覆盖
    if(reader != null){
    p.load(reader);
    resultMaps.putAll(properties2map(p));
    }
    if(clazz != null){
    reader = Config.readFile(clazz, Constants.FILE_NAME_PROPERTIES);
    if(reader != null){
    p.load(reader);
    resultMaps.putAll(properties2map(p));
    }
    reader = Config.readFile(clazz,Constants.FILE_NAME_YML);
    if(reader != null){
    Map yamlConfigurationMaps = Maps.newHashMap();
    YamlConvertPropertiesUtils.convert(yaml.loadAs(reader, Map.class),yamlConfigurationMaps,null);
    resultMaps.putAll(yamlConfigurationMaps);
    }
    Config.selectConfig(resultMaps);
    resultMaps.forEach((key,config)->{
    String value = PropertyPlaceholderHelper.INSTANCE.parseStringValue(String.valueOf(config));
    results.put(String.valueOf(key), value);
    });
    }
    PROPERTIES_MAPS.putAll(results);
    return results;
    }



    private static void selectConfig(final Map<String, Map<String, Object>> moduleConfiguration) {
    Iterator<Map.Entry<String, Map<String, Object>>> iterator = moduleConfiguration.entrySet().iterator();
    List<String> keys = PropertiesEnums.getKeys();
    while(iterator.hasNext()){
    Map.Entry<String, Map<String, Object>> next = iterator.next();
    String key = next.getKey();
    if(!keys.contains(key)){
    iterator.remove();
    }
    }
    }


    /**
    * 读取文件
    * @param clazz
    * @param fileName
    * @return
    * @throws FileNotFoundException
    */
    private static Reader readFile(Class clazz, String fileName) throws FileNotFoundException{
    URL url = clazz.getClassLoader().getResource(fileName);
    if (url == null) {
    log.warn("file not found: "+fileName);
    return null;
    }
    return new InputStreamReader(clazz.getClassLoader().getResourceAsStream(fileName));
    }


    public static Map<String, Object> consumerProps(Class clazz) throws Exception {
    Map<String, String> results = Config.getConfigs(clazz);
    Map<String, Object> props = Maps.newHashMap();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, results.get(PropertiesEnums.SPRING_KAFKA_CONSUMER.getKey()));
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, results.get(PropertiesEnums.SPRING_KAFKA_ENABLE_AUTO_COMMIT_CONFIG.getKey()));
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, results.get(PropertiesEnums.SPRING_KAFKA_AUTO_COMMIT_INTERVAL_MS_CONFIG.getKey()));
    //一次拉取消息数量
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, results.get(PropertiesEnums.SPRING_KAFKA_POLLRECORDS.getKey()));
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return props;
    }

    public static Map<String,Object> producerProps(Class clazz) throws Exception{
    Map<String, String> results = Config.getConfigs(clazz);
    Map configs = Maps.newHashMap();
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,results.get(PropertiesEnums.SPRING_KAFKA_PRODUCER.getKey()));
    configs.put("key.deserializer", StringDeserializer.class);
    configs.put("value.deserializer",StringDeserializer.class);
    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
    configs.put("group.id",results.get(PropertiesEnums.SPRING_KAFKA_GROUP_ID.getKey()));
    configs.put("missing.topics.fatal",results.get(PropertiesEnums.SPRING_KAFKA_MISSING_FATAL.getKey()));
    log.info("init kafka kafkaClusters:{}",results.get(PropertiesEnums.SPRING_KAFKA_PRODUCER.getKey()));
    return configs;
    }

    /**
    * 属性转换成map
    * @param prop
    * @return
    */
    public static Map properties2map(Properties prop){
    Map<Object,Object> map = new HashMap<>();
    Enumeration enu = prop.keys();
    while (enu.hasMoreElements()) {
    Object o = enu.nextElement();
    Object obj = prop.get(o);
    map.put(o, obj);
    }
    return map;
    }

    }

    =======================================================================================================================================================

    public class PlaceholderConfigurerSupport {

    /**
    * Default placeholder prefix: {@value}
    */
    public static final String DEFAULT_PLACEHOLDER_PREFIX = "${";

    /**
    * Default placeholder suffix: {@value}
    */
    public static final String DEFAULT_PLACEHOLDER_SUFFIX = "}";

    /**
    * Default value separator: {@value}
    */
    public static final String DEFAULT_VALUE_SEPARATOR = ":";

    }



    =======================================================================================================================================================
     

    public enum PropertyPlaceholderHelper {

    INSTANCE(
    PlaceholderConfigurerSupport.DEFAULT_PLACEHOLDER_PREFIX,
    PlaceholderConfigurerSupport.DEFAULT_PLACEHOLDER_SUFFIX, PlaceholderConfigurerSupport.DEFAULT_VALUE_SEPARATOR,
    true
    );

    private final String placeholderPrefix;

    private final String placeholderSuffix;

    private final String simplePrefix;

    private final String valueSeparator;

    private final boolean ignoreUnresolvablePlaceholders;

    /**
    * Creates a new {@code PropertyPlaceholderHelper} that uses the supplied prefix and suffix.
    *
    * @param placeholderPrefix the prefix that denotes the start of a placeholder
    * @param placeholderSuffix the suffix that denotes the end of a placeholder
    * @param valueSeparator the separating character between the placeholder variable and the
    * associated default value, if any
    * @param ignoreUnresolvablePlaceholders indicates whether unresolvable placeholders should be ignored ({@code
    * true}) or cause an exception ({@code false})
    */
    PropertyPlaceholderHelper(String placeholderPrefix, String placeholderSuffix, String valueSeparator,
    boolean ignoreUnresolvablePlaceholders) {
    if (StringUtils.isEmpty(placeholderPrefix) || StringUtils.isEmpty(placeholderSuffix)) {
    throw new UnsupportedOperationException("'placeholderPrefix or placeholderSuffix' must not be null");
    }

    final Map<String, String> wellKnownSimplePrefixes = new HashMap<String, String>(4);

    wellKnownSimplePrefixes.put("}", "{");
    wellKnownSimplePrefixes.put("]", "[");
    wellKnownSimplePrefixes.put(")", "(");

    this.placeholderPrefix = placeholderPrefix;
    this.placeholderSuffix = placeholderSuffix;
    String simplePrefixForSuffix = wellKnownSimplePrefixes.get(this.placeholderSuffix);
    if (simplePrefixForSuffix != null && this.placeholderPrefix.endsWith(simplePrefixForSuffix)) {
    this.simplePrefix = simplePrefixForSuffix;
    } else {
    this.simplePrefix = this.placeholderPrefix;
    }
    this.valueSeparator = valueSeparator;
    this.ignoreUnresolvablePlaceholders = ignoreUnresolvablePlaceholders;
    }

    private String getConfigValue(String key, final Properties properties) {
    String value = System.getProperty(key);
    if (value == null) {
    value = System.getenv(key);
    }
    if (value == null) {
    value = properties.getProperty(key);
    }
    return value;
    }

    private String getConfigValue(String key, final Map properties) {
    String value = System.getProperty(key);
    if (value == null) {
    value = System.getenv(key);
    }
    if (value == null) {
    value = String.valueOf(properties.get(key));
    }
    return value;
    }

    private String getConfigValue(String key, String value) {
    String property = System.getProperty(key);
    if (StringUtils.isNotEmpty(property)) {
    return property;
    } else {
    String env = System.getenv(key);
    return StringUtils.isNotEmpty(env) ? env : value;
    }
    }

    public String parseStringValue(String value) {

    StringBuilder result = new StringBuilder(value);
    int startIndex = value.indexOf(this.placeholderPrefix);
    while (startIndex != -1) {
    int endIndex = findPlaceholderEndIndex(result, startIndex);
    if (endIndex != -1) {
    String placeholder = result.substring(startIndex + this.placeholderPrefix.length(), endIndex);
    // Recursive invocation, parsing placeholders contained in the placeholder key.
    placeholder = parseStringValue(placeholder);
    if(placeholder.indexOf(this.valueSeparator) != -1){
    return this.getConfigValue(placeholder.substring(0,placeholder.indexOf(this.valueSeparator)),placeholder.substring(placeholder.indexOf(this.valueSeparator)+1));
    }
    return placeholder;
    } else {
    startIndex = -1;
    }
    }
    return result.toString();
    }

    private int findPlaceholderEndIndex(CharSequence buf, int startIndex) {
    int index = startIndex + this.placeholderPrefix.length();
    int withinNestedPlaceholder = 0;
    while (index < buf.length()) {
    if (org.springframework.util.StringUtils.substringMatch(buf, index, this.placeholderSuffix)) {
    if (withinNestedPlaceholder > 0) {
    withinNestedPlaceholder--;
    index = index + this.placeholderSuffix.length();
    } else {
    return index;
    }
    } else if (org.springframework.util.StringUtils.substringMatch(buf, index, this.simplePrefix)) {
    withinNestedPlaceholder++;
    index = index + this.simplePrefix.length();
    } else {
    index++;
    }
    }
    return -1;
    }

    /**
    * Strategy interface used to resolve replacement values for placeholders contained in Strings.
    */
    public interface PlaceholderResolver {

    /**
    * Resolve the supplied placeholder name to the replacement value.
    *
    * @param placeholderName the name of the placeholder to resolve
    * @return the replacement value, or {@code null} if no replacement is to be made
    */
    String resolvePlaceholder(String placeholderName);
    }
    }
    =======================================================================================================================================================



    /**
    * @author Ly
    * @create 2020/11/9 15:51
    * @desc
    **/
    public class Constants {
    public static final String KAFKA_SKYWALKING_ARG_TOPIC = "kafka_skywalking_arg_topic";

    public static final String FILE_NAME_PROPERTIES = "application.properties";

    public static final String FILE_NAME_YML = "application.yml";

    public static final String FILE_NAME_DEFAULT = "default-kafka.properties";
    }

    =======================================================================================================================================================


    /**
    * @author Ly
    * @create 2020/11/20 14:48
    * @desc
    **/
    public enum PropertiesEnums {
    SPRING_KAFKA_PRODUCER("SPRING_KAFKA_PRODUCER","spring.kafka.producer.bootstrap-servers"),
    SPRING_KAFKA_CONSUMER("SPRING_KAFKA_CONSUMER","spring.kafka.consumer.bootstrap-servers"),
    SPRING_KAFKA_GROUP_ID("SPRING_KAFKA_GROUP_ID","spring.kafka.consumer.group-id"),
    SPRING_KAFKA_MISSING_FATAL("SPRING_KAFKA_MISSING_FATAL","spring.kafka.listener.missing-topics-fatal"),
    SPRING_KAFKA_CONCURRENCY("SPRING_KAFKA_CONCURRENCY","spring.kafka.default.concurrency"),
    SPRING_KAFKA_PARTITIONS("SPRING_KAFKA_PARTITIONS","spring.kafka.default.partitions"),
    SPRING_KAFKA_POLLRECORDS("SPRING_KAFKA_POLLRECORDS","spring.kafka.default.poll_records"),
    SPRING_KAFKA_AUTO_COMMIT_INTERVAL_MS_CONFIG("SPRING_KAFKA_AUTO_COMMIT_INTERVAL_MS_CONFIG","spring.kafka.default.auto_commit_interval_ms"),
    SPRING_KAFKA_ENABLE_AUTO_COMMIT_CONFIG("SPRING_KAFKA_ENABLE_AUTO_COMMIT_CONFIG","spring.kafka.default.enable_auth_commit_config"),
    SKYWALKING_TAG_IGNORE("SKYWALKING_TAG_IGNORE","skywalking.tag.ignore")
    ;

    private String value;

    private String key;

    PropertiesEnums(String value, String key) {
    this.value = value;
    this.key = key;
    }

    public String getValue() {
    return value;
    }

    public String getKey() {
    return key;
    }

    public static List<String> getKeys(){
    PropertiesEnums[] values = PropertiesEnums.values();
    List<String> keys = Lists.newArrayList();
    for (PropertiesEnums kafkaEnums : values) {
    keys.add(kafkaEnums.key);
    }
    return keys;
    }

    public static List<String> getValues(){
    PropertiesEnums[] values = PropertiesEnums.values();
    List<String> vs = Lists.newArrayList();
    for (PropertiesEnums kafkaEnums : values) {
    vs.add(kafkaEnums.value);
    }
    return vs;
    }
    }
    
    
    =======================================================================================================================================================




    spring.kafka.producer.bootstrap-servers = ${SPRING_KAFKA_PRODUCER:192.168.82.220:19092,192.168.82.220:29092,192.168.82.220:39092}
    spring.kafka.consumer.bootstrap-servers = ${SPRING_KAFKA_CONSUMER:192.168.82.220:19092,192.168.82.220:29092,192.168.82.220:39092}
    spring.kafka.consumer.group-id = ${SPRING_KAFKA_GROUP_ID:skywalkingReceiver}
    spring.kafka.listener.missing-topics-fatal = ${SPRING_KAFKA_MISSING_FATAL:false}
    spring.kafka.default.concurrency = ${SPRING_KAFKA_CONCURRENCY:15}
    spring.kafka.default.partitions = ${SPRING_KAFKA_PARTITIONS:20}
    spring.kafka.default.poll_records = ${SPRING_KAFKA_POLLRECORDS:10}
    spring.kafka.default.auto_commit_interval_ms = ${SPRING_KAFKA_AUTO_COMMIT_INTERVAL_MS_CONFIG:1000}
    spring.kafka.default.enable_auth_commit_config = ${SPRING_KAFKA_ENABLE_AUTO_COMMIT_CONFIG:true}
    skywalking.tag.ignore = ${SKYWALKING_TAG_IGNORE:requestParam,responseBody,head,uri}













  • 相关阅读:
    P/Invoke应用
    OC第八节——目录操作和文件管理
    OC第七节——内存管理
    OC第六节—— 继承与类别
    被拒原因——You have selected the Kids Category for your app, but it does not include the required privacy policy. Please update your app metadata to include a privacy policy URL and ensure that the URL yo
    OC第五节 ——点语法和@property
    OC第四节——NSDictionary和NSMutableDictionary
    OC第三节——NSArray和NSMutableArray
    OC第二节 —— NSString和NSMutableString
    OC第一节 —— 类和对象
  • 原文地址:https://www.cnblogs.com/ymqj520/p/14047059.html
Copyright © 2011-2022 走看看