zoukankan      html  css  js  c++  java
  • 五大分布式场景解决方案

    一、一致性Hash算法

    Hash算法,散列函数,顾名思义,它是一个函数。如果把它定义成 hash(key) ,其中 key 表示元素的键值,则 hash(key) 的值表示经过散列函数计算得到的散列值。

    常见的Hash算法如:MD5、SHA-1

    Hash算法在分布式场景中的应用,主要分为两类:

    1. 请求的负载均衡

    如nginx的ip_hash策略,可以让同一个客户端,每次都路由到同一个目标服务器,可以实现会话粘滞,避免处理session共享问题。

    具体步骤就是使用hash算法根据ip计算hash值,然后根据目标服务器数量取模。
    2. 分布式存储

    以分布式存储redis集群为例,有redis1、redis2、redis3三台服务器,那么对某一个key进行存储呢?就需要针对key进行hash处理,index=hash(key)%3,使用index锁定存储服务器的具体节点。

    普通Hash算法存在的问题

    普通的Hash算法存在一个问题,以ip_hash为例,如果说后台有3台tomcat,tomcat2宕机了,那么hash(ip)%3变成了hash(ip)%2,这样就造成了所有的用户都需要重新计算。原本路由到tomcat1和tomcat3的那部分ip用户也会受影响。

    一致性Hash算法

    首先我们想象有一个环,这个环起始点是0,结束点是2的32次方-1。这样我们把服务器的ip求hash取得一个值,就能对应到环上某一个位置。针对客户端用户也是一样,根据客户端ip进行hash求值,也能对应到环上某一个位置,然后如何确定一个客户端路由到哪个服务器呢?

    就按照顺时针放行找到最近的服务器节点。

    如果还是上面的场景,3台tomcat应用,tomcat2宕机了。那么原来tomcat1和tomcat3的用户不会受影响,而原本应该落到tomat2上的应用会全部落到tomcat1或者tomcat3上。

    那这个算法就没有问题了吗?

    如果服务端节点比较少,如上图所示,那么就会出现数据倾斜问题,大量的请求会路由到节点1,只有少部分能路由到节点2.

    为了解决这个问题,一致性hash算法引入了虚拟节点机制。可以对每个服务器节点计算多个hash。具体做法可以在每个服务器ip或主机名后面增加编号来实现。

    简易的一致性hash算法代码如下(仅供学习使用,不能用于生产):

    /**
     * ⼀致性Hash算法实现(含虚拟节点)
     */
    public class ConsistentHashWithVirtual {
    
        public static void main(String[] args) {
            String[] clients=new String[]{"10.177.2.1","10.192.2.1","10.98.45.4"};
    
            String[] tomcatServers = new String[]
                    {"123.111.0.0","123.101.3.1","111.20.35.2","123.98.26.3"};
    
            //虚拟节点数
            int virtualCount=20;
    
            TreeMap<Integer,String> serverMap=new TreeMap<>();
            for (String server:tomcatServers){
                int serverHash = Math.abs(server.hashCode());
                serverMap.put(serverHash,server);
                for (int i = 0; i < virtualCount; i++) {
                    int virtualHash=Math.abs((server+"#"+virtualCount).hashCode());
                    serverMap.put(virtualHash,"虚拟"+server);
                }
    
            }
    
    
            for (String client:clients){
                int clientHash = Math.abs(client.hashCode());
                //获取一个子集。其所有对象的 key 的值大于等于 fromKey
                SortedMap<Integer, String> sortedMap =serverMap.tailMap(clientHash);
                if(sortedMap.isEmpty()){
                    Integer firstKey = serverMap.firstKey();
                    String server = serverMap.get(firstKey);
                    System.out.println("客户端:"+client+" 路由到:"+server);
                }else {
                    Integer firstKey = sortedMap.firstKey();
                    String server = sortedMap.get(firstKey);
                    System.out.println("客户端:"+client+" 路由到:"+server);
                }
            }
        }
    }
    

    二、集群时钟同步

    集群时钟不同步,指的是集群里各个服务器的时间不一致。因为系统时钟不一致,数据就会混乱。

    集群时钟同步思路:

    1. 服务器都能联网的情况

    使用linux的定时任务,每隔一段时间执行一次ntpdate命令同步时间。

    #使⽤ ntpdate ⽹络时间同步命令
    ntpdate -u ntp.api.bz #从⼀个时间服务器同步时间
    

    如果ntpdate 命令不存在,可以用如下命令安装

    yum install -y ntp
    
    1. 选取集群中的一个服务器节点A作为时间服务器(如果这台服务器能够访问互联网,可以让这台服务器和网络时间保持同步,如果不能就手动设置一个时间。)

      2.1 设置好服务器A的时间
      2.2 把服务器A配置为时间服务器(修改/etc/ntp.conf文件)

      1、如果有 restrict default ignore,注释掉它
      2、添加如下⼏⾏内容
      
      restrict 172.17.0.0 mask 255.255.255.0 nomodify notrap # 放开局
      域⽹同步功能,172.17.0.0是你的局域⽹⽹段
      server 127.127.1.0 # local clock
      fudge 127.127.1.0 stratum 10   
      
      
      3、重启⽣效并配置ntpd服务开机⾃启动
      service ntpd restart
      chkconfig ntpd on
      

      2.2 集群中其他节点就可以从A服务器同步时间了

      ntpdate 172.17.0.17
      

    三、分布式ID

    1. 数据库方式
    2. redis方式
    3. UUID方式
    4. 雪花算法

    雪花算法代码:

    /**
     * 官方推出,Scala编程语言来实现的
     * Java前辈用Java语言实现了雪花算法
     */
    public class IdWorker{
    
        //下面两个每个5位,加起来就是10位的工作机器id
        private long workerId;    //工作id
        private long datacenterId;   //数据id
        //12位的序列号
        private long sequence;
    
        public IdWorker(long workerId, long datacenterId, long sequence){
            // sanity check for workerId
            if (workerId > maxWorkerId || workerId < 0) {
                throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0",maxWorkerId));
            }
            if (datacenterId > maxDatacenterId || datacenterId < 0) {
                throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0",maxDatacenterId));
            }
            System.out.printf("worker starting. timestamp left shift %d, datacenter id bits %d, worker id bits %d, sequence bits %d, workerid %d",
                    timestampLeftShift, datacenterIdBits, workerIdBits, sequenceBits, workerId);
    
            this.workerId = workerId;
            this.datacenterId = datacenterId;
            this.sequence = sequence;
        }
    
        //初始时间戳
        private long twepoch = 1288834974657L;
    
        //长度为5位
        private long workerIdBits = 5L;
        private long datacenterIdBits = 5L;
        //最大值
        private long maxWorkerId = -1L ^ (-1L << workerIdBits);
        private long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);
        //序列号id长度
        private long sequenceBits = 12L;
        //序列号最大值
        private long sequenceMask = -1L ^ (-1L << sequenceBits);
        
        //工作id需要左移的位数,12位
        private long workerIdShift = sequenceBits;
       //数据id需要左移位数 12+5=17位
        private long datacenterIdShift = sequenceBits + workerIdBits;
        //时间戳需要左移位数 12+5+5=22位
        private long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;
        
        //上次时间戳,初始值为负数
        private long lastTimestamp = -1L;
    
        public long getWorkerId(){
            return workerId;
        }
    
        public long getDatacenterId(){
            return datacenterId;
        }
    
        public long getTimestamp(){
            return System.currentTimeMillis();
        }
    
         //下一个ID生成算法
        public synchronized long nextId() {
            long timestamp = timeGen();
    
            //获取当前时间戳如果小于上次时间戳,则表示时间戳获取出现异常
            if (timestamp < lastTimestamp) {
                System.err.printf("clock is moving backwards.  Rejecting requests until %d.", lastTimestamp);
                throw new RuntimeException(String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds",
                        lastTimestamp - timestamp));
            }
    
            //获取当前时间戳如果等于上次时间戳
            //说明:还处在同一毫秒内,则在序列号加1;否则序列号赋值为0,从0开始。
            if (lastTimestamp == timestamp) {  // 0  - 4095
                sequence = (sequence + 1) & sequenceMask;
                if (sequence == 0) {
                    timestamp = tilNextMillis(lastTimestamp);
                }
            } else {
                sequence = 0;
            }
            
            //将上次时间戳值刷新
            lastTimestamp = timestamp;
    
            /**
              * 返回结果:
              * (timestamp - twepoch) << timestampLeftShift) 表示将时间戳减去初始时间戳,再左移相应位数
              * (datacenterId << datacenterIdShift) 表示将数据id左移相应位数
              * (workerId << workerIdShift) 表示将工作id左移相应位数
              * | 是按位或运算符,例如:x | y,只有当x,y都为0的时候结果才为0,其它情况结果都为1。
              * 因为个部分只有相应位上的值有意义,其它位上都是0,所以将各部分的值进行 | 运算就能得到最终拼接好的id
            */
            return ((timestamp - twepoch) << timestampLeftShift) |
                    (datacenterId << datacenterIdShift) |
                    (workerId << workerIdShift) |
                    sequence;
        }
    
        //获取时间戳,并与上次时间戳比较
        private long tilNextMillis(long lastTimestamp) {
            long timestamp = timeGen();
            while (timestamp <= lastTimestamp) {
                timestamp = timeGen();
            }
            return timestamp;
        }
    
        //获取系统时间戳
        private long timeGen(){
            return System.currentTimeMillis();
        }
    
    
    
    
        public static void main(String[] args) {
            IdWorker worker = new IdWorker(21,10,0);
            for (int i = 0; i < 100; i++) {
                System.out.println(worker.nextId());
            }
        }
    
    }
    

    关于分布式ID的更多详细内容,可以看我的另一篇博客:分布式主键

    四、分布式调度

    调度,也就是我们所说的定时任务。定时任务的使用场景很多,如订单超时取消,定时备份数据等等。

    那么分布式调度是什么意思呢?

    有两层含义:

    1. 运行在分布式集群环境下的调度任务(同一个定时任务程序部署多份,只应该有一个定时任务在执行)
    2. 同一个大的定时任务可以拆分为多个小任务在多个机器上同时执行

    在介绍分布式调度框架之前,我们先来回顾一下普通的定时任务框架Quartz.

    任务调度框架Quartz回顾

    1. 引入pom依赖
     <dependency>
        <groupId>org.quartz-scheduler</groupId>
        <artifactId>quartz</artifactId>
        <version>2.3.2</version>
    </dependency>
    
    1. 代码编写
    public class QuartzMain {
    
        public static void main(String[] args) throws SchedulerException {
            Scheduler scheduler = createScheduler();
            JobDetail job = createJob();
            Trigger trigger = createTrigger();
            scheduler.scheduleJob(job,trigger);
            scheduler.start();
        }
    
        /**
         * 创建任务调度器
         */
        public static Scheduler createScheduler() throws SchedulerException {
            SchedulerFactory schedulerFactory=new StdSchedulerFactory();
            return schedulerFactory.getScheduler();
        }
    
    
        /**
         * 创建一个任务
         * @return
         */
        public static JobDetail createJob(){
            JobBuilder jobBuilder=JobBuilder.newJob(DemoJob.class);
            jobBuilder.withIdentity("jobName","myJob");
            return jobBuilder.build();
        }
    
        /**
         * 创建一个作业任务时间触发器
         * @return
         */
        public static Trigger createTrigger(){
            CronTrigger trigger=TriggerBuilder.newTrigger()
                    .withIdentity("triggerName","myTrigger")
                    .startNow()
                    .withSchedule(CronScheduleBuilder.cronSchedule("0/2 * * * * ?")).build();
            return trigger;
        }
    }
    
    
    public class DemoJob implements Job {
    
        @Override
        public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
            System.out.println("任务执行");
            System.out.println(new Date().toLocaleString());
            System.out.println(Thread.currentThread().getName());
            try {
                Thread.sleep(30000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    

    分布式调度框架Elastic-Job

    Elastic-Job介绍

    Elastic-Job是当当网开源的一个分布式调度解决方案,是基于Quartz二次开发的,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。我们要学习的是 Elastic-Job-Lite,它定位为轻量级⽆中⼼
    化解决⽅案,使⽤Jar包的形式提供分布式任务的协调服务,⽽Elastic-Job-Cloud⼦项⽬需要结合Mesos以及Docker在云环境下使⽤

    Elastic-Job的github地址:https://github.com/elasticjob

    主要功能介绍:

    • 分布式调度协调

    在分布式环境中,任务能够按照指定的调度策略执行,并且能够避免同一个任务多实例的重复执行。

    • 丰富的调度策略
    • 弹性扩容缩容

    当集群中增加一个实例,塔应当也能够被选举并执行任务;当集群中减少一个实例时,它所执行的任务能被转移到别的实例来执行。

    • 失效转移

    某实例在任务执行失败后,会被转移到其他实例执行

    • 错过执行作业重触发

    若因某种原因导致作业错过执行,自动记录错过执行的作业,并在上次作业完成后自动触发。

    • 支持并行调度

    支持任务分片,任务分片是指将一个任务分为多个小任务在多个实例中执行

    • 作业分片一致性

    当任务被分片后,保证同一分片在分布式环境中仅一个执行实例

    Elastic-Job-Lite应用

    Elastic-Job依赖于Zookeeper进行分布式协调,需要安装Zookeeper软件(3.4.6版本以上)。

    引入pom

     <dependency>
        <groupId>com.dangdang</groupId>
        <artifactId>elastic-job-lite-core</artifactId>
        <version>2.1.5</version>
    </dependency>
    
    1. 只有一个分片任务的场景
    public class ElasticJobMain {
    
    
    
        public static void main(String[] args) throws SQLException {
            ZookeeperConfiguration zookeeperConfiguration=new ZookeeperConfiguration("localhost:2181","data-archive-job");
            CoordinatorRegistryCenter coordinatorRegistryCenter=new ZookeeperRegistryCenter(zookeeperConfiguration);
            coordinatorRegistryCenter.init();
            //shardingTotalCount设置为1时,启动多个实例,只能有一个实例执行
            JobCoreConfiguration jobCoreConfiguration=JobCoreConfiguration.newBuilder("jobName",
                    "0/2 * * * * ?",1).build();
            SimpleJobConfiguration simpleJobConfiguration=new SimpleJobConfiguration(jobCoreConfiguration,BackupJob.class.getName());
            JobScheduler jobScheduler = new JobScheduler(coordinatorRegistryCenter, LiteJobConfiguration.newBuilder(simpleJobConfiguration).build());
            jobScheduler.init();
    
        }
    
    
    }
    
    public class BackupJob implements SimpleJob {
    
        @Override
        public void execute(ShardingContext shardingContext) {
            String sql="select * from t_order  limit 1";
            try {
                List<Map<String, Object>> list = JdbcUtils.executeQuery(InitData.dataSource, sql);
                if(!list.isEmpty()){
                    Map<String, Object> objectMap = list.get(0);
    
                    System.out.println(objectMap);
                    String insertSql="insert into t_order_bak (id,code,amt,create_time,user_id) values(?,?,?,?,?)";
                    Collection<Object> values = objectMap.values();
                    List<Object> params=new ArrayList<>();
                    params.addAll(values);
                    JdbcUtils.execute(InitData.dataSource,insertSql,params);
    
                    //删除原来的
                    String deleteSql="delete from t_order where id=?";
                    Object id = objectMap.get("id");
                    JdbcUtils.execute(InitData.dataSource,deleteSql, Arrays.asList(id));
                    System.out.println("数据:"+id+"备份完成");
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
    
    1. 任务分片的情况
    public class ElasticJobMain {
    
    
    
        public static void main(String[] args) throws SQLException {
            ZookeeperConfiguration zookeeperConfiguration=new ZookeeperConfiguration("localhost:2181","data-archive-job");
            CoordinatorRegistryCenter coordinatorRegistryCenter=new ZookeeperRegistryCenter(zookeeperConfiguration);
            coordinatorRegistryCenter.init();
            //shardingTotalCount设置为3,shardingItemParameters为传入的分片参数,0=后面的值就是0分片将会取到的参数。如0=abc,那么0分片
            //对应shardingContext.getShardingParameter()取到的就是abc
            JobCoreConfiguration jobCoreConfiguration=JobCoreConfiguration.newBuilder("jobName2",
                    "0/2 * * * * ?",3).shardingItemParameters("0=0,1=1,2=2")
                    .build();
            SimpleJobConfiguration simpleJobConfiguration=new SimpleJobConfiguration(jobCoreConfiguration,BackupJob.class.getName());
            JobScheduler jobScheduler = new JobScheduler(coordinatorRegistryCenter, LiteJobConfiguration.newBuilder(simpleJobConfiguration).build());
            jobScheduler.init();
    
        }
    
    
    }
    
    public class BackupJob implements SimpleJob {
    
        @Override
        public void execute(ShardingContext shardingContext) {
    
            int shardingItem = shardingContext.getShardingItem();
            System.out.println("当前分片:"+shardingItem);
            String shardingParameter = shardingContext.getShardingParameter();
            System.out.println("获取分片参数:"+shardingParameter);
            String sql="select * from t_order where user_id = ? limit 1";
            try {
                List<Map<String, Object>> list = JdbcUtils.executeQuery(InitData.dataSource, sql,shardingParameter);
                if(!list.isEmpty()){
                    Map<String, Object> objectMap = list.get(0);
    
                    System.out.println(objectMap);
                    String insertSql="insert into t_order_bak (id,code,amt,create_time,user_id) values(?,?,?,?,?)";
                    Collection<Object> values = objectMap.values();
                    List<Object> params=new ArrayList<>();
                    params.addAll(values);
                    JdbcUtils.execute(InitData.dataSource,insertSql,params);
    
                    //删除原来的
                    String deleteSql="delete from t_order where id=?";
                    Object id = objectMap.get("id");
                    JdbcUtils.execute(InitData.dataSource,deleteSql, Arrays.asList(id));
                    System.out.println("数据:"+id+"备份完成");
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
    

    五、Session共享

    Session问题原因分析

    出现这个问题的原因,从根本上来说是因为HTTP协议是无状态的协议。客户端和服务端在某次会话中产生的数据不会被保留下来,所以第二次请求服务端无法认识到你曾经来过。后来出现了两种用于保持Http状态的技术,就是Cookie和Session。

    当集群中有多台服务器时,你在服务器1上登录了,服务器1的session里有了你的数据,下一次请求如果nginx把你路由到其他服务器,那你又需要登录了,因为其他服务器上没有存得有你的数据。

    解决Session一致性方案

    1. 方案一:Nginx的ip_hash策略

    同一个客户端ip的请求都会被路由到同一个目标服务器

    • 优点:配置简单,不入侵应用
    • 缺点:服务器重启Session丢失,单点故障问题
    1. 方案二:Session复制(不推荐)

    多个Tomcat之间通过修改配置文件,达到Session之间的复制

    • 优点:不入侵应用,便于扩展,服务器重启不会造成Session丢失
    • 缺点:性能低,内存消耗,延迟性
    1. 方案三:Session集中存储(推荐)
    • 优点:能适应各种负载均衡策略,服务器重启不会造成Session丢失
    • 缺点:对应用有入侵,引入了和Redis的交互代码

    Redis Session共享

    Spring Session使得基于Redis的Session共享非常简单。

    1. 引入依赖
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.session</groupId>
        <artifactId>spring-session-data-redis</artifactId>
    </dependency>
    
    1. 配置redis
    spring.redis.database=0
    spring.redis.host=127.0.0.1
    spring.redis.port=6379
    
    1. 添加注解

    在启动类上增加@EnableRedisHttpSession

    书山有路勤为径,学海无涯苦作舟
  • 相关阅读:
    DNT论坛整合笔记二
    LINQ中的动态排序
    无法安装数据库关系图支持对象的解决方法
    总访问量,日访问量,周访问量统计代码
    ASP.NET 数据绑定控件和 Eval方法
    KindEditor ASP.NET 上传/浏览服务器 附源码
    地图定位 图吧地图定位 附javascript源码每行都有注释
    java.io.IOException: Unable to open sync connection!
    Canvas和Paint实例
    Android初级教程_获取Android控件的宽和高
  • 原文地址:https://www.cnblogs.com/javammc/p/15757983.html
Copyright © 2011-2022 走看看