zoukankan      html  css  js  c++  java
  • RedisTemplate实现消息队列并且批量插入数据。

    早期由于生产环境业务量小。所以日志是一条一条commit的。运行也没出过问题。

    后来随着业务扩大并发量上来后,日志写入因为频繁与数据库打交道导致数据库连接池经常占满,直至程序崩溃。

    因为日志并非需要实时响应。所以考虑改用异步+批量提交的方式。

    为了缓解jvm内存压力。采用redis做消息队列(因为原项目有集成过redis,公司不想使用其他mq增加维护成本)。

    所以在网上找了篇springboot整合redistemplate做消息队列的资料。稍微改了一下。

    参考资料:https://blog.csdn.net/qq_38553333/article/details/82833273

    首先是redisConfig。

    import com.fasterxml.jackson.annotation.JsonAutoDetect;
    import com.fasterxml.jackson.annotation.PropertyAccessor;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.springframework.cache.annotation.EnableCaching;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.core.*;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    
    
    @Configuration
    @EnableCaching //开启注解
    public class RedisConfig {
        /**
         * retemplate相关配置
         * @param factory
         * @return
         */
        @Bean
        public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
    
            RedisTemplate<String, Object> template = new RedisTemplate<>();
            // 配置连接工厂
            template.setConnectionFactory(factory);
    
            //使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值(默认使用JDK的序列化方式)
            Jackson2JsonRedisSerializer jacksonSeial = new Jackson2JsonRedisSerializer(Object.class);
    
            ObjectMapper om = new ObjectMapper();
            // 指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和public
            om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            // 指定序列化输入的类型,类必须是非final修饰的,final修饰的类,比如String,Integer等会跑出异常
            om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
            jacksonSeial.setObjectMapper(om);
    
            // 值采用json序列化
            template.setValueSerializer(jacksonSeial);
            //使用StringRedisSerializer来序列化和反序列化redis的key值
            template.setKeySerializer(new StringRedisSerializer());
    
            // 设置hash key 和value序列化模式
            template.setHashKeySerializer(new StringRedisSerializer());
            template.setHashValueSerializer(jacksonSeial);
            template.afterPropertiesSet();
    
            return template;
        }
    
    
        @Bean
        public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory) {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(redisConnectionFactory);
            return container;
        }
    }
    

      

    消息实体Message

    import com.alibaba.fastjson.JSON;
    import lombok.Data;
    
    import java.util.UUID;
    
    @Data
    public class Message {
        private String id;
        private Integer retryCount;
        private String content;
        private Integer status;
        private String topic;
    
        public Message() {
        }
    
        public Message(String topic, Object object) {
            this.id = UUID.randomUUID().toString().replace("-", "");
            this.retryCount = 0;
            this.content = JSON.toJSONString(object);
            this.status = 0;
            this.topic = topic;
        }
    }
    

      

    Redis订阅管理,采用观察者模式。

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Component;
    
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    
    @Component
    public class TopicSubscriber {
        private final Map<String, Set<String>> subscriberMap = new HashMap();
    
        @Autowired
        private RedisTemplate<String, Object> redisTemplate;
      // 观察者模式实现消费者注册。
        public Boolean addConsumer(String topic, String consumer) {
            Set<String> consumerList = subscriberMap.get(topic);
    
            if (consumerList == null) {
                consumerList = new HashSet<>();
            }
            Boolean b = consumerList.add(consumer);
            subscriberMap.put(topic, consumerList);
            return b;
        }
    
        public Boolean removeConsumer(String topic, String comsumer) {
            Set<String> consumerList = subscriberMap.get(topic);
            Boolean b = false;
            if (consumerList != null) {
                b = consumerList.remove(comsumer);
                subscriberMap.put(topic, consumerList);
            }
            return b;
        }
      //消息广播
        public void broadcast(String topic, String id) {
            if (subscriberMap.get(topic) != null) {
                for (String consumer : subscriberMap.get(topic)) {
                    String key = String.join("_", topic, consumer, id);
                    if (!redisTemplate.hasKey("fail_" + key)) {
                        redisTemplate.opsForValue().set(key, id);
                        redisTemplate.opsForList().leftPush(topic + "_" + consumer, topic);
                    }
                }
            }
    
        }
    }
    

      

    然后是Redis发布者

    import com.alibaba.fastjson.JSON;
    import com.redis.mq.subscriber.TopicSubscriber;
    import io.netty.util.CharsetUtil;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    
    @Component
    public class RedisPublisher {
    
        @Autowired
        private RedisTemplate<String, Object> redisTemplate;
    
        @Autowired
        TopicSubscriber subscriber;
    
        @PostConstruct
        public void init() throws Exception {
            // todo test thread
            /*new Thread(() -> {
                int count = 0;
                try {
                    Thread.sleep(3000l);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                while (count < 14) {
                    try {
                        Thread.sleep(100l);
                        Generate generate = new Generate();
                        generate.setIdNo("" + count);
                        this.publish("GenerateLog", generate);
                        count++;
                    } catch (Exception e) {
                    }
                }
            }).start();*/
        }
      
        public void publish(String topic, Object content) {  //消息发布到redis
            Message message = new Message(topic, content);
            subscriber.broadcast(topic, message.getId());
            redisTemplate.getConnectionFactory().getConnection().publish(
                    topic.getBytes(CharsetUtil.UTF_8), JSON.toJSONString(message).getBytes()
            );
        }
    }
    

      

    Redis消费者。实现MessageListener的onMessage就可以。为了易于扩展,这里使用了泛型。

    import com.alibaba.fastjson.JSON;
    import com.cache.redis.mq.subscriber.TopicSubscriber;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.connection.MessageListener;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    
    import java.lang.reflect.ParameterizedType;
    import java.util.concurrent.TimeUnit;
    
    public abstract class RedisListener<T> implements MessageListener {
    
        @Autowired
        protected RedisTemplate<String, Object> redisTemplate;
    
        @Autowired
        protected RedisMessageListenerContainer messageListenerContainer;
    
        @Autowired
        protected TopicSubscriber subscriber;
    
        @Override
        public void onMessage(org.springframework.data.redis.connection.Message message, byte[] bytes) {
            String name = this.getClass().getSimpleName();
            String topic = new String(message.getChannel());
            String content = new String(message.getBody());
            Message m = JSON.parseObject(content, Message.class);
            String key = String.join("_", topic, name, m.getId());
    
            Object b = redisTemplate.opsForList().rightPop(topic + "_" + name);
            if (b != null && b.equals(m.getTopic())) {
                T t = JSON.parseObject(m.getContent(),
                        ((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[0]);
                handler(t);  // 处理redis消息。
                // set data expire.使用redis的expire接口直接丢弃消费过的数据。
                redisTemplate.expire(key, 1, TimeUnit.NANOSECONDS);
            } else {
                // todo retry
                redisTemplate.opsForValue().set("fail_" + key, content);
            }
        }
    
        protected abstract void handler(T t);
    }
    

    到这里,基础的redisMq就差不多了。下面涉及具体的业务及批量插入。

     首先,加一个logHander接口。

    public interface LogHandler {
        void process();
    }
    

    写一个抽象类继承RedisListener并且实现LogHander。这里用到了redis的put和poll阻塞队列。

    因为使用了mybatisplus又不想重新写mybatis foreach批量查询语句。所以这里偷懒直接用mybatis的sqlsession的单条预编译,批量commit。

    import com.cache.redis.mq.RedisListener;
    import com.server.log.store.LogStore;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.ibatis.session.ExecutorType;
    import org.apache.ibatis.session.SqlSession;
    import org.apache.ibatis.session.SqlSessionFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.listener.ChannelTopic;
    
    import javax.annotation.PostConstruct;
    import java.lang.reflect.ParameterizedType;
    import java.util.List;
    
    @Slf4j
    public abstract class AbstractLogHandler<T, M> extends RedisListener<T> implements LogHandler {
    
        @Autowired
        SqlSessionFactory factory;
    
        @PostConstruct
        public void addListener() {
            messageListenerContainer.addMessageListener(this, new ChannelTopic(getTopic()));
            subscriber.addConsumer(getTopic(), this.getClass().getSimpleName());
            process();
        }
    
        @Override
        protected void handler(T t) {
            getStore().put(t);  //阻塞直到能新写入。这里其实可以加个超时时间。避免一直阻塞。
        }
    
        protected abstract String getTopic();
    
        protected abstract LogStore<T> getStore();
    
        protected void commit(List<T> data) {
            if (data == null || data.isEmpty()) return;
            SqlSession session = factory.openSession(ExecutorType.BATCH);
            try {
                M mapper = session.getMapper(
                        (Class<M>) (((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[1])
                );
                save(data, mapper);
                session.commit();
            } catch (Exception e) {
                log.error(String.format("topic %s 数据批量写入失败。{}", getTopic()), e);
                session.rollback();
            }finally {
                session.close();
            }
    
            data.forEach(o -> o = null);
            data.clear();
        }
    
        protected abstract void save(List<T> data, M m);
    }
    

      

    LogStore阻塞队列

    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    @Slf4j
    public class LogStore<T> {
    
        private static final Integer QUEUE_CAPACITY = 10000;
    
        private BlockingQueue<T> logQueue;
    
        public LogStore() {
            this(QUEUE_CAPACITY);
        }
    
        public LogStore(int capacity) {
            this.logQueue = new LinkedBlockingQueue<>(capacity);
        }
    
        public void put(T t) {
            try {
                logQueue.put(t);
            } catch (InterruptedException e) {
                log.info("logStore put exception:{}", e);
            }
        }
    
        public T poll(long seconds) {
            try {
                return logQueue.poll(seconds, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                return null;
            }
        }
    }
    

      

     到这里,基础的业务代码就写的差不多了。然后我们看下具体的业务处理类怎么写。

    比如我们的注册日志,只要实现抽象类AbstraceLogHandler就可以了

    import comcommon.constant.Constant;
    import com.common.po.RegLog;
    import com.dao.mapper.RegLogMapper;
    import com.server.log.store.LogStore;
    import org.springframework.stereotype.Component;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    @Component
    public class RegisterLogHandler extends AbstractLogHandler<RegLog, RegLogMapper> {
        private final LogStore<RegLog> store = new LogStore<>();
        private String topic = Constant.TOPIC_REGISTER_LOG;
        // todo 可配置
        private final Integer batchSize = 300;
        private final Integer waitSeconds = 2;
    
        ExecutorService executor = Executors.newSingleThreadExecutor();
    
        @Override
        protected String getTopic() {
            return this.topic;
        }
    
        @Override
        protected LogStore<RegLog> getStore() {
            return this.store;
        }
    
        @Override
        public void process() {
            executor.execute(() -> {    //开启线程从redis中poll数据。
                List<RegLog> data = new ArrayList<>(batchSize);
                while (true) {
                    RegLog generate = this.store.poll(waitSeconds);
                    if (generate != null) {
                        if (data.size() >= batchSize) {
                            commit(data);
                        }
    
                        data.add(generate);
                    } else {  //处理不足batchSize的尾巴数据。
                        if (data.size() > 0) {
                            commit(data);
                        }
                    }
                }
            });
        }
    
        @Override
        protected void save(List<RegLog> data, RegLogMapper mapper) {
            data.forEach(o -> {
                if (o.getRegNo() == null) {
                    String genNo = UUID.randomUUID().toString();
                    o.setRegNo(genNo);
                }
                mapper.insert(o);  //因为不想写mybatis的foreach语句。所以这里直接用mybatisplus的insert单条语句。到这里sqlssesion并没有commit.
            });
        }
    }
    

      

    调用:

    @Autowired
        protected RedisPublisher publisher;
    
    
    publisher.publish(Constant.TOPIC_REGISTER_LOG, log);
    

      

  • 相关阅读:
    区块链技术术语表
    以太坊客户端Geth命令用法-参数详解
    智能合约开发环境搭建及Hello World合约
    以太坊是什么
    比特币区块结构Merkle树及简单支付验证分析
    非对称加密技术- RSA算法数学原理分析
    验证APNS证书的有效性
    十八般武艺之 Runloop
    iOS 所有设备一览 && CoreFoundation源码
    [User Defaults] Failed to read values in CFPrefsPlistSource (iOS 10)
  • 原文地址:https://www.cnblogs.com/braska/p/12936937.html
Copyright © 2011-2022 走看看