zoukankan      html  css  js  c++  java
  • 基于Hazelcast及Kafka实现的分布式锁与集群负载均衡

    基于Hazelcast及Kafka实现的分布式锁与集群负载均衡

    之所以产出这一博客,是因为公司的项目上线了集群之后出现了问题。

    大佬排查之后,发现我写的代码存在一点问题,所以就趁周末时间,进行了修改与测试,产出了这一Demo。

    一、分布式锁

    参考文章

    先来看下流程图

    准备三台节点,每台节点上面都有相同的定时任务,将三台节点部署成一个集群,定时任务同时启动,经过分布式锁的过滤,每个任务只有拿到锁的那台机器进行执行。

    HazelcastConfig.java

    @Configuration
    public class HazelcastConfig {
        @Bean
        public HazelcastInstance hazelcastInstance() {
            return Hazelcast.newHazelcastInstance();
        }
    }
    

    TaskExecutorConfig.java

    @Configuration
    @EnableAsync
    public class TaskExecutorConfig {
        @Bean("taskExecutor")
        public TaskExecutor taskExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            //核心线程数
            executor.setCorePoolSize(10);
            //最大线程数
            executor.setMaxPoolSize(20);
            //队列的长度
            executor.setQueueCapacity(8);
            //线程池维护线程所允许的空闲时间
            executor.setKeepAliveSeconds(60);
            //线程是对拒绝任务的处理策略,也就是没有线程可用的时候
            //CallerRunsPolicy在任务被拒绝添加后,会在调用execute方法的的线程来执行被拒绝的任务。除非executor被关闭,否则任务不会被丢弃。
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            //任务执行器的前缀,打印日志时输出
            executor.setThreadNamePrefix("task-thread-");
            return executor;
        }
    }
    

    TaskEnum.java

    public enum TaskEnum {
        FIRST(0, "一级任务"),
        SECOND(1, "二级任务"),
        THIRD(2, "三级任务"),
        FORTH(3, "四级任务");
        public final int code;
        public final String desc;
    
        TaskEnum(int code, String desc) {
            this.code = code;
            this.desc = desc;
        }
    }
    

    DailyTask.java

    @Component
    public class DailyTask {
        Logger log = LoggerFactory.getLogger(DailyTask.class);
        @Autowired
        HazelcastInstance hazelcastInstance;
    
        private final static String API_MONITOR_TASK_MAP_NAME = "api_monitor_task_map";
    
        @Async("taskExecutor")
        @Scheduled(cron = "0 10 14 * * ? ")
        public void apiQhScheduledTask() {
            doTask(TaskEnum.FIRST);
        }
    
    
        @Async("taskExecutor")
        @Scheduled(cron = "0 10 14 * * ?")
        public void apiSHScheduledTask() {
            doTask(TaskEnum.SECOND);
        }
    
        @Async("taskExecutor")
        @Scheduled(cron = "0 10 14 * * ?")
        public void apiTHScheduledTask() {
            doTask(TaskEnum.THIRD);
        }
    
        @Async("taskExecutor")
        @Scheduled(cron = "0 10 14 * * ?")
        public void apiODScheduledTask() {
            doTask(TaskEnum.FORTH);
        }
    
    
        public void doTask(TaskEnum task) {
            if (!ObjectUtils.isEmpty(task)) {
                IMap<Integer, String> map = hazelcastInstance.getMap(API_MONITOR_TASK_MAP_NAME);
                map.put(task.code, task.desc);
                //判断任务是否能锁
                boolean canLocked = map.tryLock(task.code);
                if(canLocked){
                    System.out.println("本次抢到锁,执行任务...");
                    log.info(task.desc);
                    map.unlock(task.code);
                }else{
                    System.out.println("本次不抢锁!");
                }
            }
        }
    }
    

    最终执行结果

    缺点:同一时刻的任务,有可能全部被同一台机器抢到,其他两台机器会空闲,这种极限情况下的分配存在问题。不过我目前的项目同时只有一条任务,够用。

    二、负载均衡

    参考文章

    先看下流程图

    我是准备了两台机器提供Kafka集群。具体配置过程

    1. 两台机器均启动zookeeper,保持zookeeper默认配置即可
    2. 配置Kafka的配置文件。
      • 每台kafka的brokerId保持唯一。
      • 每台kafka的zookeeper.connect配置为zookeeper集群。
      • 每台kafka的advertised.listeners配置PLAINTEXT://当前节点ip:9092,好像不用配也可以。

    通过Kafka实现的负载均衡可以解决上面的那个问题,哪怕所有锁都被他自己抢到了,也无所谓,也就是抢到锁的节点只需要将工作内容抛给Kafka,经过Kafka,然后均衡地分配给下面的各个节点进行消费,从而达到负载均衡。

    运行结果

    三、存在的问题

    demo里,上面两个例子,就已经很好很好了。达到了我满意并想要的效果。

    实际上还是存在问题的。

    在大型项目中,比如公司里的项目,就出现了问题。多节点启动之后,虽然定时任务都是同时的,但是由于机器性能的差异,导致会有延迟。

    类似于下图。

    还有一个问题,没有用try..finally释放锁,出现了问题。大概类似于上面的问题,没排查出原因来,最后处于妥善,还是用finally吧。

    如上图所示,不加finally来释放锁时,他会等待别人释放锁,再去拿锁,这都给我整蒙了。

    最后完善版的版本。

    public void doTask(TaskEnum task) {
        if (!ObjectUtils.isEmpty(task)) {
            IMap<Integer, Long> map = hazelcastInstance.getMap(API_MONITOR_TASK_MAP_NAME);
            Long temp = map.get(task.code);
            Long lastExecTime = ObjectUtils.isEmpty(temp) ? 0L : temp;
            Long currentTime = System.currentTimeMillis();
            Long intervalTime = currentTime - lastExecTime;
            log.info("【" + task.desc + "】,lastExecTime=" + lastExecTime + ",currentTime=" + currentTime + ",intervalTime=" + intervalTime);
            //判断任务是否能锁
            boolean canLocked = map.tryLock(task.code);
            if (intervalTime > DISTRIBUTE_TASK_TIME_INTERVAL && canLocked) {
                log.info("抢到【" + task.desc + "】分发权限!");
                try {
                    map.put(task.code, currentTime);
                    doMonitorTask(task);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    map.unlock(task.code);
                    log.info("释放【" + task.desc + "】");
                }
            } else {
                log.info("没有抢到【" + task.desc + "】权限");
            }
        }
    }
    
  • 相关阅读:
    xhEditor struts2实现图片上传
    xhEditor入门基础
    jQuery全屏插件Textarea Fullscreen
    jQuery幻灯片插件Skippr
    jQuery跳房子插件hopscotch
    合理配置SQLSERVER内存
    浅谈SQL Server 对于内存的管理
    SQL Server 临时表和表变量系列之选择篇
    SQLTest系列之INSERT语句测试
    转:表变量与临时表的优缺点
  • 原文地址:https://www.cnblogs.com/meethigher/p/15023575.html
Copyright © 2011-2022 走看看