zoukankan      html  css  js  c++  java
  • 分布式缓存重建并发冲突和zookeeper分布式锁解决方案

    如果缓存服务在本地的ehcache中都读取不到数据。
    这个时候就意味着,需要重新到源头的服务中去拉去数据,拉取到数据之后,赶紧先给nginx的请求返回,同时将数据写入ehcache和redis中
    分布式重建缓存的并发冲突问题
    重建缓存:数据在所有的缓存中都不存在了(LRU算法弄掉了),就需要重新查询数据写入缓存,重建缓存
    分布式的重建缓存,在不同的机器上,不同的服务实例中,去做上面的事情,就会出现多个机器分布式重建去读取相同的数据,然后写入缓存中
    发生分布式重建缓存的并发冲突问题图片:
     
    1、流量均匀分布到所有缓存服务实例上

    应用层nginx,是将请求流量均匀地打到各个缓存服务实例中的,可能会部署多实例在不同的机器上

    2、应用层nginx的hash,固定商品id,走固定的缓存服务实例

    分发层的nginx的lua脚本,一堆应用层nginx的地址列表,对每个商品id做一个hash,然后对应用nginx数量取模

    将每个商品的请求固定分发到同一个应用层nginx上面去
    在应用层nginx里,发现自己本地lua shared dict缓存中没有数据的时候,就采取一样的方式,对product id取模,然后将请求固定分发到同一个缓存服务实例中去

    这样的话,就不会出现说多个缓存服务实例分布式的去更新那个缓存了
    3、源信息服务发送的变更消息,需要按照商品id去分区,固定的商品变更走固定的kafka分区,也就是固定的一个缓存服务实例获取到
    缓存服务,是监听kafka topic的,一个缓存服务实例,作为一个kafka consumer,就消费topic中的一个partition

    所以你有多个缓存服务实例的话,每个缓存服务实例就消费一个kafka partition

    所以这里,一般来说,你的源头信息服务,在发送消息到kafka topic的时候,都需要按照product id去分区

    也就时说,同一个product id变更的消息一定是到同一个kafka partition中去的,也就是说同一个product id的变更消息,一定是同一个缓存服务实例消费到的
     
    4、自己写的简易的hash分发,与kafka的分区,可能并不一致!!!
    自己写的简易的hash分发策略,是按照crc32去取hash值,然后再取模的
    不知道你的kafka producer的hash策略是什么,很可能说跟我们的策略是不一样的
    数据变更的消息所到的缓存服务实例,跟我们的应用层nginx分发到的那个缓存服务实例也许就不在一台机器上了

    这样的话,在高并发,极端的情况下,可能就会出现冲突
    分布式的缓存重建并发冲突问题:
    6、基于zookeeper分布式锁的解决方案
    分布式锁,如果你有多个机器在访问同一个共享资源,那么这个时候,如果你需要加个锁,让多个分布式的机器在访问共享资源的时候串行起来,多个不同机器上的服务共享的锁,就是分布式锁
    分布式锁当然有很多种不同的实现方案,redis分布式锁,zookeeper分布式锁

    zk,做分布式协调这一块,还是很流行的,大数据应用里面,hadoop,storm,都是基于zk去做分布式协调

    zk分布式锁的解决并发冲突的方案

    (1)变更缓存重建以及空缓存请求重建,更新redis之前,都需要先获取对应商品id的分布式锁
    (2)拿到分布式锁之后,需要根据时间版本去比较一下,如果自己的版本新于redis中的版本,那么就更新,否则就不更新
    (3)如果拿不到分布式锁,那么就等待,不断轮询等待,直到自己获取到分布式的锁
    zk分布式锁的原理
    通过去创建zk的一个临时node,来模拟给摸一个商品id加锁
    zk会给你保证说,只会创建一个临时node,其他请求过来如果再要创建临时node,就会报错,NodeExistsException
     
    我们的所谓上锁,其实就是去创建某个product id对应的一个临时node
    如果临时node创建成功了,那么说明我们成功加锁了,此时就可以去执行对redis立面数据的操作

    如果临时node创建失败了,说明有人已经在拿到锁了,在操作reids中的数据,那么就不断的等待,直到自己可以获取到锁为止
    基于zk client api,去封装上面的这个代码逻辑

    释放一个分布式锁,去删除掉那个临时node就可以了,就代表释放了一个锁,那么此时其他的机器就可以成功创建临时node,获取到锁
    pom:
    <dependency>
    		    <groupId>org.apache.zookeeper</groupId>
    		    <artifactId>zookeeper</artifactId>
    		    <version>3.4.5</version>
    		</dependency>
    import java.util.concurrent.CountDownLatch;
    
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.Watcher.Event.KeeperState;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.apache.zookeeper.ZooKeeper;
    
    
    public class ZooKeeperSession {
    	
    	private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
    	
    	private ZooKeeper zookeeper;
    
    	public ZooKeeperSession() {
    		// 去连接zookeeper server,创建会话的时候,是异步去进行的
    		// 所以要给一个监听器,说告诉我们什么时候才是真正完成了跟zk server的连接
    		try {
    			this.zookeeper = new ZooKeeper(
    					"192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181", 
    					50000, 
    					new ZooKeeperWatcher());
    			// 给一个状态CONNECTING,连接中
    			System.out.println(zookeeper.getState());
    			
    			try {
    				// CountDownLatch
    				// java多线程并发同步的一个工具类
    				// 会传递进去一些数字,比如说1,2 ,3 都可以
    				// 然后await(),如果数字不是0,那么久卡住,等待
    				
    				// 其他的线程可以调用coutnDown(),减1
    				// 如果数字减到0,那么之前所有在await的线程,都会逃出阻塞的状态
    				// 继续向下运行
    				
    				connectedSemaphore.await();
    			} catch(InterruptedException e) {
    				e.printStackTrace();
    			}
    
    			System.out.println("ZooKeeper session established......");
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    	
    	/**
    	 * 获取分布式锁
    	 * @param productId
    	 */
    	public void acquireDistributedLock(Long productId) {
    		String path = "/product-lock-" + productId;
    	
    		try {
    			zookeeper.create(path, "".getBytes(), 
    					Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    			System.out.println("success to acquire lock for product[id=" + productId + "]");  
    		} catch (Exception e) {
    			// 如果那个商品对应的锁的node,已经存在了,就是已经被别人加锁了,那么就这里就会报错
    			// NodeExistsException
    			int count = 0;
    			while(true) {
    				try {
    					Thread.sleep(20); 
    					zookeeper.create(path, "".getBytes(), 
    							Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    				} catch (Exception e2) {
    					e2.printStackTrace();
    					count++;
    					continue;
    				}
    				System.out.println("success to acquire lock for product[id=" + productId + "] after " + count + " times try......");
    				break;
    			}
    		}
    	}
    	
    	/**
    	 * 释放掉一个分布式锁
    	 * @param productId
    	 */
    	public void releaseDistributedLock(Long productId) {
    		String path = "/product-lock-" + productId;
    		try {
    			zookeeper.delete(path, -1); 
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    	
    	/**
    	 * 建立zk session的watcher
    	 * @author Administrator
    	 *
    	 */
    	private class ZooKeeperWatcher implements Watcher {
    
    		public void process(WatchedEvent event) {
    			System.out.println("Receive watched event: " + event.getState());
    			if(KeeperState.SyncConnected == event.getState()) {
    				connectedSemaphore.countDown();
    			} 
    		}
    		
    	}
    	
    	/**
    	 * 封装单例的静态内部类
    	 * @author Administrator
    	 *
    	 */
    	private static class Singleton {
    		
    		private static ZooKeeperSession instance;
    		
    		static {
    			instance = new ZooKeeperSession();
    		}
    		
    		public static ZooKeeperSession getInstance() {
    			return instance;
    		}
    		
    	}
    	
    	/**
    	 * 获取单例
    	 * @return
    	 */
    	public static ZooKeeperSession getInstance() {
    		return Singleton.getInstance();
    	}
    	
    	/**
    	 * 初始化单例的便捷方法
    	 */
    	public static void init() {
    		getInstance();
    	}
    	
    }
    

      

     实现思路:

    1、主动更新

    监听kafka消息队列,获取到一个商品变更的消息之后,去哪个源服务中调用接口拉取数据,更新到ehcache和redis中

    先获取分布式锁,然后才能更新redis,同时更新时要比较时间版本

    2、被动重建

    直接读取源头数据,直接返回给nginx,同时推送一条消息到一个队列,后台线程异步消费

    后台现成负责先获取分布式锁,然后才能更新redis,同时要比较时间版本

    // 加代码,在将数据直接写入redis缓存之前,应该先获取一个zk的分布式锁

    private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    		ZooKeeperSession zkSession = ZooKeeperSession.getInstance();
    		zkSession.acquireDistributedLock(productId);  
    		
    		// 获取到了锁
    		// 先从redis中获取数据
    		ProductInfo existedProductInfo = cacheService.getProductInfoFromReidsCache(productId);
    		
    		if(existedProductInfo != null) {
    			// 比较当前数据的时间版本比已有数据的时间版本是新还是旧
    			try {
    				Date date = sdf.parse(productInfo.getModifiedTime());
    				Date existedDate = sdf.parse(existedProductInfo.getModifiedTime());
    				
    				if(date.before(existedDate)) {
    					System.out.println("current date[" + productInfo.getModifiedTime() + "] is before existed date[" + existedProductInfo.getModifiedTime() + "]");
    					return;
    				}
    			} catch (Exception e) {
    				e.printStackTrace();
    			}
    			System.out.println("current date[" + productInfo.getModifiedTime() + "] is after existed date[" + existedProductInfo.getModifiedTime() + "]");
    		} else {
    			System.out.println("existed product info is null......");   
    		}
    		
    		cacheService.saveProductInfo2ReidsCache(productInfo);  
    		
    		// 释放分布式锁
    		zkSession.releaseDistributedLock(productId); 
    

      

    if(productInfo == null) {
    			// 就需要从数据源重新拉去数据,重建缓存,但是这里先不讲
    			String productInfoJSON = "{"id": 2, "name": "iphone7手机", "price": 5599, "pictureList":"a.jpg,b.jpg", "specification": "iphone7的规格", "service": "iphone7的售后服务", "color": "红色,白色,黑色", "size": "5.5", "shopId": 1, "modified_time": "2017-01-01 12:01:00"}";
    			productInfo = JSONObject.parseObject(productInfoJSON, ProductInfo.class);
    			// 将数据推送到一个内存队列中
    			RebuildCacheQueue rebuildCacheQueue = RebuildCacheQueue.getInstance();
    			rebuildCacheQueue.putProductInfo(productInfo);
    		}
    

      

    import java.util.concurrent.ArrayBlockingQueue;
    
    /**
     * 重建缓存的内存队列
     *
     */
    public class RebuildCacheQueue {
    
    	private ArrayBlockingQueue<ProductInfo> queue = new ArrayBlockingQueue<ProductInfo>(1000);
    	
    	public void putProductInfo(ProductInfo productInfo) {
    		try {
    			queue.put(productInfo); 
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    	
    	public ProductInfo takeProductInfo() {
    		try {
    			return queue.take();
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    		return null;
    	}
    	
    	/**
    	 * 内部单例类
    	 */
    	private static class Singleton {
    		
    		private static RebuildCacheQueue instance;
    		
    		static {
    			instance = new RebuildCacheQueue();
    		}
    		
    		public static RebuildCacheQueue getInstance() {
    			return instance;
    		}
    		
    	}
    	
    	public static RebuildCacheQueue getInstance() {
    		return Singleton.getInstance();
    	}
    	
    	public static void init() {
    		getInstance();
    	}
    	
    }
    

      

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import com.roncoo.eshop.cache.model.ProductInfo;
    import com.roncoo.eshop.cache.service.CacheService;
    import com.roncoo.eshop.cache.spring.SpringContext;
    import com.roncoo.eshop.cache.zk.ZooKeeperSession;
    
    /**
     * 缓存重建线程
     */
    public class RebuildCacheThread implements Runnable {
    	
    	private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
    	public void run() {
    		RebuildCacheQueue rebuildCacheQueue = RebuildCacheQueue.getInstance();
    		ZooKeeperSession zkSession = ZooKeeperSession.getInstance();
    		CacheService cacheService = (CacheService) SpringContext.getApplicationContext()
    				.getBean("cacheService");
    		
    		while(true) {
    			ProductInfo productInfo = rebuildCacheQueue.takeProductInfo();
    			
    			zkSession.acquireDistributedLock(productInfo.getId());  
    			
    			ProductInfo existedProductInfo = cacheService.getProductInfoFromReidsCache(productInfo.getId());
    			
    			if(existedProductInfo != null) {
    				// 比较当前数据的时间版本比已有数据的时间版本是新还是旧
    				try {
    					Date date = sdf.parse(productInfo.getModifiedTime());
    					Date existedDate = sdf.parse(existedProductInfo.getModifiedTime());
    					
    					if(date.before(existedDate)) {
    						System.out.println("current date[" + productInfo.getModifiedTime() + "] is before existed date[" + existedProductInfo.getModifiedTime() + "]");
    						continue;
    					}
    				} catch (Exception e) {
    					e.printStackTrace();
    				}
    				System.out.println("current date[" + productInfo.getModifiedTime() + "] is after existed date[" + existedProductInfo.getModifiedTime() + "]");
    			} else {
    				System.out.println("existed product info is null......");   
    			}
    			
    			cacheService.saveProductInfo2ReidsCache(productInfo);  
    		}
    	}
    
    }
    

      启动线程:

    new Thread(new RebuildCacheThread()).start();
    

      

  • 相关阅读:
    POJ 1286 Necklace of Beads(Polya简单应用)
    《Nosql精粹》—— 读后总结
    基于ELK的数据分析实践——满满的干货送给你
    ELK5.0安装教程
    Oozie分布式工作流——EL表达式
    《分布式Java应用与实践》—— 后面两章
    Oozie分布式工作流——从理论和实践分析使用节点间的参数传递
    Oozie分布式工作流——Action节点
    Oozie分布式工作流——流控制
    图文并茂 —— 基于Oozie调度Sqoop
  • 原文地址:https://www.cnblogs.com/sunliyuan/p/11408274.html
Copyright © 2011-2022 走看看