zoukankan      html  css  js  c++  java
  • zookeeper典型应用场景总结

    分布式一致性配置

      在集群环境下,挨个更改配置是比较繁琐的,使用zookeeper可以实现同步配置。

    1、配置信息

     1 package com.zk;
     2 
     3 import java.io.Serializable;
     4 
     5 /**
     6  * 模拟公共配置类
     7  * 
     8  * @author Zomi
     9  */
    10 public class DbConfig implements Serializable{
    11     private static final long serialVersionUID = -4483388642208582886L;
    12     
    13     // 数据库配置,有默认值
    14     private String url = "jdbc:mysql://127.0.0.1:3306/mydata?useUnicode=true&characterEncoding=utf-8";
    15     private String username = "root";
    16     private String password = "root";
    17     private String driverClass = "com.mysql.jdbc.Driver";
    18 
    19 
    20     public DbConfig(String url, String username, String password, String driverClass) {
    21         this.url = url;
    22         this.username = username;
    23         this.password = password;
    24         this.driverClass = driverClass;
    25     }
    26 
    27     public DbConfig() {}
    28     
    29     public String getUrl() {
    30         return url;
    31     }
    32 
    33     public void setUrl(String url) {
    34         this.url = url;
    35     }
    36 
    37     public String getUsername() {
    38         return username;
    39     }
    40 
    41     public void setUsername(String username) {
    42         this.username = username;
    43     }
    44 
    45     public String getPassword() {
    46         return password;
    47     }
    48 
    49     public void setPassword(String password) {
    50         this.password = password;
    51     }
    52 
    53     public String getDriverClass() {
    54         return driverClass;
    55     }
    56 
    57     public void setDriverClass(String driverClass) {
    58         this.driverClass = driverClass;
    59     }
    60 
    61     @Override
    62     public String toString() {
    63         return "CommConfig [url=" + url + ", username=" + username + ", password=" + password + ", driverClass="
    64                 + driverClass + "]";
    65     }
    66 
    67 }
    View Code

    2、配置管理服务

     1 package com.zk;
     2 
     3 import org.I0Itec.zkclient.ZkClient;
     4 
     5 /**
     6  * zk配置管理服务器,用于将配置信息的修改同步到zk上
     7  * @author Zomi
     8  */
     9 public class ZkConfigMng {
    10     private String nodePath = "/dbConfig";
    11     private DbConfig dbConfig;
    12     private ZkClient zkClient;
    13     
    14     public ZkConfigMng() {
    15         this.zkClient = new ZkClient("192.168.31.130:2181");
    16     }
    17     //更新配置
    18     public DbConfig update(DbConfig dbConfig) {
    19         this.dbConfig = dbConfig;
    20         syncConfigToZookeeper();//将配置变更同步给zk
    21         return dbConfig;
    22     }
    23     private void syncConfigToZookeeper() {
    24         if(!zkClient.exists(nodePath)) {
    25             zkClient.createPersistent(nodePath);
    26         }
    27         zkClient.writeData(nodePath, dbConfig);
    28     }
    29 }
    View Code

    3、模拟应用服务集群,具备监听配置变更的功能

     1 package com.zk;
     2 
     3 import java.util.concurrent.TimeUnit;
     4 
     5 import org.I0Itec.zkclient.IZkDataListener;
     6 import org.I0Itec.zkclient.ZkClient;
     7 import org.I0Itec.zkclient.ZkConnection;
     8 
     9 /**
    10  * 模拟多服务器
    11  * 
    12  * @author Zomi
    13  */
    14 public class ZkConfigClient implements Runnable {
    15 
    16     private String nodePath = "/dbConfig";
    17     private DbConfig dbConfig;
    18     
    19     @Override
    20     public void run() {
    21         ZkClient zkClient = new ZkClient(new ZkConnection("192.168.31.130:2181", 5000));
    22         while (!zkClient.exists(nodePath)) {
    23             System.out.println("配置节点不存在!");
    24             try {
    25                 TimeUnit.SECONDS.sleep(1);
    26             } catch (InterruptedException e) {
    27                 e.printStackTrace();
    28             }
    29         }
    30         dbConfig = (DbConfig)zkClient.readData(nodePath);
    31         System.out.println(Thread.currentThread().toString() +"原数据为=="+ dbConfig);
    32         
    33         //监听配置数据的改变
    34         zkClient.subscribeDataChanges(nodePath, new IZkDataListener() {
    35             @Override
    36             public void handleDataDeleted(String dataPath) throws Exception {
    37                 System.out.println(Thread.currentThread().toString() + "监听到节点:" + dataPath + "被删除了!");
    38             }
    39             @Override
    40             public void handleDataChange(String dataPath, Object data) throws Exception {
    41                 System.out.println(Thread.currentThread().toString() + "监听到节点:" + dataPath + ", 数据:" + data + " - 更新");
    42                 dbConfig = (DbConfig) data;
    43             }
    44         });
    45     }
    46 }
    View Code
     1 package com.zk;
     2 
     3 import java.util.concurrent.ExecutorService;
     4 import java.util.concurrent.Executors;
     5 
     6 public class BusinessServers {
     7     public static void main(String[] args) {
     8         ExecutorService executorService = Executors.newFixedThreadPool(3);
     9         // 模拟多个服务器获取配置
    10         executorService.submit(new ZkConfigClient());
    11         executorService.submit(new ZkConfigClient());
    12         executorService.submit(new ZkConfigClient());
    13     }
    14 }
    View Code

    4、更新配置信息进行测试

     1 package com.zk;
     2 
     3 import java.util.concurrent.TimeUnit;
     4 
     5 public class MngServer {
     6 
     7     public static void main(String[] args) throws InterruptedException {
     8         
     9         DbConfig dbConfig = new DbConfig();
    10         ZkConfigMng zkConfigMng = new ZkConfigMng();
    11         DbConfig xx = zkConfigMng.update(dbConfig);//初始化节点及数据
    12         System.out.println(xx);
    13         
    14         TimeUnit.SECONDS.sleep(10);
    15         
    16         // 修改值        
    17         zkConfigMng.update(new DbConfig("jdbc:mysql://192.168.6.6:3306/mydata?"
    18                 + "useUnicode=true&characterEncoding=utf-8","admin", "admin", "com.mysql.jdbc.Driver"));
    19         
    20     }
    21 
    22 }
    View Code

    5、测试结果

     分布式锁

    通常实现分布式锁有如下方式:

    1、基于数据库锁。(for update悲观锁;或者版本号乐观锁)

    2、基于redis方式。(可参考此文https://www.cnblogs.com/zomicc/p/12468324.html

    3、基于zookeeper方式。

    Zookeeper中的节点分为四种类型:

    • 持久节点(PERSISTENT)
    • 持久顺序节点(PERSISTENT_SEQUENTIAL)
    • 临时节点(EPHEMERAL)
    • 临时顺序节点(EPHEMERAL_SEQUENTIAL)

    zookeeper分布式锁基于第四种临时顺序节点实现分布式锁。原理图如下:

            

    下面的代码是根据Zookeeper的开源客户端Curator实现分布式锁。采用zk的原生API实现会比较繁琐,所以这里就直接用Curator这个轮子,采用Curator的acquirerelease两个方法就能实现分布式锁。
    1、依赖包
     1 <dependency>
     2     <groupId>org.apache.curator</groupId>
     3     <artifactId>curator-framework</artifactId>
     4     <version>4.2.0</version>
     5 </dependency>
     6 <dependency>
     7     <groupId>org.apache.curator</groupId>
     8     <artifactId>curator-recipes</artifactId>
     9     <version>4.2.0</version>
    10 </dependency>
    View Code

    2、测试代码

     1 package com.zk.distributeLock;
     2 
     3 import java.io.IOException;
     4 import java.util.concurrent.ExecutorService;
     5 import java.util.concurrent.Executors;
     6 import org.apache.curator.RetryPolicy;
     7 import org.apache.curator.framework.CuratorFramework;
     8 import org.apache.curator.framework.CuratorFrameworkFactory;
     9 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
    10 import org.apache.curator.retry.ExponentialBackoffRetry;
    11 
    12 public class CuratorDistributeLock {
    13 
    14     public static void main(String[] args) throws IOException {
    15         //参数一:表示获取不到锁,则1000ms后重试 
    16         //参数二:最多重试3次
    17         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    18         
    19         ExecutorService excutor = Executors.newFixedThreadPool(5);
    20         for(int i=0;i<5;i++) {//使用线程池模拟同时5个请求
    21             excutor.submit(new Runnable() {
    22                 InterProcessMutex mutex;
    23                 CuratorFramework client;
    24                 @Override
    25                 public void run() {
    26                     try {
    27                         client = CuratorFrameworkFactory.newClient("192.168.31.130:2181", retryPolicy);
    28                         client.start();
    29                         mutex = new InterProcessMutex(client, "/curator/lock");
    30                         try {
    31                             mutex.acquire();
    32                             System.out.println(Thread.currentThread().toString() + " 获取到了锁");
    33                         } catch (Exception e) {
    34                             e.printStackTrace();
    35                         }
    36                     } finally {
    37                         System.out.println(Thread.currentThread().toString() + " 释放了锁");
    38                         try {
    39                             mutex.release();
    40                             client.close();
    41                         } catch (Exception e) {
    42                             e.printStackTrace();
    43                         }
    44                     }
    45                 }
    46             });
    47         }
    48         excutor.shutdown();
    49     }
    50 }
    View Code

    3、测试结果

    发现不会如线程抢占式那样交替执行,而是顺序执行。只有获取锁的线程才会执行,其他线程等待。

    集群负载均衡

      概念这里不多讲,直接看代码。

    1、服务提供者

     1 package com.zk.colony;
     2 
     3 import java.util.concurrent.TimeUnit;
     4 
     5 import org.I0Itec.zkclient.ZkClient;
     6 import org.I0Itec.zkclient.ZkConnection;
     7 import org.apache.zookeeper.CreateMode;
     8 
     9 /**
    10    *      服务提供者
    11  * @author Zomi
    12  *
    13  */
    14 public class ServiceProvider {
    15     static String ZOOKEEPER_STR = "192.168.31.130:2181";
    16     static String NODE_PATH = "/service";
    17     static String SERVICE_NAME = "/myService";
    18     
    19     private ZkClient zkClient;
    20     
    21     public ServiceProvider() {
    22         zkClient = new ZkClient(new ZkConnection(ZOOKEEPER_STR));
    23         System.out.println("provider sucess connected to zookeeper server!");
    24         if(!zkClient.exists(NODE_PATH)) {
    25             zkClient.create(NODE_PATH, "my test service", CreateMode.PERSISTENT);
    26         }
    27     }
    28     
    29     public void registryService(String serviceIp, Object obj) {
    30         if(!zkClient.exists(NODE_PATH + SERVICE_NAME)) {            
    31             zkClient.create(NODE_PATH + SERVICE_NAME, "provider services list", CreateMode.PERSISTENT);        
    32         }
    33         //对本机服务进行注册
    34         zkClient.createEphemeral(NODE_PATH + SERVICE_NAME + "/" + serviceIp, obj);
    35         System.out.println("注册成功![" + serviceIp + "]");
    36     }
    37     /**
    38      * 服务启动、注册服务
    39      * @param args
    40      * @throws InterruptedException 
    41      */
    42     public static void main(String[] args) throws InterruptedException {
    43         System.out.println("服务器开始运行了.......");
    44         ServiceProvider provider = new ServiceProvider();
    45         provider.registryService("1.1.1.1", "1.1.1.1 data");
    46         TimeUnit.SECONDS.sleep(20);
    47         provider.registryService("2.2.2.2", "2.2.2.2 data");
    48         TimeUnit.SECONDS.sleep(20);
    49         provider.registryService("3.3.3.3", "3.3.3.3 data");
    50         System.out.println("服务器异常宕机了.......");
    51     }
    52 }
    View Code

    2、服务消费者

     1 package com.zk.colony;
     2 
     3 import java.util.ArrayList;
     4 import java.util.List;
     5 import java.util.Random;
     6 import java.util.concurrent.TimeUnit;
     7 
     8 import org.I0Itec.zkclient.IZkChildListener;
     9 import org.I0Itec.zkclient.ZkClient;
    10 import org.I0Itec.zkclient.ZkConnection;
    11 import org.apache.zookeeper.CreateMode;
    12 
    13 /**
    14  * 消费者,通过某种负载均衡算法选择一个提供者进行消费
    15  * @author Zomi
    16  *
    17  */
    18 public class ServiceConsumer {
    19     static String ZOOKEEPER_STR = "192.168.31.130:2181";
    20     static String NODE_PATH = "/service";
    21     static String SERVICE_NAME = "/myService";
    22     
    23     private ZkClient zkClient;
    24     
    25     private List<String> serviceList = new ArrayList<String>();
    26     
    27     public ServiceConsumer() {
    28         zkClient = new ZkClient(new ZkConnection(ZOOKEEPER_STR));
    29         System.out.println("consumer sucess connected to zookeeper server!");
    30         if(!zkClient.exists(NODE_PATH)) {
    31             zkClient.create(NODE_PATH, "my test service", CreateMode.PERSISTENT);
    32         }
    33     }
    34     
    35     /**
    36      * 订阅服务
    37      */
    38     public void subscribeSerivce() {
    39         serviceList = zkClient.getChildren(NODE_PATH + SERVICE_NAME);//下载注册列表
    40         zkClient.subscribeChildChanges(NODE_PATH + SERVICE_NAME, new IZkChildListener() {
    41             @Override
    42             public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
    43                 //注意:此处没有考虑空
    44                 if(serviceList.size() != currentChilds.size() || !serviceList.containsAll(currentChilds)) {
    45                     System.out.println("服务列表变化了,我要更新了:" + currentChilds);
    46                     serviceList = currentChilds;
    47                 }
    48             }
    49         });
    50     }
    51     /**
    52      * 模拟调用服务
    53      */
    54     public void consume() {
    55         if(serviceList!=null && serviceList.size()>0) {
    56             //这里使用随机访问某台服务
    57             int index = new Random().nextInt(serviceList.size());
    58             System.out.println("调用[" + NODE_PATH + SERVICE_NAME + "]服务:" + serviceList.get(index));
    59         }else {
    60             System.out.println("没有服务提供者");
    61         }
    62     }
    63     
    64     /**
    65      * 客户端调用启动
    66      * @param args
    67      */
    68     public static void main(String[] args) {
    69         ServiceConsumer consumer = new ServiceConsumer();
    70         new Thread() {
    71             public void run() {
    72                 while(true) {
    73                     try {
    74                         consumer.subscribeSerivce();
    75                         TimeUnit.SECONDS.sleep(5);
    76                         consumer.consume();
    77                     } catch (InterruptedException e) {
    78                         e.printStackTrace();
    79                     }
    80                 }
    81             };
    82         }.start();
    83         
    84     }
    85 }
    View Code

    3、先启动消费者,再启动服务者。运行结果如下:

          

    服务高可用

    原理很简单,都是基于zk临时节点的特性,及监听机制。下面的图,一看就懂:

    备服务监听到主服务挂掉了,立马当先,对外提供服务。

     分布式协调

    库存、订单的例子,保证库存先扣减成功,再订单处理成功(然后才能通知快递员过来揽件之类的,否则没有货订单却创建成功了,快递小哥就白跑了,整个工作就显得很不协调了)。

    "我们所要追求的,永远不是绝对的正确,而是比过去的自己更好"
  • 相关阅读:
    玲珑学院-ACM比赛1014
    扩展欧几里得算法
    中国剩余定理(孙子定理)及实现----原理详解
    搞懂树状数组
    HDU3792---Twin Prime Conjecture(树状数组)
    树状数组 模板
    HDU1541--Stars(树状数组)
    HDU4046--Panda(树状数组)
    CCF-201604-1-折点计数
    CCF-201604-2-俄罗斯方块
  • 原文地址:https://www.cnblogs.com/zomicc/p/12488758.html
Copyright © 2011-2022 走看看