1、增加依赖
<!-- Curator ZooKeeper client -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.10.0</version>
</dependency>
<!-- Curator recipes; presumably the artifact that supplies DistributedAtomicLong (confirm).
     Kept at the same version as curator-framework. -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.10.0</version>
</dependency>
2、定义生成sequence类型
/**
 * Sequence types: each constant names one sequence that can be generated.
 */
public enum ZkSequenceEnum {
    FIRST, SECOND, THIRD
}
3、序列封装
/**
 * Sequence generator implemented with Curator's distributed atomic counter
 * ({@code DistributedAtomicLong}).
 *
 * <p>Increments run under an {@code ExponentialBackoffRetry} policy built with a
 * base sleep time of 5000 ms and at most 5 retries. Because the policy is an
 * exponential backoff, the wait between attempts is derived from the base sleep
 * time rather than being a fixed 5000 ms interval. If the increment still has
 * not succeeded, {@link #sequence()} returns {@code null} and the caller is
 * responsible for handling that case.
 */
public class ZkSequence {

    /** Base sleep time handed to the retry policy, in milliseconds. */
    private static final int BASE_SLEEP_TIME_MS = 5000;

    /** Maximum number of retries handed to the retry policy. */
    private static final int MAX_RETRIES = 5;

    private final RetryPolicy retryPolicy =
            new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES);

    private final DistributedAtomicLong distributedAtomicLong;

    /**
     * Creates a generator for one counter.
     *
     * @param sequenceName path passed to {@code DistributedAtomicLong} as the counter location
     * @param client       Curator client used to talk to ZooKeeper
     */
    public ZkSequence(String sequenceName, CuratorFramework client) {
        this.distributedAtomicLong = new DistributedAtomicLong(client, sequenceName, retryPolicy);
    }

    /**
     * Increments the counter and returns the new value.
     *
     * @return the value after the increment, or {@code null} when the increment
     *         did not succeed
     * @throws Exception propagated unchanged from {@code DistributedAtomicLong.increment()}
     */
    public Long sequence() throws Exception {
        AtomicValue<Long> result = this.distributedAtomicLong.increment();
        return result.succeeded() ? result.postValue() : null;
    }
}
4、配置文件
/**
 * ZooKeeper settings, read from {@code classpath:zookeeper.properties} under the
 * {@code zk} prefix, plus the {@link ZookeeperClient} bean built from them.
 *
 * <p>The field initializers are the defaults used when a property is absent.
 * Accessors are provided because {@code @ConfigurationProperties} binds JavaBean
 * properties through their setters; without them the values in the properties
 * file would never be applied.
 */
@Configuration
@ConfigurationProperties(prefix = "zk")
@PropertySource("classpath:zookeeper.properties")
public class ZkConfig {

    /** ZooKeeper connect string (property {@code zk.host}). */
    private String host = "118.xx.xx.101";

    /** Parent path under which each sequence node lives (property {@code zk.sequencePath}). */
    private String sequencePath = "/new/sequence/";

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public String getSequencePath() {
        return sequencePath;
    }

    public void setSequencePath(String sequencePath) {
        this.sequencePath = sequencePath;
    }

    /**
     * Exposes the ZooKeeper client as a bean.
     *
     * @return a client configured with the current host and sequence path
     */
    @Bean
    public ZookeeperClient zookeeperClient() {
        return new ZookeeperClient(this.host, this.sequencePath);
    }
}
5、客户端封装
/**
 * Wrapper around a Curator client that creates one {@link ZkSequence} per
 * {@link ZkSequenceEnum} constant and hands out sequence values by type.
 *
 * <p>{@link #init()} starts the connection and {@link #close()} releases it.
 * The class implements {@link AutoCloseable} so the connection is not leaked:
 * it works in try-with-resources, and a container that infers a public
 * {@code close()} method as the destroy callback will call it on shutdown.
 */
public class ZookeeperClient implements AutoCloseable {

    /** Base sleep time between connection retries, in milliseconds. */
    private static final int SLEEP_TIME_MS = 1000;

    /**
     * Maximum number of connection retries. {@code ExponentialBackoffRetry}
     * pins any larger value to 29 (and logs a warning), so the previous value
     * of 1000 was never honoured; 29 states the effective limit explicitly.
     */
    private static final int MAX_RETRIES = 29;

    /** Session timeout, in milliseconds. */
    private static final int SESSION_TIMEOUT = 30 * 1000;

    /** Connection timeout, in milliseconds. */
    private static final int CONNECTION_TIMEOUT = 3 * 1000;

    private String host;
    private String sequencePath;

    /** Curator client; {@code null} until {@link #init()} runs and again after {@link #close()}. */
    private CuratorFramework client = null;

    /** Sequence generators keyed by {@link ZkSequenceEnum#name()}. */
    private Map<String, ZkSequence> zkSequence = Maps.newConcurrentMap();

