早期由于生产环境业务量小,所以日志是一条一条commit的,运行也没出过问题。
后来随着业务扩大并发量上来后,日志写入因为频繁与数据库打交道导致数据库连接池经常占满,直至程序崩溃。
因为日志并不需要实时响应,所以考虑改用异步+批量提交的方式。
为了缓解jvm内存压力,采用redis做消息队列(因为原项目已经集成过redis,公司不想引入其他mq增加维护成本)。
所以在网上找了篇springboot整合redistemplate做消息队列的资料。稍微改了一下。
参考资料:https://blog.csdn.net/qq_38553333/article/details/82833273
首先是redisConfig。
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.*;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
 * Spring configuration exposing the Redis beans used by the message-queue code:
 * a {@link RedisTemplate} with String keys and JSON values, and the
 * {@link RedisMessageListenerContainer} that listeners register with.
 */
@Configuration
@EnableCaching // turn on annotation-driven caching
public class RedisConfig {

    /**
     * Builds the application-wide {@code RedisTemplate}.
     *
     * <p>Keys and hash keys are serialized as plain strings; values and hash values are
     * serialized as JSON through Jackson (instead of the default JDK serialization).
     *
     * @param factory connection factory the template obtains connections from
     * @return the fully initialized template
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        ObjectMapper om = new ObjectMapper();
        // Serialize every field/getter/setter regardless of visibility (private included).
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // Embed type information for non-final types so values can be read back as their
        // concrete class.
        // NOTE(review): enableDefaultTyping is deprecated since Jackson 2.10 and polymorphic
        // default typing is a known deserialization risk when Redis content is not trusted.
        // Migrate to activateDefaultTyping(...) with a restrictive PolymorphicTypeValidator
        // once the project's Jackson version allows it.
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);

        // Parameterized (not raw) serializer: same runtime behavior, no unchecked warnings.
        Jackson2JsonRedisSerializer<Object> jsonSerializer =
                new Jackson2JsonRedisSerializer<>(Object.class);
        jsonSerializer.setObjectMapper(om);

        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        // Keys (and hash keys) as strings.
        template.setKeySerializer(new StringRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        // Values (and hash values) as JSON.
        template.setValueSerializer(jsonSerializer);
        template.setHashValueSerializer(jsonSerializer);
        template.afterPropertiesSet();
        return template;
    }

    /**
     * Creates the listener container that dispatches Redis pub/sub messages.
     *
     * @param redisConnectionFactory connection factory used for the subscription connection
     * @return a container with no listeners registered yet
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        return container;
    }
}
消息实体Message
import com.alibaba.fastjson.JSON;
import lombok.Data;
import java.util.UUID;
/**
 * Envelope published on a Redis channel. The business payload is carried as a JSON
 * string in {@code content}; accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class Message {

    private String id;
    private Integer retryCount;
    private String content;
    private Integer status;
    private String topic;

    /** No-arg constructor, required for JSON deserialization. */
    public Message() {
    }

    /**
     * Creates a new message with a fresh id, zero retries and status {@code 0}.
     *
     * @param topic  channel the message is published on
     * @param object payload; stored as its fastjson JSON representation
     */
    public Message(String topic, Object object) {
        this.topic = topic;
        this.content = JSON.toJSONString(object);
        this.id = newId();
        this.status = 0;
        this.retryCount = 0;
    }

    /** Returns a random UUID rendered without dashes. */
    private static String newId() {
        String uuid = UUID.randomUUID().toString();
        return uuid.replace("-", "");
    }
}
Redis订阅管理,采用观察者模式。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
 * Registry of the consumers subscribed to each topic, plus the "broadcast" step that
 * records one pending delivery per consumer in Redis before a message is published.
 *
 * <p>The registry is a singleton bean that is written during listener registration and
 * read by every publishing thread, so it is backed by concurrent collections.
 */
@Component
public class TopicSubscriber {

