zoukankan      html  css  js  c++  java
  • java读取大文件内容到Elasticsearch分析(手把手教你java处理超大csv文件)


    现在需要快速分析一个2g的csv文件;
    基于掌握的知识,使用java按行读取文件,批量导入数据到Elasticsearch,
    然后利用es强大的聚合能力分析数据,1个小时搞定!

    package com.example.demo;
    
    import com.alibaba.fastjson.JSON;
    import com.example.demo.entity.Entity;
    import org.apache.commons.io.FileUtils;
    import org.apache.commons.io.LineIterator;
    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.action.index.IndexResponse;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.common.xcontent.XContentType;
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.boot.test.context.SpringBootTest;
    
    import java.io.File;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;
    import java.util.Objects;
    
    /**
     * 读取大文件
     * csv格式
     *
     * @author lhb
     * @date 2021/11/11
     * @since 1.0.0
     */
    @SpringBootTest
    public class ImportTest {
    
        @Autowired
        @Qualifier("client")
        private RestHighLevelClient restHighLevelClient;
    
        @Test
        void insert() {
         //csv文件2G,63W条数据,十多个字段
            String filePath = "D:\file\20211111.csv";
    
            LineIterator it = null;
            try {
                it = FileUtils.lineIterator(new File(filePath), "UTF-8");
    
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                while (it.hasNext()) {
                    String line = it.nextLine();
                    //System.out.println("line = " + line);
                    //文件是CSV文件,CSV文件中的每一列是用","隔开的,这样就可以得到每一列的元素
                    String[] strArray = line.split(",");
                    //有很长的空格,trim一下
                    String name = strArray[6].trim();
                    String code = strArray[8].trim();
                    String num = strArray[11].trim();
                    System.out.println(code + "==" + num);
    
                    Entity entity = new Entity();
                    entity.setCode(code);
                    if (Objects.equals("xxx", code)) {
                        //跳过表头
                        continue;
                    }
                    entity.setNum(Long.parseLong(num));
                    entity.setName(name);
                    entity.setCreateTime(new Date());
                    String index = "index20211111";
                    singleInsert2(index, entity);
                }
            } finally {
                LineIterator.closeQuietly(it);
            }
        }
    
        @Test
        void batchInsert() {
    
            String filePath = "D:\express\20211111.csv";
    
            LineIterator it = null;
            try {
                it = FileUtils.lineIterator(new File(filePath), "UTF-8");
    
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                int i = 0;
                List<Entity> entities = new ArrayList<>();
    
                while (it.hasNext()) {
                    String line = it.nextLine();
                    //System.out.println("line = " + line);
                    String[] strArray = line.split(",");
                    String code = strArray[6].trim();
                    String name = strArray[8].trim();
                    String num = strArray[11].trim();
                    System.out.println(code + "==" + num);
    
                    if (Objects.equals("xxx", code)) {
                        //跳过表头
                        continue;
                    }
                    Entity entity = new Entity();
                    entity.setCode(code);
                    entity.setName(name);
                    try {
                        entity.setNum(Long.parseLong(num));
                    } catch (NumberFormatException e) {
                        e.printStackTrace();
                        System.out.println("出错的数据" + code + "==" + num);
                    }
                    entity.setCreateTime(new Date());
                    String index = "index20211111";
    
                    //批量插入
                    entities.add(entity);
                    i++;
              //如果最后一次批量插入不足10000条数据,需要再此根据实际条数特殊处理
    if (i % 10000 == 0) { System.out.println("i = " + i); try { batchInsert2(index, entities); } catch (IOException e) { e.printStackTrace(); } //清空已经处理过的list entities.clear(); i = 0; } } } finally { LineIterator.closeQuietly(it); } } /** * 批量速度杠杠的 * * @param index * @param entities * @throws IOException */ public void batchInsert2(String index, List<Entity> entities) throws IOException { BulkRequest bulkRequest = new BulkRequest(index); System.out.println("entities.sz = " + entities.size()); for (Entity org : entities) { IndexRequest request = new IndexRequest(); request.source(JSON.toJSONString(org), XContentType.JSON); bulkRequest.add(request); } restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT); } /** * 数据量大,超级慢 * * @param index * @param entity */ public void singleInsert2(String index, Entity entity) { IndexRequest request = new IndexRequest(index); request.source(JSON.toJSONString(entity), XContentType.JSON); try { IndexResponse index1 = restHighLevelClient.index(request, RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); } } }
    
    
    实体类,需要什么字段自定义
    package com.example.demo.entity;
    
    import lombok.Data;
    
    import java.util.Date;
    
    /**
     * Data holder for one imported CSV row; ImportTest serializes it to JSON and indexes it
     * into Elasticsearch. Accessors are generated by Lombok's {@code @Data}.
     *
     * @author lhb
     * @date 2021/11/11
     * @since 1.0.0
     */
    @Data
    public class Entity {
    
        /**
         * Code.
         */
        private String code;
        /**
         * Name.
         */
        private String name;
        /**
         * Quantity.
         */
        private Long num;
        /** Creation time; ImportTest sets it to the current time when the row is parsed. */
        private Date createTime;
    
    }
    
    
    创建索引映射,然后插入数据:
    PUT index20211111
    {
      "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 1
      },
      "mappings": {
        "properties": {
          "code": {
            "type": "keyword"
          },
          "name": {
            "type": "keyword"
          },
          "num": {
            "type": "long"
          },
          "createTime": {
            "type": "date"
          }
        }
      }
    }

    开始分析数据:

    GET index20211111/_count
    {}

     

    #返回63w数据

    {
    "count" : 630000,
    "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
    }
    }

    GET index20211111/_search
    {
      "query": {
        "constant_score": {
          "filter": {
            "terms": {
              "code": [
                2222,
                1111,
                3333
              ]
            }
          }
        }
      },
      "size": 1,
      "track_total_hits": true,
      "aggs": {
        "per_code": {
          "terms": {
            "field": "code",
            "size": 200
          },
          "aggs": {
            "num": {
              "sum": {
                "field": "num"
              }
            }
          }
        },
        "sum_num": {
          "sum": {
            "field": "num"
          }
        }
      }
    }
  • 相关阅读:
    【C++FAQ】如何设定小数点后的显示位数
    【C++FAQ】怎么输入一行字符串(可能带空格)
    c++ operator重载的例子
    【C++FAQ】怎么给结构体排序
    【IT面试题007】英语字符串的分词程序
    【C++/C FAQ】如何格式化输出以0填充的定长整数
    nginx的root和alias指令的区别
    linux磁盘满了,各种奇怪错误
    使用nginx搭建http代理服务器
    nginx图片过滤处理模块http_image_filter_module安装配置笔记
  • 原文地址:https://www.cnblogs.com/hbuuid/p/15540804.html
Copyright © 2011-2022 走看看