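Background
I needed to load a filtered data file into Redis in a specific format, to be used later as a data resource pool. The file is fairly large, around 10 million lines, so I used a Redis pipeline to write it to Redis in batches.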
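What is a pipeline?
First, a quick introduction to pipelines:
Pipelining is a technique in which the client sends multiple requests to the server one after another without waiting for each reply, and then reads all of the replies together at the end.
The diagrams below, borrowed from another author, show why pipelining is so fast.
Put simply, with ordinary requests the client makes one trip to redis-server per command: after sending a request it blocks until the server responds, and only then moves on to the next request. With a large data set and an unstable network, the total time adds up badly.
A pipeline instead sends many requests to the server in one go. The server executes the commands and returns the replies together, which greatly reduces the network cost of repeated round trips.
![Image](https://img2020.cnblogs.com/blog/1417997/202004/1417997-20200428165750107-1833573983.png)
![Image](https://img2020.cnblogs.com/blog/1417997/202004/1417997-20200428165814788-97515272.png)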
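To put rough numbers on the round-trip argument, here is a minimal sketch of a cost model that counts only network round trips. The class and method names are made up for illustration, the 1 ms round-trip time is an assumed figure rather than a measurement, and server-side execution time is ignored entirely.

```java
/** Back-of-the-envelope cost model: only network round trips are counted. */
final class RoundTripEstimate {

    /** One round trip per command: the client waits for each reply before sending the next. */
    static long sequentialMillis(long commands, long rttMillis) {
        return commands * rttMillis;
    }

    /** One round trip per batch: the client sends batchSize commands, then reads all replies. */
    static long pipelinedMillis(long commands, long batchSize, long rttMillis) {
        long batches = (commands + batchSize - 1) / batchSize; // ceiling division
        return batches * rttMillis;
    }

    public static void main(String[] args) {
        long commands = 10_000_000L; // roughly the number of lines in the file from this post
        long batchSize = 20_000L;    // the batch size used in this post
        long rttMillis = 1L;         // assumed round-trip time, not a measurement

        System.out.println("sequential: " + sequentialMillis(commands, rttMillis) + " ms");
        System.out.println("pipelined:  " + pipelinedMillis(commands, batchSize, rttMillis) + " ms");
    }
}
```

Under these assumptions the one-command-per-trip loop spends 10,000,000 ms (close to three hours) just waiting on the network, while 500 pipelined batches spend about 500 ms. Real timings will differ, since the model leaves out command execution and serialization, but the gap between the two is the point.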
Redis setup
Add the Redis dependency
I'm using Spring Boot here.
    <!-- Spring Boot Redis starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <!-- required by the Lettuce connection pool -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
        <version>2.6.0</version>
    </dependency>
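Configure the Redis parameters and connection settings
This setup uses the Lettuce connection pool.
Spring Boot makes pooling convenient: once lettuce.pool settings are present in the configuration file, the Lettuce connection pool is used and those settings become the pool's parameters. This requires the commons-pool2 dependency added above. In this post the pool keys are bound directly onto a commons-pool2 GenericObjectPoolConfig bean (see RedisConfig), so they use that class's property names, such as minIdle and maxWaitMillis.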
    spring:
      redis:
        lettuce:
          pool:
            maxTotal: 50
            minIdle: 1
            maxWaitMillis: 5000
            maxIdle: 5
            testOnBorrow: true
            testOnReturn: true
            testWhileIdle: true
        mac-resource:
          database: 19
          hostName: r-uf6rnu5b4xxxxxxxapd.redis.rds.aliyuncs.com
          port: 6379
          timeout: 5000
          password: kxohxxxxxxxxyL
Configure RedisTemplate
    package com.hao.redistest.redistemplatetest.config;

    import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
    import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
    import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
    import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.serializer.StringRedisSerializer;

    import java.time.Duration;

    /**
     * @author hao
     * @date 2020/4/27
     */
    @Configuration
    public class RedisConfig {

        // Redis connection pool settings, bound from spring.redis.lettuce.pool
        @Bean
        @ConfigurationProperties(prefix = "spring.redis.lettuce.pool")
        public GenericObjectPoolConfig redisPoolConfig() {
            return new GenericObjectPoolConfig();
        }

        // Redis server settings, read from the mac-resource section of the configuration file
        @Bean
        @ConfigurationProperties(prefix = "spring.redis.mac-resource")
        public RedisStandaloneConfiguration macResourceConfiguration() {
            return new RedisStandaloneConfiguration();
        }

        // Connect to Redis through a LettuceConnectionFactory
        @Bean
        public LettuceConnectionFactory macResourceFactory() {
            GenericObjectPoolConfig redisPoolConfig = redisPoolConfig();
            LettuceClientConfiguration clientConfiguration = LettucePoolingClientConfiguration.builder()
                    .poolConfig(redisPoolConfig)
                    .commandTimeout(Duration.ofMillis(redisPoolConfig.getMaxWaitMillis()))
                    .build();
            return new LettuceConnectionFactory(macResourceConfiguration(), clientConfiguration);
        }

        // Configure the RedisTemplate with String serializers for keys and values
        @Bean("macResourceRedisTemplate")
        public RedisTemplate<String, String> macResourceRedisTemplate() {
            LettuceConnectionFactory macResourcePoolFactory = macResourceFactory();
            RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();
            StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
            redisTemplate.setKeySerializer(stringRedisSerializer);
            redisTemplate.setHashKeySerializer(stringRedisSerializer);
            redisTemplate.setValueSerializer(stringRedisSerializer);
            redisTemplate.setConnectionFactory(macResourcePoolFactory);
            return redisTemplate;
        }
    }
Using the pipeline
The batchSave method is the one that writes to Redis through a pipeline.
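    import com.alibaba.fastjson.JSON; // assumed: JSON.toJSONString matches fastjson's API
    import org.apache.commons.io.FileUtils;
    import org.springframework.data.redis.core.RedisCallback;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Service;

    import javax.annotation.Resource;
    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileReader;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicInteger;

    @Service
    public class TestServiceImpl implements TestService {

        @Resource(name = "macResourceRedisTemplate")
        private RedisTemplate<String, String> macResourceRedisTemplate;

        /**
         * Reads a file and writes its contents to Redis.
         * @param path the file to import
         */
        @Override
        public void writeToRedis(String path) {
            // try-with-resources closes the reader even when an exception is thrown
            try (BufferedReader br = new BufferedReader(new FileReader(path))) {
                AtomicInteger inc = new AtomicInteger();
                // write in batches
                int batchSize = 20000;
                List<Map<String, String>> batch = new ArrayList<>(batchSize);
                String line;
                while ((line = br.readLine()) != null) {
                    // these are my own parsing rules; adapt them to your data
                    String[] resourceArr = line.split(",");
                    String key = resourceArr[0] + "_" + resourceArr[4];
                    Map<String, Object> element = new HashMap<>();
                    element.put("param1", resourceArr[1]);
                    element.put("param2", resourceArr[2]);
                    element.put("param3", resourceArr[7]);
                    Map<String, String> kv = new HashMap<>(1);
                    kv.put(key, JSON.toJSONString(element));
                    batch.add(kv);
                    // once batchSize records are collected, write them through the pipeline
                    if (batch.size() >= batchSize) {
                        saveOrDump(batch, inc);
                        batch = new ArrayList<>(batchSize);
                    }
                }
                // flush the final partial batch; otherwise the trailing lines are never written
                if (!batch.isEmpty()) {
                    saveOrDump(batch, inc);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        /**
         * Writes one batch through the pipeline. On failure, appends the batch
         * to tmp/mac_error.txt and rethrows so the import stops.
         */
        private void saveOrDump(List<Map<String, String>> toSaveBatch, AtomicInteger inc) throws IOException {
            try {
                batchSave(toSaveBatch, inc);
            } catch (Exception ex) {
                for (Map<String, String> m : toSaveBatch) {
                    for (Map.Entry<String, String> entry : m.entrySet()) {
                        FileUtils.writeStringToFile(new File("tmp/mac_error.txt"),
                                entry.getKey() + "@" + entry.getValue() + "\n", StandardCharsets.UTF_8, true);
                    }
                }
                throw new RuntimeException(ex);
            }
        }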
        /**
         * Writes one batch to Redis through a pipeline.
         * @param batch key/value pairs to write
         * @param inc   running count of records written
         */
        private void batchSave(List<Map<String, String>> batch, AtomicInteger inc) {
            // executePipelined opens the pipeline before invoking the callback (RedisCallback.doInRedis,
            // written here as a lambda) and closes it afterwards, so no explicit openPipeline() call is needed
            macResourceRedisTemplate.executePipelined((RedisCallback<Object>) redisConnection -> {
                for (Map<String, String> e : batch) {
                    for (Map.Entry<String, String> entry : e.entrySet()) {
                        try {
                            // iterate over the batch and push each record into Redis through the pipeline
                            redisConnection.lPush(entry.getKey().getBytes(StandardCharsets.UTF_8),
                                    entry.getValue().getBytes(StandardCharsets.UTF_8));
                        } catch (Exception ex) {
                            System.out.println("key:" + entry.getKey() + ",value: " + entry.getValue());
                            throw new RuntimeException(ex);
                        }
                        System.out.println(inc.incrementAndGet());
                    }
                }
                // the callback must return null; executePipelined collects the replies itself
                return null;
            });
        }
    }
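One detail worth checking in any batched import like the one above is the final partial batch: when the number of lines is not an exact multiple of the batch size, the leftover records still need to be written after the read loop ends. The following is a minimal sketch of that batching pattern in plain Java, with no Redis involved. `Batcher` is a hypothetical helper, not part of the demo project, and in real use its sink would be something like `batchSave`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Collects items and hands them to a sink in fixed-size batches. */
final class Batcher<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink;
    private List<T> current;

    Batcher(int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
        this.current = new ArrayList<>(batchSize);
    }

    /** Adds one item and flushes automatically once the batch is full. */
    void add(T item) {
        current.add(item);
        if (current.size() >= batchSize) {
            flush();
        }
    }

    /** Sends whatever is buffered to the sink; call once more after the last add. */
    void flush() {
        if (current.isEmpty()) {
            return;
        }
        List<T> full = current;
        current = new ArrayList<>(batchSize); // start a fresh buffer before handing off
        sink.accept(full);
    }

    public static void main(String[] args) {
        List<Integer> sizes = new ArrayList<>();
        Batcher<String> batcher = new Batcher<>(20, batch -> sizes.add(batch.size()));
        for (int i = 0; i < 45; i++) {
            batcher.add("line-" + i);
        }
        batcher.flush(); // without this call the last 5 items would be dropped
        System.out.println(sizes); // prints [20, 20, 5]
    }
}
```

Keeping the flush logic in one place also means the automatic flush and the final flush go through the same code path, so any error handling around the sink only has to be written once.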
Start the project and call the endpoint
http://localhost:9999/api/write-redis?path=/Users/hao/Desktop/xxxxx.txt
After about 10 seconds, more than 200,000 records had been written.
![Image](https://img2020.cnblogs.com/blog/1417997/202004/1417997-20200428165956894-27978129.png)
Checking Redis confirms that the data has been written.
![Image](https://img2020.cnblogs.com/blog/1417997/202004/1417997-20200428165928251-1030736625.png)
Project repository
GitHub: redis-pipeline-demo
References
https://www.cnblogs.com/littleatp/p/8419796.html
https://blog.csdn.net/ouyang111222/article/details/50942893