    /** topic -> names of the consumers registered for it. */
    private final Map<String, Set<String>> subscriberMap =
            new java.util.concurrent.ConcurrentHashMap<>();

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Registers a consumer for a topic.
     *
     * @param topic    topic name; must not be {@code null}
     * @param consumer consumer name; must not be {@code null}
     * @return {@code true} if the consumer was not registered for the topic before
     */
    public Boolean addConsumer(String topic, String consumer) {
        return subscriberMap
                .computeIfAbsent(topic, t -> java.util.concurrent.ConcurrentHashMap.newKeySet())
                .add(consumer);
    }

    /**
     * Unregisters a consumer from a topic.
     *
     * @param topic    topic name; must not be {@code null}
     * @param comsumer consumer name
     * @return {@code true} if the consumer was registered and has been removed
     */
    public Boolean removeConsumer(String topic, String comsumer) {
        Set<String> consumers = subscriberMap.get(topic);
        return consumers != null && consumers.remove(comsumer);
    }

    /**
     * Records a pending delivery of message {@code id} for every consumer of {@code topic}.
     *
     * <p>For each consumer, unless a {@code fail_<topic>_<consumer>_<id>} key already exists,
     * this stores {@code <topic>_<consumer>_<id> -> id} and left-pushes the topic name onto
     * the list {@code <topic>_<consumer>}. Does nothing when the topic has no consumers.
     *
     * @param topic topic name; must not be {@code null}
     * @param id    message id
     */
    public void broadcast(String topic, String id) {
        Set<String> consumers = subscriberMap.get(topic);
        if (consumers == null) {
            return;
        }
        for (String consumer : consumers) {
            String key = String.join("_", topic, consumer, id);
            // hasKey may return null (e.g. inside a pipeline/transaction); treat that as
            // "no failure recorded" instead of failing on auto-unboxing.
            if (Boolean.TRUE.equals(redisTemplate.hasKey("fail_" + key))) {
                continue;
            }
            redisTemplate.opsForValue().set(key, id);
            redisTemplate.opsForList().leftPush(topic + "_" + consumer, topic);
        }
    }
}
然后是Redis发布者
import com.alibaba.fastjson.JSON;
import com.redis.mq.subscriber.TopicSubscriber;
import io.netty.util.CharsetUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class RedisPublisher {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
TopicSubscriber subscriber;
@PostConstruct
public void init() throws Exception {
// todo test thread
/*new Thread(() -> {
int count = 0;
try {
Thread.sleep(3000l);
} catch (InterruptedException e) {
e.printStackTrace();
}
while (count < 14) {
try {
Thread.sleep(100l);
Generate generate = new Generate();
generate.setIdNo("" + count);
this.publish("GenerateLog", generate);
count++;
} catch (Exception e) {
}
}
}).start();*/
}
public void publish(String topic, Object content) { //消息发布到redis
Message message = new Message(topic, content);
subscriber.broadcast(topic, message.getId());
redisTemplate.getConnectionFactory().getConnection().publish(
topic.getBytes(CharsetUtil.UTF_8), JSON.toJSONString(message).getBytes()
);
}
}
Redis消费者。实现MessageListener的onMessage就可以。为了易于扩展,这里使用了泛型。
import com.alibaba.fastjson.JSON;
import com.cache.redis.mq.subscriber.TopicSubscriber;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import java.lang.reflect.ParameterizedType;
import java.util.concurrent.TimeUnit;
/**
 * Base class for Redis pub/sub consumers. Subclasses fix the payload type {@code T} in
 * their {@code extends} clause and implement {@link #handler(Object)}.
 *
 * <p>The consumer name is the simple class name of the concrete subclass, matching the
 * name used when the consumer is registered with {@code TopicSubscriber}.
 *
 * @param <T> payload type carried in {@code Message.content}
 */
public abstract class RedisListener<T> implements MessageListener {

    @Autowired
    protected RedisTemplate<String, Object> redisTemplate;

