最近做一个统计项目,数据量非常大,之前使用scan命令对redis中指定key进行扫描,一次100条,执行稳定、效率低,同时tcp关闭连接的time-wait增速相当的快,对性能造成了极大的浪费同时执行时间也很慢,而且当数据量进一步增大可能会影响其他服务。为了减少tcp连接数量,将redis的scan修改为pipeLine操作。
一、为什么要使用Pipeline?
Redis是采用基于C/S模式的请求/响应协议的TCP服务器。
性能问题一:redis客户端发送多条请求,后面的请求需要等待前面的请求处理完后,才能进行处理,而且每个请求都存在往返时间RRT(Round Trip Time),即使redis性能极高,当数据量足够大,也会极大影响性能,还可能会引起其他意外情况。
性能问题二:性能问题一,我们可以通过scan命令来解决,如何来设置count又是一个问题,设置不好,同样会有大量请求存在,即使设置到1w(推荐最大值),如果扫描的数据量太大,这个问题同样不能避免。每个请求都会经历三次握手、四次挥手,在处理大量连接时,处理完后,挥手会产生大量time-wait,如果该服务器提供其他服务,可能对其他服务造成影响。
使用Pipeline可以解决以上问题。
二、如何在使用Pipeline?
本文使用的是RedisTemplate调用execute,connection使用的redis原生的操作,这里简单使用了zCount来计算keys中集合数据的长度。获取结果为result,这里要使用Pipeline需要调用Connection.openPipeline()。Connection.closePipeline()返回值为List<Object>是执行后的结果,相当简单。
redisTemplate.execute(new RedisCallback<Long>() {
@Nullable
@Override
public Long doInRedis(RedisConnection connection) throws DataAccessException {
connection.openPipeline();
for (int i = 0; i < 1000000; i++) {
String key = "123" + i;
connection.zCount(key.getBytes(), 0,Integer.MAX_VALUE);
}
List<Object> result=connection.closePipeline();
return null;
}
});
这个list是放在匿名类内部,对于数据处理不太友好,代码会看起来相当难受,想取出来使用还是不可变的。如果要获取返回值,我们可以调用如下代码executePipelined,这样就可以返回我们需要的结果,下面我们可以对得到list进行操作。
List<Long> List = redisTemplate.executePipelined(new RedisCallback<Long>() {
@Nullable
@Override
public Long doInRedis(RedisConnection connection) throws DataAccessException {
connection.openPipeline();
for (int i = 0; i < 1000000; i++) {
String key = "123" + i;
connection.zCount(key.getBytes(), 0,Integer.MAX_VALUE);
}
return null;
}
});
在这里需要注意4点内容:
1.这里的connect是redis原生链接,所以connection的返回结果是基本上是byte数组,如果需要存储的数据,需要对byte[]数组反序列化。
2.在doInRedis中返回值必须返回为null,为什么返回为空?可以定位到内部代码去查看详情,这里不再赘述。3.connection.openPipeline()可以调用,也可以不调用,但是connection.closePipeline()不能调用,调用了拿不到返回值。因为调用的时候会直接将结果返回,同时也不会对代码进行反序列化。
4.反序列化需要传入反序列化对象,这些对象都可以进行相应的实例化,如下图所示。
根据你的项目需求选择合适的反序列化对象。比如我在项目中key使用的是StringRedisSerializer,而值通常使用的是GenerJackson2JsonRedisSerializer。所以在初始化redisTemplate的时候会这样做,代码如下,将序列化的实例化对象放入redisTemplate中,当使用的时候就可以直接redis.getKeySerializer()或者redis.getValueSerializer(),这样就不用在实例化一个对象,造成浪费和冗余。
public class MyRedisUtil {
private RedisTemplate redisTemplate;
@Autowired
public void setRedisTemplate(RedisTemplate redisTemplate) {
RedisSerializer keySerializer = new StringRedisSerializer();
RedisSerializer valueSerializer = new GenericJackson2JsonRedisSerializer();
redisTemplate.setKeySerializer(keySerializer);
redisTemplate.setValueSerializer(valueSerializer);
this.redisTemplate = redisTemplate;
}
public RedisTemplate getRedisTemplate() {
return redisTemplate;
}
}
通过这样的掉用方式,我们就可以不用进行强制转换,直接获得我们想要的对象了。
List<User> List = redisTemplate.executePipelined(new RedisCallback<User>() {
@Nullable
@Override
public User doInRedis(RedisConnection connection) throws DataAccessException {
connection.openPipeline();
for (int i = 0; i < 1000000; i++) {
String key = "123" + i;
connection.zCount(key.getBytes(), 0,Integer.MAX_VALUE);
}
return null;
}
}, myRedisComponent.getRedisTemplate().getValueSerializer());