之前我们了解了基于Corator的分布式锁之后,我们就很容易基于其实现一个分布式计数器,顾名思义,计数器是用来计数的, 利用ZooKeeper可以实现一个集群共享的计数器。 只要使用相同的path就可以得到最新的计数器值, 这是由ZooKeeper的一致性保证的。Curator有两种计数器。
SharedCount
这个类使用int类型来计数。 主要涉及三个类。
SharedCount
SharedCountReader
SharedCountListener
SharedCount代表计数器, 可以为它增加一个SharedCountListener,当计数器改变时此Listener可以监听到改变的事件,而SharedCountReader可以读取到最新的值, 包括字面值和带版本信息的值VersionedValue。SharedCount必须调用start()方法开启,使用完之后必须调用close关闭它。
SharedCount有以下几个主要方法
/** 强制设置值 */ public void setCount(int newCount) throws Exception; /** 第一个参数提供当前的VersionedValue,如果期间其它client更新了此计数值, * 你的更新可能不成功 更新不成功返回false 但可以通过getCount()读取最新值*/ public boolean trySetCount(VersionedValue<Integer> previous, int newCount) throws Exception; /** 获取当前最新值 */ public int getCount();
例子
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.shared.SharedCount; import org.apache.curator.framework.recipes.shared.SharedCountListener; import org.apache.curator.framework.recipes.shared.SharedCountReader; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.ExponentialBackoffRetry; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class SharedCountCase { public static void main(String[] args) throws Exception { final int clientNum = 5; final String BASE_PATH = "/felixzh/counter"; CuratorFramework cfClient = CuratorFrameworkFactory.builder().connectString("felixzh:2181") .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); cfClient.start(); ExecutorService executorService = Executors.newFixedThreadPool(clientNum); for (int i = 0; i < clientNum; i++) { executorService.submit(() -> { try { SharedCount sharedCount = new SharedCount(cfClient, BASE_PATH, 0); sharedCount.addListener(new SharedCountListener() { @Override public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception { //每个线程都能监听到变化 System.out.println(sharedCount.getVersionedValue().getValue() + "," + newCount); } @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { } }); sharedCount.start(); boolean res = false; while (!res) { res = sharedCount.trySetCount(sharedCount.getVersionedValue(), sharedCount.getVersionedValue().getValue() + 1); } System.out.println("current value: " + sharedCount.getVersionedValue().getValue()); } catch (Exception e) { e.printStackTrace(); } }); } Thread.sleep(3_000); executorService.shutdown(); } }
程序运行,输出以下结果:
current value: 91
current value: 92
current value: 93
current value: 94
current value: 95
DistributedAtomicInteger 和 DistributedAtomicLong
DistributedAtomicInteger和SharedCount计数范围是一样的,都是int类型,但是DistributedAtomicInteger和DistributedAtomicLong和上面的计数器的实现有显著的不同,它首先尝试使用乐观锁的方式设置计数器, 如果不成功(比如期间计数器已经被其它client更新了), 它使用InterProcessMutex方式来更新计数值。 还记得InterProcessMutex是什么吗? 它是我们前面讲的分布式可重入锁。下面只讲解DistributedAtomicLong。
可以从它的内部实现DistributedAtomicValue.trySet中看出端倪。
此计数器有一系列的操作:
get(): 获取当前值
increment(): 加一
decrement(): 减一
add(): 增加特定的值
subtract(): 减去特定的值
trySet(): 尝试设置计数值
forceSet(): 强制设置计数值
你必须检查返回结果的succeeded(), 它代表此操作是否成功。 如果操作成功, preValue()代表操作前的值, postValue()代表操作后的值。
例子
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.atomic.AtomicValue; import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.retry.RetryNTimes; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; public class DistributedAtomicLongCase { public static void main(String[] args) throws Exception { CuratorFramework cfClient = CuratorFrameworkFactory.builder().connectString("felixzh:2181") .retryPolicy(new ExponentialBackoffRetry(100, 3)).build(); cfClient.start(); final int clientNum = 5; final String BASE_PATH = "/felixzh_distributed_count"; ExecutorService executorService = Executors.newFixedThreadPool(clientNum); for (int i = 0; i < clientNum; i++) { executorService.submit(() -> { try { final DistributedAtomicLong distributedAtomicLong = new DistributedAtomicLong(cfClient, BASE_PATH, new RetryNTimes(3, 1000)); AtomicValue<Long> atomicValue = distributedAtomicLong.increment(); if (atomicValue.succeeded()) { System.out.println("pre value: " + atomicValue.preValue() + "," + "post value: " + atomicValue.postValue()); System.out.println("current value: " + distributedAtomicLong.get().postValue()); } } catch (Exception e) { e.printStackTrace(); } }); } Thread.sleep(3_000); executorService.shutdown(); } }
程序运行,输出以下结果:
pre value: 55,post value: 56 current value: 56 pre value: 56,post value: 57 current value: 57 pre value: 57,post value: 58 current value: 58 pre value: 58,post value: 59 current value: 59 pre value: 59,post value: 60 current value: 60