    @Autowired
    protected RedisMessageListenerContainer messageListenerContainer;

    @Autowired
    protected TopicSubscriber subscriber;

    /**
     * Handles one pub/sub message.
     *
     * <p>Pops one pending-delivery token from the list {@code <topic>_<consumer>}. If the
     * token matches the message's topic, the payload is parsed to {@code T}, passed to
     * {@link #handler(Object)}, and the bookkeeping key {@code <topic>_<consumer>_<id>} is
     * removed. Otherwise the raw message is stored under {@code fail_<key>}.
     *
     * @param message raw Redis message (channel and body)
     * @param bytes   subscription pattern; unused
     */
    @Override
    public void onMessage(org.springframework.data.redis.connection.Message message, byte[] bytes) {
        String name = this.getClass().getSimpleName();
        String topic = new String(message.getChannel());
        String content = new String(message.getBody());

        Message m = JSON.parseObject(content, Message.class);
        if (m == null) {
            // Empty or literal-null body: there is no id to build a key from, so ignore it
            // rather than fail with a NullPointerException below.
            return;
        }

        String key = String.join("_", topic, name, m.getId());
        Object token = redisTemplate.opsForList().rightPop(topic + "_" + name);
        if (token != null && token.equals(m.getTopic())) {
            T payload = JSON.parseObject(m.getContent(), payloadType());
            handler(payload);
            // Consumed: remove the bookkeeping key outright instead of setting a
            // 1-nanosecond expiry on it.
            redisTemplate.delete(key);
        } else {
            // todo retry
            redisTemplate.opsForValue().set("fail_" + key, content);
        }
    }

    /**
     * Resolves {@code T} as the first type argument of the concrete class's generic
     * superclass. This requires the subclass to be declared with explicit type
     * arguments whose first entry is the payload type.
     */
    private java.lang.reflect.Type payloadType() {
        ParameterizedType superType = (ParameterizedType) this.getClass().getGenericSuperclass();
        return superType.getActualTypeArguments()[0];
    }

    /**
     * Processes one decoded payload.
     *
     * @param t payload parsed from the message content
     */
    protected abstract void handler(T t);
}
到这里,基础的redisMq就差不多了。下面涉及具体的业务及批量插入。
首先,加一个LogHandler接口。
/**
 * Contract for components that process buffered log entries.
 */
