zoukankan      html  css  js  c++  java
  • 记一次使用RedisTeamplate 操作Pipeline

    需求背景

    当前有个需求,需要将一份过滤出来的数据文件,按照一定的格式导入redis中,之后做数据资源池使用。由于文件数据比较大,有1000w行左右。所以使用redis的pipeline管道去分批写入redis

    什么是Pipeline?

    首先先来介绍一下pipeline:
    Pipeline指的是管道技术,指的是客户端允许将多个请求依次发给服务器,过程中而不需要等待请求的回复,在最后再一并读取结果即可。


    下面借鉴一下别人的图来说明一下为什么pipeline速度会很快。

    说白了,普通请求过程就是一次一次去redis-server端,而当client发送请求之后就会阻塞,并等待server响应之后再去处理下一次请求,当数据量大,且网络波动明显时,耗时便会非常严重。

    而pipeline管道则是将多次请求一次性发给server端,server端将多条命令执行完毕,一次性返回,大大减少了多次往返的网络消耗。

    图片名称 图片名称

    redis准备工作

    引入redis依赖

    这里我用的是springboot

     <!--springboot redis-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!--lettuce连接池需要此依赖-->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.6.0</version>
        </dependency>
    

    配置文件配置redis参数以及连接设置

    这里使用的是lettuce连接池
    springboot对连接池的使用非常智能,配置文件中添加lettuce.pool相关配置,则会使用到lettuce连接池,并将相关配置设置为连接池相关参数,这里需要用到上边的commons-pool2依赖

    spring:
    redis:
      lettuce:
        pool:
          MaxTotal: 50
          minIdle: 1
          maxWaitMillis: 5000
          maxIdle: 5
          testOnBorrow: true
          testOnReturn: true
          testWhileIdle: true
      mac-resource:
        database: 19
        hostName: r-uf6rnu5b4xxxxxxxapd.redis.rds.aliyuncs.com
        port: 6379
        timeout: 5000
        password: kxohxxxxxxxxyL
    

    配置RedisTemplate

    package com.hao.redistest.redistemplatetest.config;
    
    import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
    import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
    import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
    import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    
    import java.time.Duration;
    
    /**
     * @author hao
     * @date 2020/4/27
     */
    @Configuration
    public class RedisConfig {
    
        //redis连接池参数设置
        @Bean
        @ConfigurationProperties(prefix = "spring.redis.lettuce.pool")
        public GenericObjectPoolConfig redisPoolConfig() {
            return new GenericObjectPoolConfig();
        }
    
        //根据配置文件的mac-resource读取 reids资源配置
        @Bean
        @ConfigurationProperties(prefix = "spring.redis.mac-resource")
        public RedisStandaloneConfiguration macResourceConfiguration() {
            return new RedisStandaloneConfiguration();
        }
    
        //使用lettuceConnectionFactory连接redis
        @Bean
        public LettuceConnectionFactory macResourceFactory() {
            GenericObjectPoolConfig redisPoolConfig = redisPoolConfig();
            LettuceClientConfiguration clientConfiguration = LettucePoolingClientConfiguration.builder()
                    .poolConfig(redisPoolConfig).commandTimeout(Duration.ofMillis(redisPoolConfig.getMaxWaitMillis())).build();
            return new LettuceConnectionFactory(macResourceConfiguration(), clientConfiguration);
        }
    
        //配置RedisTemplate
        @Bean("macResourceRedisTemplate")
        public RedisTemplate<String,String> macResourceRedisTemplate() {
            LettuceConnectionFactory macResourcePoolFactory = macResourceFactory();
            RedisTemplate<String,String> redisTemplate = new RedisTemplate<>();
            StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
            redisTemplate.setKeySerializer(stringRedisSerializer);
            redisTemplate.setHashKeySerializer(stringRedisSerializer);
            redisTemplate.setValueSerializer(stringRedisSerializer);
            redisTemplate.setConnectionFactory(macResourcePoolFactory);
            return redisTemplate;
        }
    }
    

    pipeline使用

    batchSave方法为pipeline写入redis方法

    @Service
    public class TestServiceImpl implements TestService {
    
        @Resource(name = "macResourceRedisTemplate")
        private RedisTemplate<String, String> macResourceRedisTemplate;
    
        /**
         * 读取文件并写入redis
         * @param path 要写入的文件
         */
        @Override
        public void writeToRedis(String path) {
            try {
                //通过参数path读取要写入的文件
                FileReader fr = new FileReader(path);
                BufferedReader br = new BufferedReader(fr);
                String line = "";
                String[] resourceArr = null;
                AtomicInteger inc = new AtomicInteger();
                //分批执行
                int batchSize = 20000;
                List<Map<String,String>> batch = new ArrayList<>(batchSize);
                while ((line = br.readLine()) != null) {
                    //这里是我读取写入的规则,大家可以按自己的规则来
                    resourceArr = line.split(",");
                    String key = resourceArr[0] + "_" + resourceArr[4];
                    Map<String,Object> element = new HashMap<>(1);
                    element.put("param1", resourceArr[1]);
                    element.put("param2", resourceArr[2]);
                    element.put("param3", resourceArr[7]);
                    Map<String,String> kv = new HashMap<>();
                    kv.put(key, JSON.toJSONString(element));
                    batch.add(kv)   ;
                    if(batch.size() % batchSize == 0 ){
                        List<Map<String,String>> toSaveBatch = new ArrayList<>(batch);
                        try{
                            //到达设定的batchSize进行pipeline写入
                            batchSave(toSaveBatch,inc);
                            batch = new ArrayList<>(batchSize);
                        }catch (Exception ex ){
                            for(Map<String,String> m :toSaveBatch){
                                for(Map.Entry<String,String> entry : m.entrySet()){
                                    FileUtils.writeStringToFile(new File("tmp/mac_error.txt"),entry.getKey() + "@" + entry.getValue() + "
    ", StandardCharsets.UTF_8,true);
                                }
                            }
                            throw new RuntimeException(ex);
                        }
                    }
                }
                br.close();
                fr.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 分批用pipeline写入redis
         * @param batch
         * @param inc
         */
        private void batchSave(List<Map<String,String>> batch,AtomicInteger inc ){
            //调用redisTemplate的executePipelined  重新内部的doInRedis方法,这里用lambda语法写的 隐藏掉了
            macResourceRedisTemplate.executePipelined((RedisCallback<Object>) redisConnection -> {
                //打开pipeline管道
                redisConnection.openPipeline();
                for(Map<String,String> e : batch){
                    for(Map.Entry<String,String> entry : e.entrySet() ){
                        try {
                            //遍历集合数据,通过pipeline推入redis
                            redisConnection.lPush(entry.getKey().getBytes(),entry.getValue().getBytes());
                        }catch (Exception ex){
                            System.out.println("key:" + entry.getKey() + ",value: " + entry.getValue());
                            throw new RuntimeException(ex);
                        }
                        System.out.println(inc.incrementAndGet());
                    }
                }
                return null;
            });
        }
    }
    
    

    启动项目访问入口

    http://localhost:9999/api/write-redis?path=/Users/hao/Desktop/xxxxx.txt

    10秒左右已经写了20多w

    图片名称

    再看看redis中,已经有了写入数据

    图片名称

    项目地址

    github: redis-pipeline-demo

    参考内容

    https://www.cnblogs.com/littleatp/p/8419796.html
    https://blog.csdn.net/ouyang111222/article/details/50942893

  • 相关阅读:
    Java查找指定文件中指定字符的个数
    推荐系统(CTR领域)实战入门指南
    xgboost 实践
    pandas 获取列名
    pandas 标签映射成数值的几种方法
    pandas 删除列
    pandas 聚合求和等操作
    dataframe检查重复值,去重
    linux 解压缩文件(tar和zip)
    kaggle——Bag of Words Meets Bags of Popcorn(IMDB电影评论情感分类实践)
  • 原文地址:https://www.cnblogs.com/zhushenghao/p/12795411.html
Copyright © 2011-2022 走看看