zoukankan      html  css  js  c++  java
  • Curator场景应用

    分布式锁功能:

    在分布式场景中,我们为了保证数据的一致性,经常在程序运行的某一个点,需要进行同步操作,(java提供synchronized或者Reentrantlock实现),

    使用curator基于zookeeper的特性提供的分布式锁来处理分布式场景的数据一致性,zookeeper本身的分布式是有写问题的,这里推荐使用Curator框架的

    InterProcessMutex来实现。 

     1 package bjsxt.curator.lock;
     2 
     3 import java.text.SimpleDateFormat;
     4 import java.util.Date;
     5 import java.util.concurrent.CountDownLatch;
     6 
     7 import org.apache.curator.RetryPolicy;
     8 import org.apache.curator.framework.CuratorFramework;
     9 import org.apache.curator.framework.CuratorFrameworkFactory;
    10 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
    11 import org.apache.curator.retry.ExponentialBackoffRetry;
    12 
    13 public class Lock2 {
    14 
    15     /** zookeeper地址 */
    16     static final String CONNECT_ADDR = "192.168.2.2:2181";
    17     /** session超时时间 */
    18     static final int SESSION_OUTTIME = 5000;// ms
    19 
    20     static int count = 10;
    21 
    22     public static void genarNo() {
    23         try {
    24             count--;
    25             System.out.println(count);
    26         } finally {
    27 
    28         }
    29     }
    30 
    31     public static void main(String[] args) throws Exception {
    32 
    33         // 1 重试策略:初试时间为1s 重试10次
    34         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
    35         // 2 通过工厂创建连接
    36         CuratorFramework cf = CuratorFrameworkFactory.builder()
    37                 .connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_OUTTIME)
    38                 .retryPolicy(retryPolicy)
    39                 // .namespace("super")
    40                 .build();
    41         // 3 开启连接
    42         cf.start();
    43 
    44         // 4 分布式锁
    45         final InterProcessMutex lock = new InterProcessMutex(cf, "/super");
    46         // final ReentrantLock reentrantLock = new ReentrantLock();
    47         final CountDownLatch countdown = new CountDownLatch(1);
    48 
    49         for (int i = 0; i < 10; i++) {
    50             new Thread(new Runnable() {
    51                 @Override
    52                 public void run() {
    53                     try {
    54                         countdown.await();
    55                         // 加锁
    56                         lock.acquire();
    57                         // reentrantLock.lock();
    58                         // -------------业务处理开始
    59                         // genarNo();
    60                         SimpleDateFormat sdf = new SimpleDateFormat(
    61                                 "HH:mm:ss|SSS");
    62                         System.out.println(sdf.format(new Date()));
    63                         // System.out.println(System.currentTimeMillis());
    64                         // -------------业务处理结束
    65                     } catch (Exception e) {
    66                         e.printStackTrace();
    67                     } finally {
    68                         try {
    69                             // 释放
    70                             lock.release();
    71                             // reentrantLock.unlock();
    72                         } catch (Exception e) {
    73                             e.printStackTrace();
    74                         }
    75                     }
    76                 }
    77             }, "t" + i).start();
    78         }
    79         Thread.sleep(100);
    80         countdown.countDown();
    81 
    82     }
    83 }

    分布式计数器功能

    分布式计数器,在单JVM中,我们可以通过AtomicInteger这种经典的方式实现,但是在分布式的场景下,就需要利用Curator框架的DistributedAtomicInteger来实现

     1 package bjsxt.curator.atomicinteger;
     2 
     3 import org.apache.curator.RetryPolicy;
     4 import org.apache.curator.framework.CuratorFramework;
     5 import org.apache.curator.framework.CuratorFrameworkFactory;
     6 import org.apache.curator.framework.recipes.atomic.AtomicValue;
     7 import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;
     8 import org.apache.curator.retry.ExponentialBackoffRetry;
     9 import org.apache.curator.retry.RetryNTimes;
    10 
    11 public class CuratorAtomicInteger {
    12 
    13     /** zookeeper地址 */
    14     static final String CONNECT_ADDR = "192.168.2.2:2181";
    15     /** session超时时间 */
    16     static final int SESSION_OUTTIME = 5000;// ms
    17 
    18     public static void main(String[] args) throws Exception {
    19 
    20         // 1 重试策略:初试时间为1s 重试10次
    21         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
    22         // 2 通过工厂创建连接
    23         CuratorFramework cf = CuratorFrameworkFactory.builder()
    24                 .connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_OUTTIME)
    25                 .retryPolicy(retryPolicy).build();
    26         // 3 开启连接
    27         cf.start();
    28         // cf.delete().forPath("/super");
    29 
    30         // 4 使用DistributedAtomicInteger
    31         DistributedAtomicInteger atomicIntger = new DistributedAtomicInteger(
    32                 cf, "/super", new RetryNTimes(3, 1000));
    33 
    34         AtomicValue<Integer> value = atomicIntger.add(1);
    35         System.out.println(value.succeeded());
    36         System.out.println(value.postValue()); // 最新值
    37         System.out.println(value.preValue()); // 原始值
    38 
    39     }
    40 }

    Curator框架,让一些很困难的问题,简单化了

  • 相关阅读:
    achivemq(消息队列)的使用
    java高并发当时处理的思路
    字符串的应用
    正则表达式
    文本文件的读取与写入
    继承
    冒泡排序法
    类与对象
    数据类型
    关键字和语句
  • 原文地址:https://www.cnblogs.com/shmilyToHu/p/9117859.html
Copyright © 2011-2022 走看看