为了方便现场安装完了etcd集群后确认集群是否好用,简单写了个测试类,网上搜的有点乱还有些不能运行,在这里再整理一个能够直接运行的
1、我把etcd的API设成3版本了,调用使用的jetcd,功能挺多,这里只用了最简单的数据增删查操作,再Maven配置文件中增加依赖
<dependency> <groupId>io.etcd</groupId> <artifactId>jetcd-core</artifactId> <version>0.3.0</version> </dependency>
2、直接贴代码了,代码很简单没什么好解释的,要想用复杂功能还需要再去研究jetcd的各种API了
/** * */ package com.zyh.etcd; import static com.google.common.base.Charsets.UTF_8; import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import io.etcd.jetcd.ByteSequence; import io.etcd.jetcd.Client; import io.etcd.jetcd.Watch.Watcher; import io.etcd.jetcd.kv.GetResponse; import io.etcd.jetcd.options.GetOption; import io.etcd.jetcd.options.WatchOption; import io.etcd.jetcd.watch.WatchEvent; /** * etcd 操作工具,包括启动监听和操作etcd v3 版本协议,只测试功能,未添加log * * @version 1.0 * @author zhangyanhua * @date 2019年10月29日 下午4:30:57 */ public class EtcdUtil { // etcl客户端链接 private static Client etcdClient = null; // 链接初始化 public static synchronized Client getEtclClient() { if (etcdClient == null) { //String[] urls = PropertiesUtil.getValue("etcd_node_url").split(","); String[] urls = "http://10.110.30.210:2379,http://10.110.30.212:2379,http://10.110.30.213:2379".split(","); etcdClient = Client.builder().endpoints(urls).build(); } return etcdClient; } /** * 新增或者修改指定的配置 * * @param key * @param value * @throws Exception * @author zhangyanhua * @date 2019年10月29日 下午4:41:06 */ public static void putEtcdValueByKey(String key, String value) throws Exception { Client client = EtcdUtil.getEtclClient(); client.getKVClient().put(ByteSequence.from(key, UTF_8), ByteSequence.from(value, UTF_8)).get(); //System.out.println("put etcd key:value "" + key + ":" + value + "" success"); client.close(); etcdClient = null; } /** * 查询指定的key名称对应的value * * @param key * @return value值 * @throws Exception * @author zhangyanhua * @date 2019年10月29日 下午4:35:44 */ public static String getEtcdValueByKey(String key) throws Exception { Client client = EtcdUtil.getEtclClient(); GetResponse getResponse = client.getKVClient() .get(ByteSequence.from(key, UTF_8), GetOption.newBuilder().build()).get(); client.close(); etcdClient = null; // key does not exist if (getResponse.getKvs().isEmpty()) { return null; } return getResponse.getKvs().get(0).getValue().toString(UTF_8); } /** * 删除指定的配置 * * @param key * @throws InterruptedException * @throws ExecutionException * @author zhangyanhua * @date 2019年10月29日 下午4:53:24 */ public static void deleteEtcdValueByKey(String key) throws InterruptedException, ExecutionException { Client client = EtcdUtil.getEtclClient(); client.getKVClient().delete(ByteSequence.from(key, UTF_8)).get(); //System.out.println("delete etcd key "" + key + "" success"); client.close(); etcdClient = null; } /** * 持续监控某个key变化的方法,执行后如果key有变化会被监控到,输入结果如下 * watch type= "PUT", key= "zyh1", value= "zyh1-value" * watch type= "PUT", key= "zyh1", value= "zyh1-value111" * watch type= "DELETE", key= "zyh1", value= "" * * @param key * @throws Exception * @author zhangyanhua * @date 2019年10月29日 下午5:26:09 */ public static void watchEtcdKey(String key) throws Exception { Client client = EtcdUtil.getEtclClient(); // 最大事件数量 Integer maxEvents = Integer.MAX_VALUE; CountDownLatch latch = new CountDownLatch(maxEvents); Watcher watcher = null; try { ByteSequence watchKey = ByteSequence.from(key, UTF_8); WatchOption watchOpts = WatchOption.newBuilder().build(); watcher = client.getWatchClient().watch(watchKey, watchOpts, response -> { for (WatchEvent event : response.getEvents()) { System.out.println("watch type= "" + event.getEventType().toString() + "", key= "" + Optional.ofNullable(event.getKeyValue().getKey()).map(bs -> bs.toString(UTF_8)).orElse("") + "", value= "" + Optional.ofNullable(event.getKeyValue().getValue()) .map(bs -> bs.toString(UTF_8)).orElse("") + """); } latch.countDown(); }); latch.await(); } catch (Exception e) { if (watcher != null) { watcher.close(); client.close(); etcdClient = null; } throw e; } } /** * TODO * * @param args * @throws Exception * @author zhangyanhua * @date 2019年10月29日 下午6:01:54 */ public static void main(String[] args) throws Exception { boolean success = true; String key = "zyh"; String value = "zyh-value"; String newValue = "zyh-value-new"; System.out.println("**** 测试方法开始 ****"); EtcdUtil.putEtcdValueByKey(key, value); String retValue = EtcdUtil.getEtcdValueByKey(key); // System.out.println("查询key " + key + " 对应的值是 " + retValue); if (value.equals(retValue)) { System.out.println("数据插入成功。"); System.out.println("数据查询成功。"); } else { success = false; System.out.println("数据插入或查询失败!"); } EtcdUtil.putEtcdValueByKey(key, newValue); retValue = EtcdUtil.getEtcdValueByKey(key); // System.out.println("查询key " + key + " 对应的值是 " + retValue); if (newValue.equals(retValue)) { System.out.println("数据更新成功。"); } else { success = false; System.out.println("数据更新失败!"); } EtcdUtil.deleteEtcdValueByKey(key); retValue = EtcdUtil.getEtcdValueByKey(key); // System.out.println("查询key " + key + " 对应的值是 " + retValue); if (retValue == null) { System.out.println("数据删除成功。"); } else { success = false; System.out.println("数据删除失败!"); } // EtcdUtil.watchEtcdKey(key); if (success) { System.out.println("**** 测试方法全部通过。 ****"); } else { System.out.println("**** 测试失败! ****"); } } }
上面代码运行结果如下
还有个监控数据的watchEtcdKey方法没有执行,这个执行后代码是一直运行的,持续监控想要监控的数据,当数据出现变化事可以获取变化的事件,例如这样
在后台操作一组数据
监控端可以检测到数据变化