  • Spark2.2+ES6.4.2 (Part 31): Generating test data with Spark and bulk-loading it into ES with BulkProcessor in a Spark environment

    Generating 20 million test records in Spark (150 columns per record)

    While generating large volumes of data with Spark, I hit a problem: if the record count passed to sc.parallelize(fukeData, 64) is very large (say 5 or 10 million), the job becomes extremely slow and eventually fails with a "GC overhead limit exceeded" out-of-memory error. The workaround is to generate at most 1 million records per call and invoke the generator repeatedly; producing 20 million records this way took ten-odd minutes in total.

    If your environment allows, you can also generate the test data locally and then upload it to HDFS for the Spark tests (a small upload sketch follows the generator code below).

    import java.io.BufferedWriter;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;
    import java.util.Random;
    
    public class FileGenerate {
        public static void main(String[] args) throws IOException {
            BufferedWriter writer = new BufferedWriter(new FileWriter("d://test.csv"));
    
            List<String> fukeData = new ArrayList<String>();
            for (int i = 1; i <= 20000000; i++) {
                fukeData.add(String.valueOf(i));
            }
    
            List<String> fields = new ArrayList<String>();
    
            fields.add("id");//
            fields.add("object_id"); //
            fields.add("scan_start_time"); //
            fields.add("scan_stop_time");//
            fields.add("insert_time");//
            fields.add("enodeb_id");
            for (int i = 0; i < 145; i++) {
                fields.add("mr_tadv_" + (i < 10 ? "0" + i : i));
            }
            writer.write(String.join(",", fields) + "
    ");
    
            // Assume 10,000 cells; with 20,000,000 records in total, each cell ends up with 2,000 records.
            Random random = new Random();
            for (String id : fukeData) {
                List<String> rowItems = new ArrayList<String>();
                // id
                int intId = Integer.valueOf(id);
                rowItems.add(id);
                if (intId % 100000 == 0) {
                    System.out.println(intId);
                    writer.flush();
                }
                // object_id
                String objectId = String.valueOf((intId % 10000) + 10000 * 256 + 1);
                rowItems.add(objectId);
    
                int hour = random.nextInt(5) + 2;
                int minute = random.nextInt(59) + 1;
                int second_start = random.nextInt(30) + 1;
                int second_stop = second_start + 15;
                String scan_start_time = "2018-10-29 1" + hour + ":" + minute + ":" + second_start;
                String scan_stop_time = "2018-10-29 1" + hour + ":" + minute + ":" + second_stop;
                // scan_start_time
                rowItems.add(scan_start_time);
                // scan_stop_time
                rowItems.add(scan_stop_time);
    
                // insert_time
                rowItems.add(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
    
                // enodeb_id
                rowItems.add(String.valueOf((int) Integer.valueOf(objectId) / 256));
    
                for (int i = 0; i < 145; i++) {
                    rowItems.add(String.valueOf(random.nextInt(100)));
                }
    
                writer.write(String.join(",", rowItems) + "
    ");
            }
    
            writer.flush();
            writer.close();
        }
    }
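
    Once the file has been generated locally, it can be uploaded to HDFS either with the hdfs dfs -put command or programmatically. Below is a minimal sketch using the Hadoop FileSystem API; the class name and both paths are placeholders, and it assumes the Hadoop client libraries plus core-site.xml/hdfs-site.xml are on the classpath.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class UploadToHdfs {
        public static void main(String[] args) throws Exception {
            // Picks up fs.defaultFS from core-site.xml on the classpath.
            Configuration conf = new Configuration();
            try (FileSystem fs = FileSystem.get(conf)) {
                // Copy the locally generated CSV into the directory the Spark job later reads from.
                fs.copyFromLocalFile(new Path("d:/test.csv"),
                        new Path("/user/my/streaming/test_es/test.csv"));
            }
        }
    }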

    The following is the code that generates the 20 million test records under Spark 2.2.0:

    public class ESWriterTest extends Driver implements Serializable {
        private static final long serialVersionUID = 1L;
        private ExpressionEncoder<Row> encoder = null;
        private StructType type = null;
        private String hdfdFilePath = "/user/my/streaming/test_es/*";
        public ESWriterTest() {
        }
    
        @Override
        public void run() {
            initSchema();
            generateTestData();    
    
            sparkSession.stop();
        }
    
        private void initSchema() {
            type = DataTypes.createStructType(Arrays.asList(//
                    DataTypes.createStructField("id", DataTypes.StringType, true), //
                    DataTypes.createStructField("object_id", DataTypes.StringType, true), //
                    DataTypes.createStructField("scan_start_time", DataTypes.StringType, true), //
                    DataTypes.createStructField("scan_stop_time", DataTypes.StringType, true), //
                    DataTypes.createStructField("insert_time", DataTypes.StringType, true), //
                    DataTypes.createStructField("enodeb_id", DataTypes.StringType, true)));
            for (int i = 0; i < 145; i++) {
                type = type.add("mr_tadv_" + (i < 10 ? "0" + i : i), DataTypes.StringType);
            }
            encoder = RowEncoder.apply(type);
        }
    
        private void generateTestData() {
            generateData("/user/my/streaming/test_es/1/");
            generateData("/user/my/streaming/test_es/2/");
            generateData("/user/my/streaming/test_es/3/");
            generateData("/user/my/streaming/test_es/4/");
            generateData("/user/my/streaming/test_es/5/");
            generateData("/user/my/streaming/test_es/6/");
            generateData("/user/my/streaming/test_es/7/");
            generateData("/user/my/streaming/test_es/8/");
            generateData("/user/my/streaming/test_es/9/");
            generateData("/user/my/streaming/test_es/10/");
            generateData("/user/my/streaming/test_es/11/");
            generateData("/user/my/streaming/test_es/12/");
            generateData("/user/my/streaming/test_es/13/");
            generateData("/user/my/streaming/test_es/14/");
            generateData("/user/my/streaming/test_es/15/");
            generateData("/user/my/streaming/test_es/16/");
            generateData("/user/my/streaming/test_es/17/");
            generateData("/user/my/streaming/test_es/18/");
            generateData("/user/my/streaming/test_es/19/");
            generateData("/user/my/streaming/test_es/20/");
    
            // Supported file formats: text, csv, json, parquet.
            StructType structType = DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("value", DataTypes.StringType, true)));
            Dataset<Row> rows = sparkSession.read().format("text").schema(structType).load(hdfdFilePath);
            rows.printSchema();
            rows.show(10);
        }
    
        private void generateData(String hdfsDataFilePath) {
            List<Row> fukeData = new ArrayList<Row>();
            for (int i = 1; i <= 1000000; i++) {
                fukeData.add(RowFactory.create(String.valueOf(i)));
            }
    
            StructType structType = DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("id", DataTypes.StringType, false)));
            JavaSparkContext sc = new JavaSparkContext(sparkSession.sparkContext());
            JavaRDD<Row> javaRDD = sc.parallelize(fukeData, 64);
            Dataset<Row> fukeDataset = sparkSession.createDataFrame(javaRDD, structType);
    
            Random random = new Random();
            // Assume 10,000 cells; each 1,000,000-record batch gives every cell 100 records.
            Dataset<Row> rows = fukeDataset.mapPartitions(new MapPartitionsFunction<Row, Row>() {
                private static final long serialVersionUID = 1L;
    
                @Override
                public Iterator<Row> call(Iterator<Row> idItems) throws Exception {
                    List<Row> newRows = new ArrayList<Row>();
                    while (idItems.hasNext()) {
                        String id = idItems.next().getString(0);
                        List<Object> rowItems = new ArrayList<Object>();
                        // id
                        int intId = Integer.valueOf(id);
                        rowItems.add(id);
    
                        // object_id
                        String objectId = String.valueOf((intId % 10000) + 10000 * 256 + 1);
                        rowItems.add(objectId);
    
                        int hour = random.nextInt(5) + 2;
                        int minute = random.nextInt(59) + 1;
                        int second_start = random.nextInt(30) + 1;
                        int second_stop = second_start + 15;
                        String scan_start_time = "2018-10-29 1" + hour + ":" + minute + ":" + second_start;
                        String scan_stop_time = "2018-10-29 1" + hour + ":" + minute + ":" + second_stop;
                        // scan_start_time
                        rowItems.add(scan_start_time);
                        // scan_stop_time
                        rowItems.add(scan_stop_time);
    
                        // insert_time
                        rowItems.add(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
    
                        // enodeb_id
                        rowItems.add(String.valueOf((int) Integer.valueOf(objectId) / 256));
    
                        for (int i = 0; i < 145; i++) {
                            rowItems.add(String.valueOf(random.nextInt(100)));
                        }
    
                        newRows.add(RowFactory.create(rowItems.toArray()));
                    }
    
                    return newRows.iterator();
                }
            }, encoder);
    
            rows.toJavaRDD().repartition(20).saveAsTextFile(hdfsDataFilePath);
        }
    }

    Loading the test data into ES 6.4.2 with BulkProcessor in a Spark environment

    Below is the test code for inserting the 20 million records into ES 6.4.2 with BulkProcessor under Spark 2.2.0. One problem surfaced while debugging: do not call client.close() or bulkProcessor.close() inside ForeachPartitionFunction#call, otherwise an exception is thrown, because the client may be shared by several tasks running in the same executor. (A sketch of a per-JVM shared client that works around this is shown after the EsForeachPartitionFunction code below.)

        private ExpressionEncoder<Row> encoder = null;
        private StructType type = null;
        private String hdfdFilePath = "/user/my/streaming/test_es/*";
    
        @Override
        public void run() {
            initSchema();
    
            StructType structType = DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("value", DataTypes.StringType, true)));
            Dataset<Row> lines = sparkSession.read().format("text").schema(structType).load(hdfdFilePath);
    
            Dataset<Row> rows = lines.map(new MapFunction<Row, Row>() {
                private static final long serialVersionUID = 1L;
    
                @Override
                public Row call(Row value) throws Exception {
                    List<Object> itemsList = new ArrayList<Object>();
                    String line = value.getAs("value");
                    String[] fields = line.split(",");
    
                    for (String filed : fields) {
                        itemsList.add(filed);
                    }
    
                    return RowFactory.create(itemsList.toArray());
                }
            }, encoder);
    
            rows.show(10);
    
            rows.toJSON().foreachPartition(new EsForeachPartitionFunction());
    
            sparkSession.stop();
        }
    
        private void initSchema() {
            type = DataTypes.createStructType(Arrays.asList(//
                    DataTypes.createStructField("id", DataTypes.StringType, true), //
                    DataTypes.createStructField("object_id", DataTypes.StringType, true), //
                    DataTypes.createStructField("scan_start_time", DataTypes.StringType, true), //
                    DataTypes.createStructField("scan_stop_time", DataTypes.StringType, true), //
                    DataTypes.createStructField("insert_time", DataTypes.StringType, true), //
                    DataTypes.createStructField("enodeb_id", DataTypes.StringType, true)));
            for (int i = 0; i < 145; i++) {
                type = type.add("mr_tadv_" + (i < 10 ? "0" + i : i), DataTypes.StringType);
            }
            encoder = RowEncoder.apply(type);
        }

    EsForeachPartitionFunction.java

    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;
    
    import org.apache.spark.api.java.function.ForeachPartitionFunction;
    import org.elasticsearch.action.bulk.BackoffPolicy;
    import org.elasticsearch.action.bulk.BulkProcessor;
    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.TransportAddress;
    import org.elasticsearch.common.unit.ByteSizeUnit;
    import org.elasticsearch.common.unit.ByteSizeValue;
    import org.elasticsearch.common.unit.TimeValue;
    import org.elasticsearch.transport.client.PreBuiltTransportClient;
    
    public class EsForeachPartitionFunction implements ForeachPartitionFunction<String> {
        private static final long serialVersionUID = 1L;
    
        @Override
        public void call(Iterator<String> rows) throws Exception {
            TransportClient client = null;
            BulkProcessor bulkProcessor = null;
            try {
                client = getClient();
                bulkProcessor = getBulkProcessor(client);
            } catch (Exception ex) {
                System.out.println(ex.getMessage() + "\n" + ex.getStackTrace());
            }
            Map<String, Object> mapType = new HashMap<String, Object>();
    
            while (rows.hasNext()) {
                @SuppressWarnings("unchecked")
                Map<String, Object> map = new com.google.gson.Gson().fromJson(rows.next(), mapType.getClass());
                bulkProcessor.add(new IndexRequest("twitter", "tweet").source(map));
            }
    
            try {
                // Flush any remaining requests
                bulkProcessor.flush();
                System.out.println("--------------------------------bulkProcessor.flush(); over...------------------------");
    
            } catch (Exception ex) {
                System.out.println("" + ex.getMessage() + "
    " + ex.getStackTrace());
            }
    
            try {
                // Or close the bulkProcessor if you don't need it anymore
                bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
                System.out.println("--------------------------------bulkProcessor.awaitClose(10, TimeUnit.MINUTES); over...------------------------");
            } catch (Exception ex) {
                System.out.println("" + ex.getMessage() + "
    " + ex.getStackTrace());
            }
        }
    
        private BulkProcessor getBulkProcessor(TransportClient client) {
            BulkProcessor bulkProcessor = BulkProcessor//
                    .builder(client, new BulkProcessor.Listener() {
                        @Override
                        public void afterBulk(long arg0, BulkRequest arg1, BulkResponse arg2) {
                        System.out.println("afterBulk(long, BulkRequest, BulkResponse) finished....");
                        }
    
                        @Override
                        public void afterBulk(long arg0, BulkRequest arg1, Throwable arg2) {
                        System.out.println("afterBulk(long, BulkRequest, Throwable) finished....");
                        System.out.println(arg1.numberOfActions() + " actions failed in bulk, reason: " + arg2);
                        }
    
                        @Override
                        public void beforeBulk(long arg0, BulkRequest arg1) {
                        System.out.println("beforeBulk started....");
                        }
                    }) //
                    .setBulkActions(10000)//
                    .setBulkSize(new ByteSizeValue(64, ByteSizeUnit.MB))//
                    .setFlushInterval(TimeValue.timeValueSeconds(5))//
                    .setConcurrentRequests(1)//
                    .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))//
                    .build();
            return bulkProcessor;
        }
    
        private TransportClient getClient() {
            Settings settings = Settings.builder()//
                    .put("cluster.name", "es") //
                    .put("client.transport.sniff", true)//
                    .build();
    
            PreBuiltTransportClient preBuiltTransportClient = new PreBuiltTransportClient(settings);
    
            TransportClient client = preBuiltTransportClient;
            // 10.205.201.97,10.205.201.98,10.205.201.96,10.205.201.95
            try {
                client.addTransportAddress(new TransportAddress(InetAddress.getByName("10.205.201.97"), 9300));
                client.addTransportAddress(new TransportAddress(InetAddress.getByName("10.205.201.98"), 9300));
                client.addTransportAddress(new TransportAddress(InetAddress.getByName("10.205.201.96"), 9300));
                client.addTransportAddress(new TransportAddress(InetAddress.getByName("10.205.201.95"), 9300));
            } catch (UnknownHostException e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
    
            return client;
        }
    }
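
    Because the client cannot be closed inside call(), one possible workaround (a rough sketch, not taken from the original code) is to keep a single TransportClient and BulkProcessor per executor JVM in a lazily initialized static holder, and close them only from a JVM shutdown hook. The holder class below is hypothetical; the cluster name, node address and bulk settings simply mirror the values used above.

    import java.net.InetAddress;
    import java.util.concurrent.TimeUnit;

    import org.elasticsearch.action.bulk.BulkProcessor;
    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.TransportAddress;
    import org.elasticsearch.transport.client.PreBuiltTransportClient;

    public final class EsClientHolder {
        private static TransportClient client;
        private static BulkProcessor bulkProcessor;

        // Lazily creates one client + BulkProcessor per executor JVM.
        public static synchronized BulkProcessor get() throws Exception {
            if (bulkProcessor == null) {
                Settings settings = Settings.builder()
                        .put("cluster.name", "es")
                        .put("client.transport.sniff", true)
                        .build();
                client = new PreBuiltTransportClient(settings)
                        .addTransportAddress(new TransportAddress(InetAddress.getByName("10.205.201.97"), 9300));
                bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
                    public void beforeBulk(long id, BulkRequest request) { }
                    public void afterBulk(long id, BulkRequest request, BulkResponse response) { }
                    public void afterBulk(long id, BulkRequest request, Throwable failure) {
                        System.out.println(request.numberOfActions() + " actions failed: " + failure);
                    }
                }).setBulkActions(10000).build();
                // Close both only when the executor JVM exits, never per partition.
                Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                    try {
                        bulkProcessor.awaitClose(1, TimeUnit.MINUTES);
                    } catch (InterruptedException ignored) {
                    }
                    client.close();
                }));
            }
            return bulkProcessor;
        }
    }

    With such a holder, EsForeachPartitionFunction#call would obtain the processor via EsClientHolder.get() and only call bulkProcessor.flush() at the end of each partition, leaving awaitClose()/close() to the shutdown hook.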

    Dependencies (pom.xml)

            <!--Spark -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.2.0</version>
            </dependency>
            <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch-spark-20_2.11</artifactId>
                <version>6.4.2</version>
            </dependency>
    
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>transport</artifactId>
                <version>6.4.2</version>
            </dependency>

    The measured write throughput is rather low, about 3,500 records per second.
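
    Since the pom already pulls in elasticsearch-spark-20_2.11, another option worth measuring is to drop the hand-rolled TransportClient/BulkProcessor path and let the es-hadoop connector write the Dataset directly over HTTP, which batches per partition on its own. A rough sketch is shown below; the node list, index/type and batch settings are placeholders and should be checked against the es-hadoop documentation for 6.4.x.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;

    public class EsHadoopWriteSketch {
        // Writes an already-built Dataset<Row> to ES through the es-hadoop connector.
        public static void write(Dataset<Row> rows) {
            Map<String, String> esConfig = new HashMap<String, String>();
            esConfig.put("es.nodes", "10.205.201.97,10.205.201.98,10.205.201.96,10.205.201.95");
            esConfig.put("es.port", "9200");                // es-hadoop uses the HTTP port, not 9300
            esConfig.put("es.batch.size.entries", "10000"); // roughly mirrors setBulkActions(10000)
            esConfig.put("es.batch.size.bytes", "64mb");    // roughly mirrors the 64 MB bulk size
            JavaEsSparkSQL.saveToEs(rows, "twitter/tweet", esConfig);
        }
    }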

    For articles on ES + Spark write-performance tuning, see:

    Elasticsearch Advanced (1): write performance benchmarking and optimization (from 56 hours down to 5 hours), with a discussion of chunk_size

    ElasticSearch write performance optimization

    Elasticsearch write performance optimization

    Elasticsearch write optimization notes: from 3,000 to 8,000 records/s

    Performance testing of Spark 2.x writes to Elasticsearch

  • Original article: https://www.cnblogs.com/yy3b2007com/p/9885040.html