public interface LogHandler {
/**
 * Starts processing. In the code visible here it is called once, right after the
 * handler registers itself as a Redis listener, and the concrete implementation
 * submits a long-running consumer task to an executor.
 */
void process();
}
写一个抽象类继承RedisListener并且实现LogHandler。这里用到了阻塞队列(LogStore,内部是JDK的LinkedBlockingQueue)的put和poll。
因为使用了mybatisplus又不想重新写mybatis foreach批量插入语句,所以这里偷懒直接用mybatis的sqlsession做单条预编译、批量commit。
import com.cache.redis.mq.RedisListener;
import com.server.log.store.LogStore;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.listener.ChannelTopic;
import javax.annotation.PostConstruct;
import java.lang.reflect.ParameterizedType;
import java.util.List;
@Slf4j
public abstract class AbstractLogHandler<T, M> extends RedisListener<T> implements LogHandler {
@Autowired
SqlSessionFactory factory;
@PostConstruct
public void addListener() {
messageListenerContainer.addMessageListener(this, new ChannelTopic(getTopic()));
subscriber.addConsumer(getTopic(), this.getClass().getSimpleName());
process();
}
@Override
protected void handler(T t) {
getStore().put(t); //阻塞直到能新写入。这里其实可以加个超时时间。避免一直阻塞。
}
protected abstract String getTopic();
protected abstract LogStore<T> getStore();
protected void commit(List<T> data) {
if (data == null || data.isEmpty()) return;
SqlSession session = factory.openSession(ExecutorType.BATCH);
try {
M mapper = session.getMapper(
(Class<M>) (((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[1])
);
save(data, mapper);
session.commit();
} catch (Exception e) {
log.error(String.format("topic %s 数据批量写入失败。{}", getTopic()), e);
session.rollback();
}finally {
session.close();
}
data.forEach(o -> o = null);
data.clear();
}
protected abstract void save(List<T> data, M m);
}
LogStore阻塞队列
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
 * Bounded in-memory buffer between the Redis listener (producer) and the batch writer
 * (consumer), backed by a {@link LinkedBlockingQueue}.
 *
 * @param <T> buffered element type
 */
@Slf4j
public class LogStore<T> {

    private static final int QUEUE_CAPACITY = 10000;

    private final BlockingQueue<T> logQueue;

    /** Creates a store with the default capacity of 10000 elements. */
    public LogStore() {
        this(QUEUE_CAPACITY);
    }

    /**
     * Creates a store with the given capacity.
     *
     * @param capacity maximum number of buffered elements; must be positive
     */
    public LogStore(int capacity) {
        this.logQueue = new LinkedBlockingQueue<>(capacity);
    }

    /**
     * Adds an element, blocking while the store is full. If the calling thread is
     * interrupted while waiting, the element is not enqueued, the event is logged and
     * the thread's interrupt flag is restored.
     *
     * @param t element to buffer; must not be {@code null}
     */
    public void put(T t) {
        try {
            logQueue.put(t);
        } catch (InterruptedException e) {
            // Restore the flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            log.warn("logStore put exception", e);
        }
    }

    /**
     * Takes the next element, waiting up to {@code seconds} for one to arrive.
     *
     * @param seconds maximum time to wait, in seconds
     * @return the next element, or {@code null} on timeout or if the thread was
     *         interrupted (the interrupt flag is restored in that case, so callers that
     *         poll in a loop should check it)
     */
    public T poll(long seconds) {
        try {
            return logQueue.poll(seconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
到这里,基础的业务代码就写的差不多了。然后我们看下具体的业务处理类怎么写。
比如我们的注册日志,只要继承抽象类AbstractLogHandler就可以了。
import comcommon.constant.Constant;
import com.common.po.RegLog;
import com.dao.mapper.RegLogMapper;
import com.server.log.store.LogStore;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Component
public class RegisterLogHandler extends AbstractLogHandler<RegLog, RegLogMapper> {
private final LogStore<RegLog> store = new LogStore<>();
private String topic = Constant.TOPIC_REGISTER_LOG;
// todo 可配置
private final Integer batchSize = 300;
private final Integer waitSeconds = 2;
ExecutorService executor = Executors.newSingleThreadExecutor();
@Override
protected String getTopic() {
return this.topic;
}
@Override
protected LogStore<RegLog> getStore() {
return this.store;
}
@Override
public void process() {
executor.execute(() -> { //开启线程从redis中poll数据。
List<RegLog> data = new ArrayList<>(batchSize);
while (true) {
RegLog generate = this.store.poll(waitSeconds);
if (generate != null) {
if (data.size() >= batchSize) {
commit(data);
}
data.add(generate);
} else { //处理不足batchSize的尾巴数据。
if (data.size() > 0) {
commit(data);
}
}
}
});
}
@Override
protected void save(List<RegLog> data, RegLogMapper mapper) {
data.forEach(o -> {
if (o.getRegNo() == null) {
String genNo = UUID.randomUUID().toString();
o.setRegNo(genNo);
}
mapper.insert(o); //因为不想写mybatis的foreach语句。所以这里直接用mybatisplus的insert单条语句。到这里sqlssesion并没有commit.
});
}
}
调用:
// Inject the RedisPublisher bean into the calling component.
@Autowired
protected RedisPublisher publisher;
// Publish the log entity on the register-log topic; `log` is the object to persist
// (presumably a RegLog, since that is the type the register handler parses; confirm at the call site).
publisher.publish(Constant.TOPIC_REGISTER_LOG, log);