zoukankan      html  css  js  c++  java
  • 分布式改造剧集3:Ehcache分布式改造

    第三集:分布式Ehcache缓存改造

    前言

    ​ 好久没有写博客了,大有半途而废的趋势。忙不是借口,这个好习惯还是要继续坚持。前面我承诺的第一期的DIY分布式,是时候上终篇了---DIY分布式缓存。


    探索之路

    ​ 在前面的文章中,我给大家大致说过项目背景:项目中的缓存使用的是Ehcache。因为前面使用Ehcache的应用就一台,所以这种单机的Ehcache并不会有什么问题。现在分布式部署之后,如果各个应用之间的缓存不能共享,那么其实各自就是一个孤岛。可能在一个业务跑下来,请求了不同的应用,结果在缓存中取出来的值不一样,

    造成数据不一致。所以需要重新设计缓存的实现。

    ​ 因为尽量不要引入新的中间件,所以改造仍然是围绕Ehcache来进行的。搜集了各种资料之后,发现Ehcache实现分布式缓存基本有以下两种思路:

    • 客户端实现分布式算法: 在使用Ehcache的客户端自己实现分布式算法。

      算法的基本思路就是取模:即假设有三台应用(编号假设分别为0,1,2),对于一个要缓存的对象,首先计算其key的hash值,然后将hash值模3,得到的余数是几,就将数据缓存到哪台机器。

    • 同步冗余数据: Ehcache是支持集群配置的,集群的各个节点之间支持按照一定的协议进行数据同步。这样每台应用其实缓存了一整份数据,不同节点之间的数据是一致的。

    ​ 虽然冗余的办法显得有点浪费资源,但是我最终还是选择了冗余。具体原因有以下几点:

    • 分布式算法的复杂性: 前面所讲的分布式算法只是最基本的实现。事实上实现要比这个复杂的多。需要考虑增加或者删除节点的情况,需要使用更加复杂的一致性hash算法
    • 可能导致整个应用不可用: 当删除节点之后,如果算法不能够感知进行自动调整,仍然去请求那个已经被删除的节点,可能导致整个系统不可用。

    Demo

    ​ 最终我的实现采用RMI的方式进行同步

    配置ehcache

    ​ spring-ehcache-cache.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <ehcache xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:noNamespaceSchemaLocation="http://ehcache.org/ehcache.xsd" name="businessCaches">
    
      <diskStore path="java.io.tmpdir/ehcache"/>
      <cache name="business1Cache"
             maxElementsInMemory="10000000"
             eternal="true"
             overflowToDisk="false"
             memoryStoreEvictionPolicy="LRU">
             <cacheEventListenerFactory
                class="net.sf.ehcache.distribution.RMICacheReplicatorFactory"/>
        </cache>
    
      <cache name="business2Cache"
             maxElementsInMemory="100"
             eternal="true"
             overflowToDisk="false"
             memoryStoreEvictionPolicy="LRU">
             <cacheEventListenerFactory
                class="net.sf.ehcache.distribution.RMICacheReplicatorFactory"/>
        </cache>
              
      
      <!-- cache发布信息配置,人工发现peerDiscovery=manual,cacheNames可配置多个缓存名称,以|分割 ) -->
       <cacheManagerPeerProviderFactory
            class="com.rampage.cache.distribute.factory.DisRMICacheManagerPeerProviderFactory"
            properties="peerDiscovery=manual, cacheNames=business1Cache|business2Cache" />
      
      
      <!-- 接收同步cache信息的地址 -->
     <cacheManagerPeerListenerFactory
            class="com.rampage.cache.distribute.factory.DisRMICacheManagerPeerListenerFactory"
            properties="socketTimeoutMillis=2000" />     
    </ehcache>
    

    ​ spring-cache.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:cache="http://www.springframework.org/schema/cache"
        xmlns:context="http://www.springframework.org/schema/context"
        xmlns:aop="http://www.springframework.org/schema/aop"
        xsi:schemaLocation="http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
            http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd"
            default-autowire="byName">
        <!-- 包扫描 -->
        <context:component-scan base-package="com.rampage.cache" />
        <!-- 启用Cache注解 -->
        <cache:annotation-driven cache-manager="cacheManager"
            key-generator="keyGenerator" proxy-target-class="true" />
        <!-- 自定义的缓存key生成类,需实现org.springframework.cache.interceptor.KeyGenerator接口 -->
        <bean id="keyGenerator" class="com.rampage.cache.support.CustomKeyGenerator" />
        <!-- 替换slite的ehcache实现 -->
        <bean id="ehCacheManagerFactory" class="org.springframework.cache.ehcache.EhCacheManagerFactoryBean">
            <property name="configLocation" value="classpath:spring/cache/sppay-ehcache-cache.xml"/>
            <!-- value对应前面ehcache文件定义的manager名称 -->
            <property name="cacheManagerName" value="businessCaches" />
        </bean>
        <bean id="ehCacheManager" class="org.springframework.cache.ehcache.EhCacheCacheManager">
            <property name="cacheManager" ref="ehCacheManagerFactory"/>
        </bean>
        <bean id="cacheManager" class="org.springframework.cache.support.CompositeCacheManager">
            <property name="cacheManagers">
                <list>
                    <ref bean="ehCacheManager" />
                </list>
            </property>
            <property name="fallbackToNoOpCache" value="true" />
        </bean>    
    </beans>
    

    实现自定义转发和监听

    ​ 细心的读者应该不难发现,前面xml配置中cacheManagerPeerProviderFactorycacheManagerPeerListenerFactory我使用的都是自定义的类。之所以使用自定义的类,是为了在初始化的时候发布的地址和端口,监听的地址端口可以在配置文件配置。具体类的实现如下:

    /**
     * 分布式EhCache监听工厂
     * @author secondWorld
     *
     */
    public class DisRMICacheManagerPeerListenerFactory extends RMICacheManagerPeerListenerFactory {
    
    	private static final Logger LOGGER = LoggerFactory.getLogger(DisRMICacheManagerPeerListenerFactory.class);
    	
    	/**
    	 * 配置文件中配置的监听地址,可以不配置,默认为本机地址
    	 */
    	private static final String LISTEN_HOST = "distribute.ehcache.listenIP";
    	
    	/**
    	 * 配置文件中配置的监听端口
    	 */
    	private static final String LISTEN_PORT = "distribute.ehache.listenPort";
    	
    	@Override
    	protected CacheManagerPeerListener doCreateCachePeerListener(String hostName, Integer port,
    	        Integer remoteObjectPort, CacheManager cacheManager, Integer socketTimeoutMillis) {
    		// xml中hostName为空,则读取配置文件(app-config.properties)中的值
    		if (StringUtils.isEmpty(hostName)) {
    			String propHost = AppConfigPropertyUtils.get(LISTEN_HOST);
    			if (StringUtils.isNotEmpty(propHost)) {
    				hostName = propHost;
    			}
    		}
    		
    		// 端口采用默认端口0,则去读取配置文件(app-config.properties)中的值
    		if (port != null && port == 0) {
    			Integer propPort = null;
    			try {
    				propPort = Integer.parseInt(AppConfigPropertyUtils.get(LISTEN_PORT));
    			} catch (NumberFormatException e) {
    			}
    			if (propPort != null) {
    				port = propPort;
    			}
    		}
    		
    		LOGGER.info(
    		        "Initiliazing DisRMICacheManagerPeerListenerFactory:cacheManager[{}], hostName[{}], port[{}], remoteObjectPort[{}], socketTimeoutMillis[{}]......",
    		        cacheManager, hostName, port, remoteObjectPort, socketTimeoutMillis);
    		
    		return super.doCreateCachePeerListener(hostName, port, remoteObjectPort, cacheManager, socketTimeoutMillis);
    	}
    }
    
    
    
    /**
     * 分布式EhCache发布工厂
     * 
     * @author secondWorld
     *
     */
    public class DisRMICacheManagerPeerProviderFactory extends RMICacheManagerPeerProviderFactory {
    
    	private static final Logger LOGGER = LoggerFactory.getLogger(DisRMICacheManagerPeerProviderFactory.class);
    
    	private static final String CACHENAME_DELIMITER = "|";
    
    	private static final String PROVIDER_ADDRESSES = "distribute.ehcache.providerAddresses";
    
    	private static final String CACHE_NAMES = "cacheNames";
    
    	/**
    	 * rmi地址格式: //127.0.0.1:4447/Cache1|//127.0.0.1:4447/Cache2
    	 */
    	@Override
    	protected CacheManagerPeerProvider createManuallyConfiguredCachePeerProvider(Properties properties) {
    		// 从app-config.properties中读取发布地址列表
    		String providerAddresses = AppConfigPropertyUtils.get(PROVIDER_ADDRESSES, StringUtils.EMPTY);
    		// 从ehcache配置文件读取缓存名称
    		String cacheNames = PropertyUtil.extractAndLogProperty(CACHE_NAMES, properties);
    
    		// 参数校验,这里发布地址和缓存名称都不能为空
    		if (StringUtils.isEmpty(providerAddresses) || StringUtils.isEmpty(cacheNames)) {
    			throw new IllegalArgumentException("Elements "providerAddresses" and "cacheNames" are needed!");
    		}
    
    		// 解析地址列表
    		List<String> cachesNameList = getCacheNameList(cacheNames);
    		List<String> providerAddressList = getProviderAddressList(providerAddresses);
    		
    		// 注册发布节点
    		RMICacheManagerPeerProvider rmiPeerProvider = new ManualRMICacheManagerPeerProvider();
    		StringBuilder sb = new StringBuilder();
    		for (String cacheName : cachesNameList) {
    			for (String providerAddress : providerAddressList) {
    				sb.setLength(0);
    				sb.append("//").append(providerAddress).append("/").append(cacheName);
    				rmiPeerProvider.registerPeer(sb.toString());
    				LOGGER.info("Registering peer provider [{}]", sb);
    			}
    		}
    
    		return rmiPeerProvider;
    	}
    
    	/**
    	 * 得到发布地址列表
    	 * @param providerAddresses 发布地址字符串
    	 * @return 发布地址列表
    	 */
    	private List<String> getProviderAddressList(String providerAddresses) {
    		StringTokenizer stringTokenizer = new StringTokenizer(providerAddresses,
    		        AppConfigPropertyUtils.APP_ITEM_DELIMITER);
    
    		List<String> ProviderAddressList = new ArrayList<String>(stringTokenizer.countTokens());
    		while (stringTokenizer.hasMoreTokens()) {
    			String providerAddress = stringTokenizer.nextToken();
    			providerAddress = providerAddress.trim();
    			ProviderAddressList.add(providerAddress);
    		}
    
    		return ProviderAddressList;
    	}
    
    	/**
    	 * 得到缓存名称列表
    	 * @param cacheNames 缓存名称字符串
    	 * @return 缓存名称列表
    	 */
    	private List<String> getCacheNameList(String cacheNames) {
    		StringTokenizer stringTokenizer = new StringTokenizer(cacheNames, CACHENAME_DELIMITER);
    
    		List<String> cacheNameList = new ArrayList<String>(stringTokenizer.countTokens());
    		while (stringTokenizer.hasMoreTokens()) {
    			String cacheName = stringTokenizer.nextToken();
    			cacheName = cacheName.trim();
    			cacheNameList.add(cacheName);
    		}
    
    		return cacheNameList;
    	}
    
    	@Override
    	protected CacheManagerPeerProvider createAutomaticallyConfiguredCachePeerProvider(CacheManager cacheManager,
    	        Properties properties) throws IOException {
    		throw new UnsupportedOperationException("Not supported automatic distribute cache!");
    	}
    }
    
    

    配置

    ​ 假设有三台机器,则他们分别得配置如下:

    #应用1,在4447端口监听
    #缓存同步消息发送地址(如果同步到多台需要配置多台地址,多台地址用英文逗号分隔)
    distribute.ehcache.providerAddresses=127.0.0.1:4446,127.0.0.1:4448
    #缓存同步监听端口和IP
    distribute.ehache.listenPort=4447
    distribute.ehcache.listenIP=localhost
    
    #应用2,在4448端口监听
    #缓存同步消息发送地址(如果同步到多台需要配置多台地址,多台地址用英文逗号分隔)
    distribute.ehcache.providerAddresses=127.0.0.1:4446,127.0.0.1:4447
    #缓存同步监听端口和IP
    distribute.ehache.listenPort=4448
    distribute.ehcache.listenIP=localhost
    
    #应用3,在4446端口监听
    #缓存同步消息发送地址(如果同步到多台需要配置多台地址,多台地址用英文逗号分隔)
    distribute.ehcache.providerAddresses=127.0.0.1:4447,127.0.0.1:4448
    #缓存同步监听端口和IP
    distribute.ehache.listenPort=4446
    distribute.ehcache.listenIP=localhost
    

    使用

    ​ 使用的时候直接通过Spring的缓存注解即可。简单的示例如下:

    @CacheConfig("business1Cache")
    @Component
    public class Business1 {
        @Cacheable
        public String getData(String key) {
            // TODO:...
        }
    }
    

    说明

    ​ 前面的实现是通过RMI的方式来实现缓存同步的,相对来说RMI的效率还是很快的。所以如果不需要实时的缓存一致性,允许少许延迟,那么这种方式的实现足够。


    总结

    ​ 到这篇完成,分布式改造的第一章算是告一段落了。对于分布式,如果可以选择,必然要选择现在成熟的框架。但是项目有很多时候,由于各种历史原因,必须要在原来的基础上改造。这个时候,希望我写的这个系列对大家有所帮助。造轮子有时候就是这么简单。


    相关链接

  • 相关阅读:
    第一次作业
    C语言I博客作业02
    C语言|博客作业11
    C语言I博客作业10
    C语言I博客作业09
    C语言I博客作业08
    C语言I博客作业07
    C语言I博客作业06
    C语言I博客作业05
    JDK-14 & Eclipse & Hello World!
  • 原文地址:https://www.cnblogs.com/Kidezyq/p/9243878.html
Copyright © 2011-2022 走看看