zoukankan      html  css  js  c++  java
  • springboot整合zookeeper实现分布式锁

    01 安装并运行zookeeper

    1. 安装jdk
    2. 官网下载zookeeper的压缩包,我这里下载的是3.4.10版本
    3. 解压后进入到zookeeper-3.4.10/conf目录,将zoo_sample.cfg文件重命名为zoo.cfg
    mv zoo_sample.cfg zoo.cfg
    
    • 1
    1. 打开zoo.cfg文件,修改dataDir路径。然后在/usr/local/zookeeper-3.4.10目录下创建zkData文件夹(mkdir zkData)
    dataDir=/usr/local/zookeeper-3.4.10/zkData
    
    • 1
    1. 启动zookeeper
    /usr/local/zookeeper-3.4.10/bin/zkServer.sh start
    
    • 1

    02 springboot应用配置CuratorFramework

    1. 导入maven依赖
    <!-- zookeeper 客户端 -->
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>2.12.0</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>2.12.0</version>
    </dependency>
    
    1. 配置CuratorFramework

    zookeeper的默认端口是2181

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.RetryNTimes;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.data.jpa.repository.config.EnableJpaAuditing;
    import org.springframework.scheduling.annotation.EnableScheduling;
    
    /**
     * Spring Boot entry point that also registers the Curator ZooKeeper client as a bean.
     *
     * <p>The client returned by {@link #curatorFramework()} is only constructed here; this class
     * never calls {@code start()} on it.
     */
    @SpringBootApplication
    @EnableJpaAuditing
    @EnableScheduling
    public class MyDemoApplication {

        /** ZooKeeper connection string (host:port) handed to the Curator client. */
        private static final String ZK_CONNECT_STRING = "127.0.0.1:2181";

        /** Maximum number of retries passed to the retry policy. */
        private static final int RETRY_COUNT = 5;

        /** Sleep between retries, in milliseconds. */
        private static final int RETRY_SLEEP_MS = 1000;

        /**
         * Builds the Curator client used by the rest of the application.
         *
         * @return a new, not-yet-started client bound to {@link #ZK_CONNECT_STRING}
         */
        @Bean
        public CuratorFramework curatorFramework() {
            RetryNTimes retryPolicy = new RetryNTimes(RETRY_COUNT, RETRY_SLEEP_MS);
            return CuratorFrameworkFactory.newClient(ZK_CONNECT_STRING, retryPolicy);
        }

        /**
         * Launches the Spring application.
         *
         * @param args command-line arguments forwarded to Spring Boot
         */
        public static void main(String[] args) {
            SpringApplication.run(MyDemoApplication.class, args);
        }
    }
    
    1. 启动CuratorFramework客户端
    import org.apache.curator.framework.CuratorFramework;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.ApplicationArguments;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.stereotype.Service;
    
    /**
     * Startup hook: because this bean implements {@link ApplicationRunner}, its {@code run}
     * method is executed once the container has started.
     *
     * @author 594781919
     */
    @Service
    public class StartService implements ApplicationRunner {

        @Autowired
        private ListenerService listenerService;

        @Autowired
        private CuratorFramework curatorFramework;

        /**
         * Starts the ZooKeeper client and then initialises the node listener.
         *
         * @param applicationArguments startup arguments; not used by this method
         */
        @Override
        public void run(ApplicationArguments applicationArguments) {
            // Very important: the client must be started first. Most mutator methods
            // will not work until the client is started.
            this.curatorFramework.start();
            System.out.println("zookeeper client start");

            // Initialise the listener now that start() has been called.
            this.listenerService.listener();
        }
    }
    

    03 使用zookeeper实现集群中只有一个应用实例执行定时任务

    当我们启动多个实例时,需要其中一个实例执行定时任务,其它实例不执行。

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.recipes.leader.LeaderLatch;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Service;
    
    import java.io.IOException;
    import java.util.Date;
    
    /**
     * Lets only one of several application instances execute a scheduled task, using a
     * Curator {@link LeaderLatch} on a shared ZooKeeper path to pick that instance.
     *
     * @author 594781919@qq.com
     */
    @Service
    public class TimerTaskService {

        /** ZooKeeper path on which every invocation creates its latch. */
        private static final String LATCH_PATH = "/timerTask";

        /** Time given to the leader election before leadership is checked, in milliseconds. */
        private static final long ELECTION_WAIT_MS = 2000L;

        @Autowired
        private CuratorFramework curatorFramework;

        @Value("${server.port}")
        private String port;

        /**
         * Scheduled entry point, triggered by the cron expression every 5 seconds.
         *
         * <p>Each invocation creates a fresh latch, starts it, waits {@link #ELECTION_WAIT_MS},
         * and then prints whether this instance holds leadership. The latch is always closed
         * before the method returns. Failures are printed, not propagated.
         */
        @Scheduled(cron = "0/5 * * * * *")
        public void task() {
            LeaderLatch leaderLatch = new LeaderLatch(curatorFramework, LATCH_PATH);
            try {
                leaderLatch.start();
                // Leader election takes some time, so wait before checking the result.
                Thread.sleep(ELECTION_WAIT_MS);
                // Check whether this instance is the leader.
                if (leaderLatch.hasLeadership()) {
                    System.out.println(new Date() + "    port=" + port + " 是领导");
                    // Business logic of the scheduled task goes here.
                } else {
                    System.out.println(new Date() + "    port=" + port + " 是从属");
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the calling thread's owner can still see
                // that an interruption was requested; a broad catch would silently clear it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    leaderLatch.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    04 使用zookeeper实现分布式锁

    import com.igola.domain.Employee;
    import com.igola.repository.EmployeeRepository;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * REST controller demonstrating a ZooKeeper-backed distributed lock built on Curator's
     * {@link InterProcessSemaphoreMutex}.
     *
     * @author 594781919@qq.com
     */
    @RestController
    public class EmployeeController {
        @Autowired
        private EmployeeRepository employeeRepository;

        @Autowired
        private CuratorFramework curatorFramework;

        /**
         * Populates an {@link Employee} while holding a lock keyed by {@code name}, then saves it.
         *
         * <p>The lock path is {@code "/zktest" + name}. When {@code name} is {@code "abc"} the
         * method sleeps 30 seconds while holding the lock. Failures while locking or populating
         * are printed and the employee is still saved in whatever state it reached.
         *
         * @param name employee name; also used to build the lock path
         * @return the employee instance that was passed to the repository
         */
        @GetMapping("/emp/save")
        public Employee save(String name) {

            // Build the lock object for this name.
            InterProcessSemaphoreMutex balanceLock = new InterProcessSemaphoreMutex(curatorFramework, "/zktest" + name);
            Employee employee = new Employee();
            // Tracks whether acquire() succeeded, so we only release a lock we actually hold.
            boolean acquired = false;
            try {
                // Take the lock.
                balanceLock.acquire();
                acquired = true;
                System.out.println("已经加锁了, name=" + name);
                employee.setName(name);
                if ("abc".equals(name)) {
                    Thread.sleep(30000);
                }
                employee.setAge((int) (Math.random() * 100));
                employee.setSex(false);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (acquired) {
                    try {
                        // Release the lock.
                        balanceLock.release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }

            // NOTE(review): the save happens after the lock has been released, so the lock
            // does not cover the repository write — confirm whether that is intended.
            employeeRepository.save(employee);

            return employee;
        }
    }
    
    • 1
    • 2

    05 使用zookeeper实现调度任务

    当我们在启动多个服务后,访问了其中一个服务,执行了一些方法。然后我们需要其它服务也要执行这些方法,就需要用到NodeCache。

    比如我们把一些数据缓存到Map对象中,当需要更新这个Map对象的数据时,我们就可以用NodeCache将每个服务都更新自己的Map对象。

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.recipes.cache.NodeCache;
    import org.apache.curator.utils.CloseableUtils;
    import org.apache.zookeeper.data.Stat;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.PreDestroy;
    import java.util.Date;
    
    /**
     * @author 594781919
     */
    @Service
    public class ListenerService {
        private final CuratorFramework curatorFramework;
        private NodeCache nodeCache;
    
        public static final String path = "/hello/world";
    
        public ListenerService(CuratorFramework curatorFramework) {
            this.curatorFramework = curatorFramework;
    
        }
    
        public void listener() {
            try {
                // 创建路径
                Stat stat = curatorFramework.checkExists().forPath(path);
                if (stat == null) {
                    curatorFramework.create().creatingParentsIfNeeded().forPath(path);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            nodeCache = new NodeCache(curatorFramework, path);
            // 添加监听的路径改变后需要执行的任务
            nodeCache.getListenable().addListener(this::run);
            try {
                nodeCache.start();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("开始监听......");
        }
    
        @PreDestroy
        public void preDestroy() {
            CloseableUtils.closeQuietly(nodeCache);
        }
    
        public void notifyNodeCache() {
            try {
                curatorFramework.setData().forPath(path);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    	// 需要执行的调度任务
        private void run() {
            System.out.println(new Date().toLocaleString() + ", 开始执行监听任务");
        }
    }
  • 相关阅读:
    推荐随笔
    搭建http服务
    python之numpy和pandas
    eclipse项目打包
    keras安装
    eclipse设置快速提示符
    linux常用命令
    Webpack3 从入门到放弃
    【ES6】Generator+Promise异步编程
    【Vue】删除数组元素,导致剩余元素被重新渲染
  • 原文地址:https://www.cnblogs.com/bruce1992/p/13889991.html
Copyright © 2011-2022 走看看