zoukankan      html  css  js  c++  java
  • 在java中如何使用etcd的v2 和v3 api获取配置,并且对配置的变化进行监控和监听

    etcd 和zookeeper 很像,都可以用来做配置管理。并且etcd可以在目前流行的Kubernetes中使用。

    但是etcd 提供了v2版本合v3的版本的两种api。我们现在分别来介绍一下这两个版本api的使用。

    一、Etcd V2版本API

    1、java工程中使用maven引入 etcd v2的java api操作jar包

    <dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.21.Final</version>
    </dependency>
    <dependency>
    <groupId>org.mousio</groupId>
    <artifactId>etcd4j</artifactId>
    <version>2.15.0</version>
    </dependency>

    2、etcd的链接
    import mousio.etcd4j.EtcdClient;
    import java.io.InputStream;
    import java.net.URI;
    import java.util.Properties;
    
    public class EtcdUtil {
    //etcd客户端链接
    private static EtcdClient client = null;
    //链接初始化
    public static synchronized EtcdClient getClient(){
    if (null == client){
    client = new EtcdClient (URI.create(properties.getProperty("http://127.0.0.1:2379")));
    }
    return client;
    }
    }

    3、如何获取etcd的配置并且对配置进行监听
    //初始化获取etcd配置并且进行配置监听
    private void initEtcdConfig() {
    EtcdKeysResponse dataTree ;
    try {
    final EtcdClient etcdClient = EtcdUtil.getClient();
    //获取etcd中名称叫ETCD文件夹下的配置
    EtcdKeyGetRequest etcdKeyGetRequest = etcdClient.getDir("ETCD").consistent();
    dataTree = etcdKeyGetRequest.send().get();
    //获取etcd的版本
    System.out.println("ETCD's version:"+etcdClient.getVersion());
    getConfig("/ETCD/example.config",dataTree); //加载配置项
    //启动一个线程进行监听
    startListenerThread(etcdClient);
    } catch (Exception e) {
    System.out.println("EtcdClient init cause Exception:"+e.getMessage());
    e.printStackTrace();
    }
    }
    private String getConfig(String configFile,EtcdKeysResponse dataTree){
    if(null != dataTree && dataTree.getNode().getNodes().size()>0){
    for(EtcdKeysResponse.EtcdNode node:dataTree.getNode().getNodes()){
    if(node.getKey().equals(configFile)){
    return node.getValue();
    }
    }
    }
    System.out.println("Etcd configFile"+ configFile+"is not exist,Please Check");
    return null;
    }
    public void startListenerThread(EtcdClient etcdClient){
    new Thread(()->{
    startListener(etcdClient);
    }).start();
    }
    public void startListener(EtcdClient etcdClient){
    ResponsePromise promise =null;
    try {
    promise = etcdClient.getDir(SYSTEM_NAME).recursive().waitForChange().consistent().send();
    promise.addListener(promisea -> {
    System.out.println("found ETCD's config cause change");
    try {
    getConfig("/ETCD/example.config", etcdClient.getDir("ETCD").consistent().send().get()); //加载配置项
    } catch (Exception e) {
    e.printStackTrace();
    System.out.println("listen etcd 's config change cause exception:{}"+e.getMessage());
    }
    startListener(etcdClient);
    });
    } catch (Exception e) {
    startListener(etcdClient);
    System.out.println("listen etcd 's config change cause exception:"+e.getMessage());
    e.printStackTrace();
    }
    }
    4、使用dcmp管理 etcd的配置项
    dcmp 是一个使用go语言开发的etcd配置界面,不足的时,这个只支持v2的API,项目的地址:

    https://github.com/silenceper/dcmp

    二、Etcd V3版本API

    1、在工程引入如下依赖

    <dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.15.Final</version>
    </dependency>
    <dependency>
    <groupId>com.coreos</groupId>
    <artifactId>jetcd-core</artifactId>
    <version>0.0.2</version>
    </dependency>
    2、v3 api操作工具类
    public class EtcdUtil {
        //etcl客户端链接
        private static Client client = null;
        //链接初始化
        public static synchronized Client getEtclClient(){
            if(null == client){
                client = Client.builder().endpoints(props.getProperty("http://127.0.0.1:2379")).build();
            }
                return client;
        }
    
        /**
         * 根据指定的配置名称获取对应的value
         * @param key 配置项
         * @return
         * @throws Exception
         */
        public static String getEtcdValueByKey(String key) throws Exception {
            List<KeyValue> kvs = EtcdUtil.getEtclClient().getKVClient().get(ByteSequence.fromString(key)).get().getKvs();
            if(kvs.size()>0){
                String value = kvs.get(0).getValue().toStringUtf8();
                return value;
            }
            else {
                return null;
            }
        }
    
        /**
         * 新增或者修改指定的配置
         * @param key
         * @param value
         * @return
         */
        public static void putEtcdValueByKey(String key,String value) throws Exception{
            EtcdUtil.getEtclClient().getKVClient().put(ByteSequence.fromString(key),ByteSequence.fromBytes(value.getBytes("utf-8")));
    
        }
    
        /**
         * 删除指定的配置
         * @param key
         * @return
         */
        public static void deleteEtcdValueByKey(String key){
             EtcdUtil.getEtclClient().getKVClient().delete(ByteSequence.fromString(key));
    
        }
    }
    //V3 api配置初始化和监听
    public void init(){
    try {
        //加载配置
        getConfig(EtcdUtil.getEtclClient().getKVClient().get(ByteSequence.fromString(ETCD_CONFIG_FILE_NAME)).get().getKvs());
    //启动监听线程
        new Thread(() -> {
    //对某一个配置进行监听
            Watch.Watcher watcher = EtcdUtil.getEtclClient().getWatchClient().watch(ByteSequence.fromString("etcd_key"));
            try {
                while(true) {
                    watcher.listen().getEvents().stream().forEach(watchEvent -> {
                        KeyValue kv = watchEvent.getKeyValue();
                //获取事件变化类型
                System.out.println(watchEvent.getEventType());
                     //获取发生变化的key
                      System.out.println(kv.getKey().toStringUtf8());
              //获取变化后的value
                        String afterChangeValue = kv.getValue().toStringUtf8();
    
                    });
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
    
            }
        }).start();
    } catch (Exception e) {
        e.printStackTrace();
    
    }
    
    }
    private String getConfig(List<KeyValue> kvs){
        if(kvs.size()>0){
            String config = kvs.get(0).getValue().toStringUtf8();
            System.out.println("etcd 's config 's configValue is :"+config);
            return config;
        }
        else {
            return null;
        }
    }

    v3版本的正式版本代码(可以直接使用的工具类,并且加上了日志):

     1 import com.coreos.jetcd.Client;
     2 import com.coreos.jetcd.Watch;
     3 import com.coreos.jetcd.data.ByteSequence;
     4 import com.coreos.jetcd.data.KeyValue;
     5 import org.slf4j.Logger;
     6 import org.slf4j.LoggerFactory;
     7 import java.util.List;
     8 /**
     9  *  etcd 链接和操作工具,包括启动监听 操作etcd v3 版本协议,此操作不支持v2 版本协议。
    10  *  v2版本的协议可以参考 https://www.cnblogs.com/laoqing/p/8967549.html
    11  */
    12 public class EtcdUtil {
    13     public static Logger log = LoggerFactory.getLogger(EtcdUtil.class);
    14     //etcl客户端链接
    15     private static Client client = null;
    16 
    17     //链接初始化 单例模式
    18     public static synchronized Client getEtclClient(){
    19         if(null == client){
    20             client = Client.builder().endpoints("http://xxx.xxx.xxx.xxx:2379").build();
    21         }
    22         return client;
    23     }
    24     /**
    25      * 根据指定的配置名称获取对应的value
    26      * @param key 配置项
    27      * @return
    28      * @throws Exception
    29      */
    30     public static String getEtcdValueByKey(String key) throws Exception {
    31         List<KeyValue> kvs = EtcdUtil.getEtclClient().getKVClient().get(ByteSequence.fromString(key)).get().getKvs();
    32         if(kvs.size()>0){
    33             String value = kvs.get(0).getValue().toStringUtf8();
    34             return value;
    35         }
    36         else {
    37             return null;
    38         }
    39     }
    40     /**
    41      * 新增或者修改指定的配置
    42      * @param key
    43      * @param value
    44      * @return
    45      */
    46     public static void putEtcdValueByKey(String key,String value) throws Exception{
    47         EtcdUtil.getEtclClient().getKVClient().put(ByteSequence.fromString(key),ByteSequence.fromBytes(value.getBytes("utf-8")));
    48     }
    49     /**
    50      * 删除指定的配置
    51      * @param key
    52      * @return
    53      */
    54     public static void deleteEtcdValueByKey(String key){
    55         EtcdUtil.getEtclClient().getKVClient().delete(ByteSequence.fromString(key));
    56     }
    57     /**
    58      * etcd的监听,监听指定的key,当key 发生变化后,监听自动感知到变化。
    59      * @param key 指定需要监听的key
    60      */
    61     public static void initListen(String key){
    62         try {
    63             //加载配置
    64             getConfig(EtcdUtil.getEtclClient().getKVClient().get(ByteSequence.fromString(key)).get().getKvs());
    65             new Thread(() -> {
    66                 Watch.Watcher watcher = EtcdUtil.getEtclClient().getWatchClient().watch(ByteSequence.fromString(key));
    67                 try {
    68                     while(true) {
    69                         watcher.listen().getEvents().stream().forEach(watchEvent -> {
    70                             KeyValue kv = watchEvent.getKeyValue();
    71                             log.info("etcd event:{} ,change key is:{},afterChangeValue:{}",watchEvent.getEventType(),kv.getKey().toStringUtf8(),kv.getValue().toStringUtf8());
    72                         });
    73                     }
    74                 } catch (InterruptedException e) {
    75                     log.info("etcd listen start cause Exception:{}",e);
    76                 }
    77             }).start();
    78         } catch (Exception e) {
    79             log.info("etcd listen start cause Exception:{}",e);
    80         }
    81     }
    82     private static String getConfig(List<KeyValue> kvs){
    83         if(kvs.size()>0){
    84             String config = kvs.get(0).getKey().toStringUtf8();
    85             String value = kvs.get(0).getValue().toStringUtf8();
    86             log.info("etcd 's config 's config key is :{},value is:{}",config,value);
    87             return value;
    88         }
    89         else {
    90             return null;
    91         }
    92     }
    93 }

    运行后的效果:

    另外v3 版本api 的管理界面,可以参考如下开源项目

    https://github.com/shiguanghuxian/etcd-manage 

     本人github源码:

    https://github.com/597365581/bigdata_tools/tree/master/yongqing-bigdata-tools/yongqing-etcd-tool

    https://github.com/597365581/bigdata_tools/tree/master/yongqing-bigdata-tools/yongqing-etcd-tool-v2

    【原文归作者所有,欢迎转载,但是保留版权,并且转载时,需要注明出处

  • 相关阅读:
    新院址运行统计
    游标使用之四
    游标使用之三
    css基础知识
    javascript基础知识
    [每日一题2020.06.20]BFS
    白嫖一个免费域名并解析到博客园
    [每日一题2020.06.19]leetcode #84 #121 单调栈
    操作系统---文件管理
    [每日一题2020.06.18]leetcode #3 hash_map实现滑动窗口
  • 原文地址:https://www.cnblogs.com/laoqing/p/8967549.html
Copyright © 2011-2022 走看看