    /**
     * @param host         ZooKeeper connect string
     * @param sequencePath path prefix; the enum constant name is appended to it directly
     */
    public ZookeeperClient(String host, String sequencePath) {
        this.host = host;
        this.sequencePath = sequencePath;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    /**
     * Builds and starts the Curator client, then registers the sequences.
     *
     * @throws Exception declared for callers; kept from the original signature
     */
    @PostConstruct
    public void init() throws Exception {
        this.client = CuratorFrameworkFactory.builder()
                .connectString(this.getHost())
                .connectionTimeoutMs(CONNECTION_TIMEOUT)
                .sessionTimeoutMs(SESSION_TIMEOUT)
                .retryPolicy(new ExponentialBackoffRetry(SLEEP_TIME_MS, MAX_RETRIES))
                .build();
        this.client.start();
        this.initZkSequence();
    }

    /** Creates one {@link ZkSequence} for every {@link ZkSequenceEnum} constant. */
    public void initZkSequence() {
        for (ZkSequenceEnum type : ZkSequenceEnum.values()) {
            String name = type.name();
            zkSequence.put(name, new ZkSequence(this.sequencePath + name, this.client));
        }
    }

    /**
     * Generates the next value of the given sequence.
     *
     * @param name sequence type
     * @return the next value, or {@code null} when the sequence is not registered,
     *         the increment did not succeed, or an exception occurred (the
     *         exception message is printed to standard output)
     */
    public Long sequence(ZkSequenceEnum name) {
        Long result = null;
        try {
            ZkSequence seq = zkSequence.get(name.name());
            if (seq != null) {
                result = seq.sequence();
            }
        } catch (Exception e) {
            System.out.println("获取" + name + "Sequence错误: " + e.getMessage());
        }
        return result;
    }

    /**
     * Closes the Curator client and drops the registered sequences. Calling it
     * before {@link #init()} or more than once is a no-op.
     */
    @Override
    public void close() {
        CuratorFramework current = this.client;
        if (current == null) {
            return;
        }
        this.client = null;
        // The generators hold the closed client, so discard them as well.
        zkSequence.clear();
        current.close();
    }
}
6、调用客户端
/**
 * Convenience facade exposing one method per {@link ZkSequenceEnum} constant.
 * Each method returns whatever {@link ZookeeperClient#sequence(ZkSequenceEnum)}
 * returns for that constant, so a result may be {@code null}.
 */
@Component
public class Sequences {

    @Autowired
    private ZookeeperClient client;

    /**
     * @return the next value of the {@code FIRST} sequence, possibly {@code null}
     */
    public Long sequenceFirst() {
        return this.client.sequence(ZkSequenceEnum.FIRST);
    }

    /**
     * Misspelled original name, kept so existing callers continue to compile.
     *
     * @return the next value of the {@code FIRST} sequence, possibly {@code null}
     * @deprecated use {@link #sequenceFirst()}
     */
    @Deprecated
    public Long sequenceFist() {
        return sequenceFirst();
    }

    /**
     * @return the next value of the {@code SECOND} sequence, possibly {@code null}
     */
    public Long sequenceSecond() {
        return this.client.sequence(ZkSequenceEnum.SECOND);
    }

    /**
     * @return the next value of the {@code THIRD} sequence, possibly {@code null}
     */
    public Long sequenceThird() {
        return this.client.sequence(ZkSequenceEnum.THIRD);
    }
}
7、测试
/**
 * Exercises sequence generation from several threads at once.
 */
@SpringBootTest
class ZookeepdemoApplicationTests {

    /** Upper bound on how long the test waits for the submitted tasks, in seconds. */
    private static final long AWAIT_SECONDS = 100;

    // Step 1: inject Sequences
    @Autowired
    private Sequences sequences;

    /**
     * Submits 10 tasks that each request one value of the FIRST sequence and
     * waits for them to complete.
     *
     * <p>The executor is shut down and awaited instead of sleeping for a fixed
     * 100 seconds, so the test returns as soon as the tasks are done and fails
     * if they do not finish within the time limit.
     */
    @Test
    void sequenceApCollection() throws Exception {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(5));
        try {
            for (int i = 0; i < 10; i++) {
                executor.execute(new MyTask(sequences));
            }
            // No new tasks are accepted; already submitted ones keep running.
            executor.shutdown();
            if (!executor.awaitTermination(AWAIT_SECONDS, TimeUnit.SECONDS)) {
                throw new IllegalStateException(
                        "sequence tasks did not finish within " + AWAIT_SECONDS + " seconds");
            }
        } finally {
            // No-op after a clean termination; stops leftover workers otherwise.
            executor.shutdownNow();
        }
    }

    /** Task that requests one sequence value and prints it. */
    private static class MyTask implements Runnable {

        private final Sequences sequences;

        public MyTask(Sequences sequences) {
            this.sequences = sequences;
        }

        @Override
        public void run() {
            // Step 2: generate the value inside the method
            Long num = sequences.sequenceFist();
            System.out.println("num=" + num);
        }
    }
}
分别启动两个测试工程(同时运行),观察两边输出的num互不重复,即可验证序列在分布式环境下的唯一性。
