  • Notes on Using XMemcached

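      XMemcached is one of the Java clients for Memcached, and our current project uses it. It is built on NIO and reportedly performs somewhat better than the other Java clients. Let's start with how a Memcached client is instantiated: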

        public static MilletMemcacheClient getIstance(String configPath) throws IOException {
            try {
                if (instance == null) {
                    synchronized (MilletMemcacheClient.class) {
                        if (instance == null) {
                            if (null == memEntity) {
                                logger.info("Load path:[ {} ] on etcd!",configPath);
                                ClientConfig config = ServiceDiscovery.getSingleton().read(configPath);                            
                                memEntity = new MilletMemcacheClientBuilder(AddrUtil.getAddressMap(StringUtils.trim(config.getIps())));
                                memEntity.setConnectionPoolSize(config.getLinksNum());
                                memEntity.setSessionLocator(new KetamaMemcachedSessionLocator());
                            }
                            MemcachedClient client = memEntity.build();
                            if (null != client) {
                                client.setMergeFactor(MERGE_FACTOR);
                                client.setConnectTimeout(CONNECT_TIMEOUT);
                            }
                            instance = new MilletMemcacheClient(client);
                        }
                    }
                }
            } catch (Exception e) {
                logger.error("Init memcache client failed!", e);
                throw e;
            }
            return instance;
        }
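
      The getIstance method above relies on double-checked locking. A minimal, library-free sketch of that pattern follows. LazyClient and its CREATED counter are hypothetical names used only for illustration, and the sketch assumes the instance field is declared volatile (the snippet above does not show its declaration).

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an expensive-to-build cache client.
final class LazyClient {
    // Counts constructor calls so the demo can confirm only one instance is built.
    static final AtomicInteger CREATED = new AtomicInteger();

    // volatile keeps other threads from observing a partially constructed object.
    private static volatile LazyClient instance;

    private LazyClient() {
        CREATED.incrementAndGet();
    }

    static LazyClient getInstance() {
        LazyClient local = instance;          // first check, without the lock
        if (local == null) {
            synchronized (LazyClient.class) {
                local = instance;             // second check, under the lock
                if (local == null) {
                    local = new LazyClient();
                    instance = local;
                }
            }
        }
        return local;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(LazyClient::getInstance);
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("instances created: " + CREATED.get());
    }
}
```

      The unlocked first check keeps the common path cheap once the client exists; the second check under the lock stops two racing threads from both building a client.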

      Here memEntity is a MilletMemcacheClientBuilder, which extends net.rubyeye.xmemcached.XMemcachedClientBuilder. Its build method is implemented as follows:

        /*
         * (non-Javadoc)
         * 
         * @see net.rubyeye.xmemcached.MemcachedClientBuilder#build()
         */
        public MemcachedClient build() throws IOException {
            XMemcachedClient memcachedClient;
            // kestrel protocol use random session locator.
            if (this.commandFactory.getProtocol() == Protocol.Kestrel) {
                if (!(this.sessionLocator instanceof RandomMemcachedSessionLocaltor)) {
                    log.warn("Recommend to use `net.rubyeye.xmemcached.impl.RandomMemcachedSessionLocaltor` as session locator for kestrel protocol.");
                }
            }
            if (this.weights == null) {
                memcachedClient = new XMemcachedClient(this.sessionLocator,
                        this.bufferAllocator, this.configuration,
                        this.socketOptions, this.commandFactory, this.transcoder,
                        this.addressMap, this.stateListeners, this.authInfoMap,
                        this.connectionPoolSize, this.connectTimeout, this.name,
                        this.failureMode);
    
            } else {
                if (this.addressMap == null) {
                    throw new IllegalArgumentException("Null Address map");
                }
                if (this.addressMap.size() > this.weights.length) {
                    throw new IllegalArgumentException(
                            "Weights Array's length is less than server's number");
                }
                memcachedClient = new XMemcachedClient(this.sessionLocator,
                        this.bufferAllocator, this.configuration,
                        this.socketOptions, this.commandFactory, this.transcoder,
                        this.addressMap, this.weights, this.stateListeners,
                        this.authInfoMap, this.connectionPoolSize,
                        this.connectTimeout, this.name, this.failureMode);
            }
            this.configureClient(memcachedClient);
            return memcachedClient;
        }

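      As the code shows, the Memcached client is instantiated from a set of configuration parameters, most of which have default values. The ones that matter most here are the Memcached server addresses and the connection pool size.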
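
      The first snippet also sets a KetamaMemcachedSessionLocator, which picks a server for each key by consistent hashing. The sketch below illustrates the general idea only. It is not XMemcached's implementation: HashRing, the MD5-based hash, and the count of 100 virtual nodes per server are assumptions made for the example.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified consistent-hash ring; illustrative only, not XMemcached's Ketama code.
final class HashRing {
    private static final int VIRTUAL_NODES = 100; // assumed number of points per server
    private final TreeMap<Long, String> ring = new TreeMap<>();

    HashRing(List<String> servers) {
        for (String server : servers) {
            for (int i = 0; i < VIRTUAL_NODES; i++) {
                ring.put(hash(server + "#" + i), server);
            }
        }
    }

    // Walk clockwise from the key's hash to the first virtual node, wrapping around at the end.
    String locate(String key) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no servers");
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(key));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    // First 8 bytes of the MD5 digest, interpreted as a long.
    private static long hash(String s) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(s.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (digest[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException ex) {
            throw new IllegalStateException(ex);
        }
    }
}
```

      With a ring like this, removing one server only remaps the keys that lived on it; keys on the remaining servers stay where they were.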

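      Before looking at the actual operations, consider the serialization interface. Objects are stored in and read from Memcached as binary data, so they have to be serialized; XMemcached provides an interface for this: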

    // Copyright (c) 2006  Dustin Sallings <dustin@spy.net>
    package net.rubyeye.xmemcached.transcoders;
    
    /**
     * Transcoder is an interface for classes that convert between byte arrays and
     * objects for storage in the cache.
     */
    public interface Transcoder<T> {
    
        /**
         * Encode the given object for storage.
         * 
         * @param o
         *            the object
         * @return the CachedData representing what should be sent
         */
        CachedData encode(T o);
    
        /**
         * Decode the cached object into the object it represents.
         * 
         * @param d
         *            the data
         * @return the return value
         */
        T decode(CachedData d);
    
        /**
         * Set whether store primitive type as string.
         * 
         * @param primitiveAsString
         */
        public void setPrimitiveAsString(boolean primitiveAsString);
    
        /**
         * Set whether pack zeros
         * 
         * @param primitiveAsString
         */
        public void setPackZeros(boolean packZeros);
        /**
         * Set compression threshold in bytes
         * @param to
         */
        public void setCompressionThreshold(int to);
    
        /**
         * Returns if client stores primitive type as string.
         * @return
         */
        public boolean isPrimitiveAsString();
    
        /**
         * Returns if transcoder packs zero.
         * @return
         */
        public boolean isPackZeros();
        
        /**
         * Set compress mode,default is ZIP
         * @see CompressionMode
         * @param compressMode
         */
        public void setCompressionMode(CompressionMode compressMode);
    }

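      Since this is an interface, we are free to supply our own implementation; if we don't, XMemcached's default implementation is used. Next, let's look at how storing and fetching are implemented: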
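
      To show what the encode/decode pair of a transcoder amounts to, here is a small sketch that runs without the library. ByteCodec is a hypothetical, simplified stand-in for Transcoder<T>: it works on a raw byte[] rather than XMemcached's CachedData and leaves out the compression and primitive-handling setters.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified stand-in for Transcoder<T>; it uses byte[] instead of CachedData.
interface ByteCodec<T> {
    byte[] encode(T value);

    T decode(byte[] data);
}

// Stores strings as UTF-8 bytes.
final class Utf8StringCodec implements ByteCodec<String> {
    @Override
    public byte[] encode(String value) {
        if (value == null) {
            throw new IllegalArgumentException("value must not be null");
        }
        return value.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String decode(byte[] data) {
        // A cache miss is modelled as null data and decodes to null.
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }
}
```

      A real implementation would implement every method of the Transcoder interface shown above and wrap the bytes in a CachedData.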

        public <T> boolean add(String key, int exp, T value, Transcoder<T> transcoder, long timeout)
            throws MemcachedException
        {
            try
            {
                return mc.add(key, exp, value, transcoder, timeout);
            }
            catch (TimeoutException e)
            {
                onTimeoutException(OPERATE_ADD, key, e);
            }
            catch (InterruptedException e)
            {
                onInterruptedException(OPERATE_ADD, key, e);
            }
    
            return false;
        }

      Here mc is an interface that wraps the XMemcached operations in one place; the call ends up in MemcachedClient's add method:

        /*
         * (non-Javadoc)
         * 
         * @see net.rubyeye.xmemcached.MemcachedClient#add(java.lang.String, int, T,
         * net.rubyeye.xmemcached.transcoders.Transcoder, long)
         */
        public final <T> boolean add(String key, final int exp, final T value,
                final Transcoder<T> transcoder, final long timeout)
                throws TimeoutException, InterruptedException, MemcachedException {
            key = this.preProcessKey(key);
            return this.add0(key, exp, value, transcoder, timeout);
        }
        private <T> boolean add0(String key, int exp, T value,
                Transcoder<T> transcoder, long timeout)
                throws InterruptedException, TimeoutException, MemcachedException {
            byte[] keyBytes = this.checkStoreArguments(key, exp, value);
            return this.sendStoreCommand(this.commandFactory.createAddCommand(key,
                    keyBytes, exp, value, false, transcoder), timeout);
        }
        private final <T> boolean sendStoreCommand(Command command, long timeout)
                throws InterruptedException, TimeoutException, MemcachedException {
    
            final Session session = this.sendCommand(command);
            if (!command.isNoreply()) {
                this.latchWait(command, timeout, session);
                command.getIoBuffer().free();
                this.checkException(command);
                if (command.getResult() == null) {
                    throw new MemcachedException(
                            "Operation fail,may be caused by networking or timeout");
                }
            } else {
                return false;
            }
            return (Boolean) command.getResult();
        }
        private final <T> byte[] checkStoreArguments(final String key,
                final int exp, final T value) {
            byte[] keyBytes = ByteUtils.getBytes(key);
            ByteUtils.checkKey(keyBytes);
            if (value == null) {
                throw new IllegalArgumentException("value could not be null");
            }
            if (exp < 0) {
                throw new IllegalArgumentException(
                        "Expire time must be greater than or equal to 0");
            }
            return keyBytes;
        }

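      The serializer is carried along as the transcoder parameter, the key is validated by checkKey, and the calling thread waits in latchWait until the response arrives or the timeout expires. A store returns a boolean, whereas a fetch returns the object itself: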

        public <T> T get(String key, long timeout, Transcoder<T> transcoder) throws MemcachedException
        {
            try
            {
                try
                {
                    readController.beginTransaction();
                }
                catch (InterruptedException e)
                {
                    onInterruptedException(OPERATE_GET, key, e);
                }
    
                try
                {
                    return mc.get(key, timeout, transcoder);
                }
                catch (TimeoutException e)
                {
                    onTimeoutException(OPERATE_GET, key, e);
                }
                catch (InterruptedException e)
                {
                    onInterruptedException(OPERATE_GET, key, e);
                }
            }
            finally
            {
                readController.finishTransaction();
            }
            return null;
        }
        /*
         * (non-Javadoc)
         * 
         * @see net.rubyeye.xmemcached.MemcachedClient#get(java.lang.String, long,
         * net.rubyeye.xmemcached.transcoders.Transcoder)
         */
        @SuppressWarnings("unchecked")
        public final <T> T get(final String key, final long timeout,
                final Transcoder<T> transcoder) throws TimeoutException,
                InterruptedException, MemcachedException {
            return (T) this.get0(key, timeout, CommandType.GET_ONE, transcoder);
        }
        private <T> Object get0(String key, final long timeout,
                final CommandType cmdType, final Transcoder<T> transcoder)
                throws TimeoutException, InterruptedException, MemcachedException {
            key = this.preProcessKey(key);
            byte[] keyBytes = ByteUtils.getBytes(key);
            ByteUtils.checkKey(keyBytes);
            return this.fetch0(key, keyBytes, cmdType, timeout, transcoder);
        }
        @SuppressWarnings("unchecked")
        private final <T> Object fetch0(final String key, final byte[] keyBytes,
                final CommandType cmdType, final long timeout,
                Transcoder<T> transcoder) throws InterruptedException,
                TimeoutException, MemcachedException, MemcachedException {
            final Command command = this.commandFactory.createGetCommand(key,
                    keyBytes, cmdType, this.transcoder);
            this.latchWait(command, timeout, this.sendCommand(command));
            command.getIoBuffer().free(); // free buffer
            this.checkException(command);
            CachedData data = (CachedData) command.getResult();
            if (data == null) {
                return null;
            }
            if (transcoder == null) {
                transcoder = this.transcoder;
            }
            if (cmdType == CommandType.GETS_ONE) {
                return new GetsResponse<T>(data.getCas(), transcoder.decode(data));
            } else {
                return transcoder.decode(data);
            }
        }

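      Once the memcached command has returned the data, it is deserialized, turning the binary data back into an object. Note also that the wrapper adds transaction control (readController) around the read: XMemcached has no transaction support, so the application has to implement it itself. As with storing, the fetch path validates the key with checkKey and waits for the response in latchWait. Finally, let's look at how CAS is implemented: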

        public boolean cas(String key, int exp, Object value, long cas) throws MemcachedException
        {
            try
            {
                return mc.cas(key, exp, value, cas);
            }
            catch (TimeoutException e)
            {
                onTimeoutException(OPERATE_CAS, key, e);
            }
            catch (InterruptedException e)
            {
                onInterruptedException(OPERATE_CAS, key, e);
            }
    
            return false;
        }
        /*
         * (non-Javadoc)
         * 
         * @see net.rubyeye.xmemcached.MemcachedClient#cas(java.lang.String, int,
         * java.lang.Object, long)
         */
        public final boolean cas(final String key, final int exp,
                final Object value, final long cas) throws TimeoutException,
                InterruptedException, MemcachedException {
            return this.cas(key, exp, value, this.opTimeout, cas);
        }
        /*
         * (non-Javadoc)
         * 
         * @see net.rubyeye.xmemcached.MemcachedClient#cas(java.lang.String, int,
         * java.lang.Object, long, long)
         */
        @SuppressWarnings("unchecked")
        public final boolean cas(final String key, final int exp,
                final Object value, final long timeout, final long cas)
                throws TimeoutException, InterruptedException, MemcachedException {
            return this.cas(key, exp, value, this.transcoder, timeout, cas);
        }
        /*
         * (non-Javadoc)
         * 
         * @see net.rubyeye.xmemcached.MemcachedClient#cas(java.lang.String, int, T,
         * net.rubyeye.xmemcached.transcoders.Transcoder, long, long)
         */
        public final <T> boolean cas(String key, final int exp, final T value,
                final Transcoder<T> transcoder, final long timeout, final long cas)
                throws TimeoutException, InterruptedException, MemcachedException {
            key = this.preProcessKey(key);
            byte[] keyBytes = this.checkStoreArguments(key, 0, value);
            return this.sendStoreCommand(this.commandFactory.createCASCommand(key,
                    keyBytes, exp, value, cas, false, transcoder), timeout);
        }
        private final <T> boolean sendStoreCommand(Command command, long timeout)
                throws InterruptedException, TimeoutException, MemcachedException {
    
            final Session session = this.sendCommand(command);
            if (!command.isNoreply()) {
                this.latchWait(command, timeout, session);
                command.getIoBuffer().free();
                this.checkException(command);
                if (command.getResult() == null) {
                    throw new MemcachedException(
                            "Operation fail,may be caused by networking or timeout");
                }
            } else {
                return false;
            }
            return (Boolean) command.getResult();
        }
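
      The cas argument passed above is the version that an earlier gets call returned. To make the server-side check concrete, the following sketch mimics the gets/cas semantics with a toy in-memory store. VersionedStore is hypothetical and only illustrates the protocol's behavior; it is not part of XMemcached.

```java
import java.util.HashMap;
import java.util.Map;

// Toy in-memory store that mimics memcached's gets/cas semantics; for illustration only.
final class VersionedStore {
    // A value together with the version ("cas unique") it had when it was read.
    static final class Versioned {
        final String value;
        final long cas;

        Versioned(String value, long cas) {
            this.value = value;
            this.cas = cas;
        }
    }

    private final Map<String, Versioned> data = new HashMap<>();
    private long nextCas = 1;

    // Unconditional write; every write gets a fresh version.
    synchronized void set(String key, String value) {
        data.put(key, new Versioned(value, nextCas++));
    }

    // Read the value and its current version, or null if the key is absent.
    synchronized Versioned gets(String key) {
        return data.get(key);
    }

    // Succeeds only if nobody has changed the entry since the caller read version `cas`.
    synchronized boolean cas(String key, String value, long cas) {
        Versioned current = data.get(key);
        if (current == null || current.cas != cas) {
            return false;
        }
        data.put(key, new Versioned(value, nextCas++));
        return true;
    }
}
```

      A caller that gets false back typically re-reads the entry with gets and retries with the fresh version.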

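      The client-side CAS is nothing special: like ADD, it returns whether the operation succeeded, and merely passes an extra cas argument that serves as the version number. As with storing and fetching, the key is validated by checkKey and the caller waits for the response in latchWait.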

        public static final void checkKey(final byte[] keyBytes) {
    
            if (keyBytes.length > ByteUtils.maxKeyLength) {
                throw new IllegalArgumentException("Key is too long (maxlen = "
                        + ByteUtils.maxKeyLength + ")");
            }
            // Validate the key
            if (memcachedProtocol == Protocol.Text || testing) {
                for (byte b : keyBytes) {
                    if (b == ' ' || b == '\n' || b == '\r' || b == 0) {
                        try {
                            throw new IllegalArgumentException(
                                    "Key contains invalid characters:  "
                                            + new String(keyBytes, "utf-8"));
    
                        } catch (UnsupportedEncodingException e) {
                        }
                    }
    
                }
            }
        }
    private static int maxKeyLength = 250;
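
      One detail of checkKey is easy to miss: the limit applies to the encoded bytes, not to the number of characters. The sketch below restates the same rules without the library. KeyCheck is a hypothetical helper, the UTF-8 encoding is an assumption, and the null/empty check is an extra precaution that the method above does not perform.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper that restates the key rules shown above.
final class KeyCheck {
    static final int MAX_KEY_LENGTH = 250; // same limit as maxKeyLength above

    // Returns the key's UTF-8 bytes, or throws if the key breaks the text-protocol rules.
    static byte[] validate(String key) {
        if (key == null || key.isEmpty()) {
            throw new IllegalArgumentException("Key must not be null or empty");
        }
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        if (bytes.length > MAX_KEY_LENGTH) {
            throw new IllegalArgumentException(
                    "Key is too long (maxlen = " + MAX_KEY_LENGTH + " bytes)");
        }
        for (byte b : bytes) {
            // Space, newline, carriage return and NUL are rejected, as in checkKey.
            if (b == ' ' || b == '\n' || b == '\r' || b == 0) {
                throw new IllegalArgumentException("Key contains invalid characters: " + key);
            }
        }
        return bytes;
    }
}
```

      Because the length is counted in bytes, a key made of 100 Chinese characters (3 bytes each in UTF-8) is rejected even though it has far fewer than 250 characters.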

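      A cache key must not exceed 250 bytes. Waiting for the response is handled with a latch (CountDownLatch): the latch is created together with the Command object, the calling thread waits on it in latchWait once the command has been sent, and it is counted down when the command completes. If the Session is destroyed first, the pending commands are failed and their latches released as well: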

      TextCommandFactory class: creates the command together with its latch, which has a count of 1

        /*
         * (non-Javadoc)
         * 
         * @see
         * net.rubyeye.xmemcached.CommandFactory#createGetCommand(java.lang.String,
         * byte[], net.rubyeye.xmemcached.command.CommandType)
         */
        @SuppressWarnings("unchecked")
        public final Command createGetCommand(final String key,
                final byte[] keyBytes, final CommandType cmdType,
                Transcoder transcoder) {
            return new TextGetOneCommand(key, keyBytes, cmdType,
                    new CountDownLatch(1));
        }

       XMemcachedClient class: waits on the latch, blocking the calling thread until the cache command completes; on timeout the command is cancelled

        protected void latchWait(final Command cmd, final long timeout,
                final Session session) throws InterruptedException,
                TimeoutException {
            if (cmd.getLatch().await(timeout, TimeUnit.MILLISECONDS)) {
                AtomicInteger counter = this.getContinuousTimeoutCounter(session);
                // reset counter.
                if (counter.get() > 0) {
                    counter.set(0);
                }
            } else {
                cmd.cancel();
                AtomicInteger counter = this.getContinuousTimeoutCounter(session);
                if (counter.incrementAndGet() > this.timeoutExceptionThreshold) {
                    log.warn(session
                            + " exceeded continuous timeout threshold,we will close it.");
                    try {
                        // reset counter.
                        counter.set(0);
                        session.close();
                    } catch (Exception e) {
                        // ignore it.
                    }
                }
                throw new TimeoutException(
                        "Timed out("
                                + timeout
                                + " milliseconds) waiting for operation while connected to "
                                + session);
            }
        }
    
        private AtomicInteger getContinuousTimeoutCounter(final Session session) {
            AtomicInteger counter = (AtomicInteger) session
                    .getAttribute(CONTINUOUS_TIMEOUT_COUNTER);
            if (counter == null) {
                counter = new AtomicInteger(0);
                AtomicInteger oldCounter = (AtomicInteger) session
                        .setAttributeIfAbsent(CONTINUOUS_TIMEOUT_COUNTER, counter);
                if (oldCounter != null) {
                    counter = oldCounter;
                }
            }
            return counter;
        }

       MemcachedHandler class: when the session is destroyed, the pending commands are failed and their latches released

        public void destroy() {
            Command command = this.currentCommand.get();    
            if (command != null) {
                command.setException(new MemcachedException(
                        "Session has been closed"));
                CountDownLatch latch = command.getLatch();
                if (latch != null) {
                    latch.countDown();
                }
            }
            while ((command = this.commandAlreadySent.poll()) != null) {
                command.setException(new MemcachedException(
                        "Session has been closed"));
                CountDownLatch latch = command.getLatch();
                if (latch != null) {
                    latch.countDown();
                }
            }
    
        }
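
      Taken together, the three snippets above turn an asynchronous response into a blocking call with a timeout. A minimal sketch of that mechanism using only the JDK follows. PendingCommand is a hypothetical miniature of a command and is not XMemcached code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical miniature of a command whose result arrives on another thread.
final class PendingCommand {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile Object result;

    // Called by the I/O thread once the response has been decoded.
    void complete(Object value) {
        result = value;
        latch.countDown();
    }

    // Called by the requesting thread: block until completed, or fail after the timeout.
    Object await(long timeoutMillis) throws InterruptedException, TimeoutException {
        if (!latch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException(
                    "Timed out(" + timeoutMillis + " milliseconds) waiting for operation");
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        PendingCommand cmd = new PendingCommand();
        new Thread(() -> cmd.complete("VALUE")).start(); // simulated I/O thread
        System.out.println(cmd.await(1000));
    }
}
```

      Each command owns its own latch, so the latch does not limit how many threads may issue commands at once; it only parks the one thread that is waiting for that command's response.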

       Finally, here is an example that instantiates a memcache client and operates on the cache:

    package com.wulinfeng.memcache.view.service;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    import javax.xml.parsers.DocumentBuilder;
    import javax.xml.parsers.DocumentBuilderFactory;
    
    import org.apache.commons.lang3.StringUtils;
    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.springframework.stereotype.Service;
    import org.springframework.util.ResourceUtils;
    import org.w3c.dom.Document;
    import org.w3c.dom.Element;
    import org.w3c.dom.Node;
    import org.w3c.dom.NodeList;
    
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.wulinfeng.memcache.view.bean.MemcacheViewBean;
    import com.wulinfeng.memcache.view.util.HttpClientUtil;
    import com.wulinfeng.memcache.view.util.PropertiesConfigUtil;
    
    /**
     * Business logic for the memcache viewer
     *
     * @author wulinfeng
     * @version C10 2017-11-21
     * @since SDP V300R003C10
     */
    @Service
    public class MemcacheViewService
    {
        private static Logger LOGGER = LogManager.getLogger(MemcacheViewService.class);
        
        private static List<MemcacheViewBean> memcacheCategoryList = new ArrayList<>();
        
        private static Map<String, String> cacheDataMap = new HashMap<>();
        
        private static final String CONFIG_FILE = "/memcache_config.xml";
        
        private static final String TEST_URL = PropertiesConfigUtil.getProperty("test_url");
        
        /**
         * Load the cache information at startup
         *
         * @author wulinfeng
         */
        public static void init()
        {
            // 1. Load the list of cache categories
            initializeCacheList();
            
            // 2. Load the cache client instance
            getMemcacheClient();
        }
        
        /**
         * Get the list of cache categories
         *
         * @author wulinfeng
         * @return
         */
        public MemcacheViewBean[] getMemcacheCategory()
        {
            return (MemcacheViewBean[])(memcacheCategoryList.toArray(new MemcacheViewBean[memcacheCategoryList.size()]));
        }
        
        /**
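         * Query, delete, or refresh a cache entry
         *
         * @author wulinfeng
         * @param operationType "1" = query, "2" = delete, "3" = reload the data and add it to the cache
         * @param cacheName name of the cache category
         * @param key cache key
         * @return the cached or loaded value, a JSON message after a successful delete, or null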
         */
        public String doMemcached(String operationType, String cacheName, String key)
        {
            String result = null;
            XMemcachedClient client = XMemcachedClient.getMemcachedClient();
            switch (operationType)
            {
                case "1":// look up the cache by key
                    Object obj = client.getObject(key);
                    result = (String)obj;
                    break;
                case "2":// delete from the cache by key
                    if (client.deleteObject(key))
                    {
                        result = "{\"msg\":\"delete success\"}";
                    }
                    break;
                case "3":// find the data-loading interface by cache name and call it to fetch the data
                    String dataLoaderName = cacheDataMap.get(cacheName);
                    if (StringUtils.isNotEmpty(dataLoaderName))
                    {
                        result = HttpClientUtil.sendRequestByGetAsync(TEST_URL + dataLoaderName);
                    }
                    if (result != null)
                    {
                        client.addObject(key, result, 0);
                    }
                    break;
                default:
                    ;
            }
            
            return result;
        }
        
        /**
         * Load the cache client
         *
         * @author wulinfeng
         * @return
         */
        private static void getMemcacheClient()
        {
            String mcListStr = PropertiesConfigUtil.getProperty("memcache.server", "");
            
            if (StringUtils.isEmpty(mcListStr))
            {
                LOGGER.error("Get MemcacheClient Error ! mcListStr is null !");
                return;
            }
            String[] mcList = mcListStr.split(",");
            try
            {
                XMemcachedClient.getInstance(PropertiesConfigUtil.getInt("memcache.linksNum", 1), mcList);
            }
            catch (IOException e)
            {
                LOGGER.error("Get MemcacheClient Error ! e:" + e);
                e.printStackTrace();
            }
        }
        
        /**
         * Load the list of cache categories
         *
         * @author wulinfeng
         */
        private static void initializeCacheList()
        {
            memcacheCategoryList.clear();
            cacheDataMap.clear();
            try
            {
                DocumentBuilderFactory builderFactory = DocumentBuilderFactory.newInstance();
                DocumentBuilder builder;
                builder = builderFactory.newDocumentBuilder();
                String basePath = ResourceUtils.getFile("classpath:").getPath();
                String configPath = basePath + CONFIG_FILE;
                Document doc = builder.parse(configPath);
                NodeList itemNodes = doc.getElementsByTagName("item");
                
                for (int itemIndex = 0; itemIndex < itemNodes.getLength(); itemIndex++)
                {
                    Element itemElem = (Element)itemNodes.item(itemIndex);
                    NodeList childList = itemElem.getChildNodes();
                    String cacheName = "";
                    
                    MemcacheViewBean memcacheViewBean = new MemcacheViewBean();
                    for (int i = 0; i < childList.getLength(); i++)
                    {
                        Node childNode = childList.item(i);
                        
                        if (childNode.getNodeType() == Node.ELEMENT_NODE)
                        {
                            Element element = (Element)childNode;
                            String tagName = element.getTagName();
                            String textContent = element.getTextContent().trim();
                            
                            switch (tagName)
                            {
                                case "cacheName":
                                    cacheName = textContent;
                                    memcacheViewBean.setCacheName(textContent);
                                    break;
                                case "keyProfix":
                                    memcacheViewBean.setKeyProfix(textContent);
                                    break;
                                case "supportRefresh":
                                    memcacheViewBean.setSupportRefresh(textContent.equals("1"));
                                    break;
                                case "dataLoaderName":
                                    cacheDataMap.put(cacheName, textContent);
                                    break;
                                default:
                                
                            }
                        }
                    }
                    
                    memcacheCategoryList.add(memcacheViewBean);
                }
            }
            catch (Throwable e)
            {
                LOGGER.error("MemcacheViewService.init failed, e: ", e);
            }
        }
    }
    
    package com.wulinfeng.memcache.view.service;
    
    import java.io.IOException;
    
    import org.apache.commons.lang3.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import net.rubyeye.xmemcached.GetsResponse;
    import net.rubyeye.xmemcached.MemcachedClient;
    import net.rubyeye.xmemcached.XMemcachedClientBuilder;
    import net.rubyeye.xmemcached.impl.KetamaMemcachedSessionLocator;
    import net.rubyeye.xmemcached.utils.AddrUtil;
    
    /**
     * memcache cache client
     *
     * @author wulinfeng
     * @version C10 2017-11-22
     * @since SDP V300R003C10
     */
    public class XMemcachedClient
    {
        
        private final static Logger LOGGER = LoggerFactory.getLogger(XMemcachedClient.class);
        
        /**
         * Merge factor: number of requests merged into one batch
         */
        private final static int MERGE_FACTOR = 20;
        
        /**
         * Connection timeout
         */
        private final static int CONNECT_TIMEOUT = 500;
        
        /**
         * Request timeout, in milliseconds
         */
        protected static final long REQUEST_TIME_OUT = 1000;
        
        /**
         * Step size
         */
        private final long STEP = 1L;
        
        /**
         * Default value
         */
        private final long DEFAULTVALUE = 0;
        
        private static XMemcachedClientBuilder memEntity;
        
        private static volatile XMemcachedClient instance;
        
        private MemcachedClient mc;
        
        private XMemcachedClient(MemcachedClient memcachedClient)
            throws IOException
        {
            this.mc = memcachedClient;
        }
        
        /**
         * Instantiate the client (singleton pattern)
         *
         * @author wulinfeng
         * @param linksNum
         * @param memcacheIp
         * @return
         * @throws IOException
         */
        public static XMemcachedClient getInstance(int linksNum, String... memcacheIp)
            throws IOException
        {
            try
            {
                if (instance == null)
                {
                    synchronized (XMemcachedClient.class)
                    {
                        if (instance == null)
                        {
                            if (0 != memcacheIp.length)
                            {
                                memEntity = new XMemcachedClientBuilder(
                                    AddrUtil.getAddressMap(StringUtils.trim(array2string(memcacheIp))));
                                if (0 != linksNum)
                                {
                                    memEntity.setConnectionPoolSize(linksNum);
                                }
                            }
                            
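                            // Failure mode: if a node goes down, requests to it fail instead of being rerouted to other nodes
                            memEntity.setFailureMode(true);
                            
                            // Use the consistent-hashing (Ketama) session locator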
                            memEntity.setSessionLocator(new KetamaMemcachedSessionLocator());
                            MemcachedClient client = memEntity.build();
                            if (null != client)
                            {
                                client.setMergeFactor(MERGE_FACTOR);
                                client.setConnectTimeout(CONNECT_TIMEOUT);
                            }
                            client.setEnableHeartBeat(true);
                            client.setEnableHealSession(true);
                            
                            // Interval between reconnect attempts after the network is closed or fails: 10 milliseconds
                            client.setHealSessionInterval(10);
                            instance = new XMemcachedClient(client);
                        }
                    }
                }
            }
            catch (Exception e)
            {
                LOGGER.error("Init memcache client failed!", e);
                throw e;
            }
            return instance;
        }
        
        /**
         * Expose the cache client instance to callers
         *
         * @author wulinfeng
         * @return
         */
        public static XMemcachedClient getMemcachedClient()
        {
            return instance;
        }
        
        /**
         * Get a cached object from IMemcachedClient by key
         *
         * @param key cache key
         * @return the cached object
         */
        public Object getObject(String key)
        {
            try
            {
                Object result = mc.get(key, REQUEST_TIME_OUT);
                return result;
            }
            catch (Exception e)
            {
                LOGGER.error("get memcached failed, key: {}", key, e);
            }
            return null;
        }
        
        /**
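         * Add an object to the cache under the given key through IMemcachedClient
         *
         * @param key cache key
         * @param obj object to cache
         * @param time time to live in seconds; 0 means the entry never expires
         * @return whether the object was added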
         */
        public boolean addObject(String key, Object obj, int time)
        {
            // Whether the add succeeded
            boolean rst = false;
            int expire = 0;
            if (0 != time)
            {
                expire = (int)(System.currentTimeMillis() / 1000) + time;
            }
            try
            {
                rst = mc.add(key, expire, obj, REQUEST_TIME_OUT);
            }
            catch (Exception e)
            {
                LOGGER.error("add memcached failed, key: {}", key, e);
            }
            return rst;
        }
        
        /**
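         * Fetches the cached object for the given key together with its CAS token.
         *
         * @param key cache key
         * @return the GetsResponse (value plus CAS token), or null if the key is missing or the lookup fails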
         */
        public Object getsObject(String key)
        {
            try
            {
                GetsResponse result = mc.gets(key, REQUEST_TIME_OUT);
                return result;
            }
            catch (Exception e)
            {
                LOGGER.error("gets memcached failed, key: {}", key, e);
            }
            return null;
        }
        
        /**
         * Deletes the cached object for the given key from IMemcachedClient.
         *
         * @param key cache key
         * @return true if the object was deleted
         */
        public boolean deleteObject(String key)
        {
            try
            {
                boolean result = mc.delete(key, REQUEST_TIME_OUT);
                return result;
            }
            catch (Exception e)
            {
                LOGGER.error("delete memcached failed, key: {}", key, e);
            }
            return false;
        }
        
        /**
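         * Updates the cached object for the given key in Memcached, guarded by an optimistic-lock (CAS) check.
         *
         * @param key cache key
         * @param data new object to cache
         * @param time time to live in seconds; 0 means the entry never expires
         * @return true if the update succeeded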
         */
        public boolean updateObject(String key, Object data, int time)
        {
            boolean rst = false;
            try
            {
                GetsResponse getsResponse = mc.gets(key);
                if (null != getsResponse)
                {
                    int expire = 0;
                    if (0 != time)
                    {
                        expire = (int)(System.currentTimeMillis() / 1000) + time;
                    }
                    rst = mc.cas(key, expire, data, REQUEST_TIME_OUT, getsResponse.getCas());
                }
            }
            catch (Exception e)
            {
                LOGGER.error("update memcached failed, key: {}", key, e);
            }
            return rst;
        }
        
        /**
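         * Updates the cached object for the given key in Memcached. Unlike updateObject, it skips the
         * optimistic-lock check, so concurrent updates of the same key do not fail.
         *
         * @param key cache key
         * @param data new object to cache
         * @param time time to live in seconds; 0 means the entry never expires
         * @return true if the update succeeded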
         */
        public boolean updateObject2(String key, Object data, int time)
        {
            int expire = 0;
            boolean result = false;
            if (0 != time)
            {
                expire = (int)(System.currentTimeMillis() / 1000) + time;
            }
            try
            {
                result = mc.set(key, expire, data, REQUEST_TIME_OUT);
            }
            catch (Exception e)
            {
                LOGGER.error("update memcached failed, key: {}", key, e);
            }
            return result;
        }
        
        /**
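         * Increment counter.
         *
         * @author wulinfeng
         * @param key cache key
         * @param initValue initial value used when the key does not exist
         * @param invalidDurance time to live in seconds; 0 means the entry never expires
         * @return the counter value after the increment, or 0 if the operation fails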
         */
        public long incr(String key, long initValue, int invalidDurance)
        {
            int expire = 0;
            if (0 != invalidDurance)
            {
                expire = (int)(System.currentTimeMillis() / 1000) + invalidDurance;
            }
            try
            {
                long result = mc.incr(key, STEP, initValue, REQUEST_TIME_OUT, expire);
                return result;
            }
            catch (Exception e)
            {
                LOGGER.error("incr memcached failed, key: {}", key, e);
            }
            return 0;
        }
        
        /**
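         * Decrement counter.
         *
         * @author wulinfeng
         * @param key cache key
         * @param invalidDurance time to live in seconds; 0 means the entry never expires
         * @return the counter value after the decrement, or 0 if the operation fails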
         */
        public long decr(String key, int invalidDurance)
        {
            int expire = 0;
            if (0 != invalidDurance)
            {
                expire = (int)(System.currentTimeMillis() / 1000) + invalidDurance;
            }
            try
            {
                long result = mc.decr(key, STEP, DEFAULTVALUE, REQUEST_TIME_OUT, expire);
                return result;
            }
            catch (Exception e)
            {
                LOGGER.error("decr memcached failed, key: {}", key, e);
            }
            return 0;
        }
        
        /**
         * Joins the array elements into one comma-separated string.
         *
         * @param szList elements to join
         * @return the joined string; empty if the array is empty
         */
        private static String array2string(String[] szList)
        {
            // String.join handles an empty array and needs no trailing-comma trimming
            return String.join(",", szList);
        }
    }
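
The wrapper methods above turn a TTL into an expiry with `(int)(System.currentTimeMillis() / 1000) + time`. Memcached reads an exptime of up to 30 days (2,592,000 seconds) as an offset from now and anything larger as an absolute Unix timestamp, so the value computed here is always taken as absolute. A minimal sketch of that rule follows; `ExpirySketch`, `toAbsoluteExpiry` and `isAbsolute` are hypothetical names used only for illustration and are not part of XMemcached or the original class.

```java
import java.time.Instant;

/** Hypothetical helper for illustration; not part of XMemcached or the wrapper class. */
final class ExpirySketch {
    /** Memcached treats exptime values above 30 days as absolute Unix timestamps. */
    static final int RELATIVE_LIMIT_SECONDS = 60 * 60 * 24 * 30;

    /**
     * Mirrors the expression used in the wrapper: a TTL of 0 stays 0 (never expire),
     * any other TTL becomes an absolute Unix timestamp in seconds.
     */
    static int toAbsoluteExpiry(long nowEpochSeconds, int ttlSeconds) {
        if (ttlSeconds == 0) {
            return 0;
        }
        // toIntExact throws instead of silently overflowing the 32-bit exptime
        return Math.toIntExact(nowEpochSeconds + ttlSeconds);
    }

    /** True when the server would read the value as a timestamp rather than an offset. */
    static boolean isAbsolute(int exptime) {
        return exptime > RELATIVE_LIMIT_SECONDS;
    }

    public static void main(String[] args) {
        long now = Instant.now().getEpochSecond();
        int expire = toAbsoluteExpiry(now, 60);
        System.out.println("exptime=" + expire + " absolute=" + isAbsolute(expire));
    }
}
```

Because the timestamp is computed on the client, this form assumes the client and server clocks are roughly in sync. Passing the TTL itself is an alternative whenever it is under 30 days.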

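
`updateObject` does a single `gets` followed by `cas`, so it returns false whenever another writer changes the key in between, and `updateObject2` avoids that by dropping the check altogether. A middle ground is to retry the `gets`/`cas` pair a bounded number of times. The sketch below illustrates the idea against a hypothetical in-memory `FakeStore`; it does not call XMemcached, and every name in it is an assumption made for the example.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.UnaryOperator;

/** Illustrative only: a bounded gets/cas retry loop over a stand-in store. */
final class CasRetrySketch {
    /** A value plus its version token, playing the role of a gets response. */
    static final class Versioned {
        final String value;
        final long cas;

        Versioned(String value, long cas) {
            this.value = value;
            this.cas = cas;
        }
    }

    /** Hypothetical in-memory store; NOT the XMemcached API. */
    static final class FakeStore {
        private final ConcurrentHashMap<String, Versioned> data = new ConcurrentHashMap<>();
        private final AtomicLong nextCas = new AtomicLong(1);

        /** Unconditional write; every write gets a fresh token. */
        void set(String key, String value) {
            data.put(key, new Versioned(value, nextCas.getAndIncrement()));
        }

        Versioned gets(String key) {
            return data.get(key);
        }

        /** Writes only if the stored token still equals expectedCas. */
        boolean cas(String key, String value, long expectedCas) {
            Versioned current = data.get(key);
            if (current == null || current.cas != expectedCas) {
                return false;
            }
            return data.replace(key, current, new Versioned(value, nextCas.getAndIncrement()));
        }
    }

    /** Re-reads and retries up to maxAttempts times when another writer wins the race. */
    static boolean updateWithRetry(FakeStore store, String key,
                                   UnaryOperator<String> change, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            Versioned seen = store.gets(key);
            if (seen == null) {
                return false; // key is missing, nothing to update
            }
            if (store.cas(key, change.apply(seen.value), seen.cas)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        FakeStore store = new FakeStore();
        store.set("greeting", "hello");
        boolean updated = updateWithRetry(store, "greeting", v -> v + " world", 3);
        System.out.println(updated + " -> " + store.gets("greeting").value);
    }
}
```

The retry limit keeps a hot key from spinning forever. When it is exhausted the caller still gets false, just as with the single-attempt version.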
      

      

  • Original article: https://www.cnblogs.com/wuxun1997/p/7751704.html