Spark 2.x (54): ds.selectExpr() in Spark Structured Streaming hangs when many columns are returned

    The business requirement: part of the fields are dynamic, and their formulas have to be loaded and evaluated at runtime:

    Option 1: evaluate the expressions with FelEngine inside a MapFunction / MapPartitionsFunction:

            FelEngine fel = FelEngine.instance;
            FelContext ctx = fel.getContext();
            // bind the variables referenced by the formula
            ctx.set("rsrp", 100);
            ctx.set("rsrq", 80);

            // evaluate the expression against the bound variables
            double expValue = Double.valueOf(String.valueOf(fel.eval("rsrp*10-rsrq*8")));
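
    Below is a minimal sketch of Option 1 wired into a MapPartitionsFunction, binding each row's fields into the FEL context before evaluating the formula. The FEL import paths are assumed from the fast-el library, and the column order follows the id,cellname,rsrp,rsrq schema used in Option 2 below:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.spark.api.java.function.MapPartitionsFunction;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;

    // assumed fast-el package layout
    import com.greenpoint.el.FelEngine;
    import com.greenpoint.el.context.FelContext;

    public class FelMapPartitions implements MapPartitionsFunction<Row, Row> {
        private static final long serialVersionUID = 1L;

        @Override
        public Iterator<Row> call(Iterator<Row> input) throws Exception {
            // obtain the engine inside the function so nothing non-serializable
            // is captured in the closure
            FelEngine fel = FelEngine.instance;
            FelContext ctx = fel.getContext();
            List<Row> out = new ArrayList<>();
            while (input.hasNext()) {
                Row row = input.next();
                // bind this row's fields, then evaluate the formula against them
                ctx.set("rsrp", Double.valueOf(row.getString(2)));
                ctx.set("rsrq", Double.valueOf(row.getString(3)));
                double expValue = Double.valueOf(String.valueOf(fel.eval("rsrp*10-rsrq*8")));
                out.add(RowFactory.create(row.getString(0), row.getString(1),
                        row.getString(2), row.getString(3), String.valueOf(expValue)));
            }
            return out.iterator();
        }
    }

    The output encoder passed to mapPartitions would need one extra column for the computed value (kept as a string here to match the all-string input schema).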

    Option 2: use the selectExpr() function:

    package com.dx.streaming.drivers.test;
    
    import org.apache.spark.api.java.function.MapPartitionsFunction;
    import org.apache.spark.sql.*;
    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
    import org.apache.spark.sql.catalyst.encoders.RowEncoder;
    import org.apache.spark.sql.streaming.OutputMode;
    import org.apache.spark.sql.streaming.StreamingQueryException;
    import org.apache.spark.sql.streaming.Trigger;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;
    import scala.collection.JavaConversions;
    import scala.collection.Seq;
    
    import java.util.*;
    import java.util.concurrent.TimeUnit;
    
    public class MrsExpressionDoWithSelectExp {
        public static void main(String[] args) {
            SparkSession sparkSession = SparkSession.builder().appName("test").master("local[*]").getOrCreate();
            
            StructType type = new StructType();
            type = type.add("id", DataTypes.StringType);
            type = type.add("cellname", DataTypes.StringType);
            type = type.add("rsrp", DataTypes.StringType);
            type = type.add("rsrq", DataTypes.StringType);
            ExpressionEncoder<Row> encoder = RowEncoder.apply(type);
    
            // backslashes must be escaped in Java string literals
            Dataset<String> ds = sparkSession.readStream().textFile("E:\\test-structured-streaming-dir\\*");
            Dataset<Row> rows = ds.mapPartitions(new MapPartitionsFunction<String, Row>() {
                private static final long serialVersionUID = -1988302292518096148L;
    
                @Override
                public Iterator<Row> call(Iterator<String> input) throws Exception {
                    List<Row> rows = new ArrayList<>();
                    while (input.hasNext()) {
                        String line = input.next();
                        String[] items = line.split(",");
                        rows.add(RowFactory.create(items));
                    }
                    return rows.iterator();
                }
            }, encoder);
            rows.printSchema();
    
            int dynamicExprLength = 10;
            Map<String, String> expMap = new LinkedHashMap<>();
            // formulas loaded from a configuration file
            expMap.put("rsrpq_count", "rsrp+rsrp");
            expMap.put("rsrpq_sum", "rsrp*10+rsrq*10");
            for (int i = 0; i < dynamicExprLength; i++) {
                expMap.put("rsrpq_sum" + i, "rsrp*10+rsrq*10");
            }

            // the $ prefix marks second-layer expressions that depend on first-layer results
            expMap.put("$rsrpq_avg", "rsrpq_sum/rsrpq_count");
    
            List<String> firstLayerExpList = new ArrayList<>();
            List<String> secondLayerExpList = new ArrayList<>();
            firstLayerExpList.add("*");
            secondLayerExpList.add("*");
    
            for (Map.Entry<String, String> kv : expMap.entrySet()) {
                if (kv.getKey().startsWith("$")) {
                    secondLayerExpList.add("(" + kv.getValue() + ") as " + kv.getKey().replace("$", ""));
                } else {
                    firstLayerExpList.add("(" + kv.getValue() + ") as " + kv.getKey());
                }
            }
    
            // First-layer computation: select *, (rsrp+rsrp) as rsrpq_count, (rsrp*10+rsrq*10) as rsrpq_sum
            //rows = rows.selectExpr(firstLayerExpList.toArray(new String[firstLayerExpList.size()] ));
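            // selectExpr() is a Scala varargs method; from Java it accepts either a
            // String[] (the commented-out call above) or a scala Seq, as below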
            Seq<String> firstLayerExpSeq = JavaConversions.asScalaBuffer(firstLayerExpList);
            rows = rows.selectExpr(firstLayerExpSeq);
            //rows.show();
    
            // Second-layer computation: select *, (rsrpq_sum/rsrpq_count) as rsrpq_avg
            //rows = rows.selectExpr(secondLayerExpList.toArray(new String[secondLayerExpList.size()] ));
            Seq<String> secondLayerExpSeq = JavaConversions.asScalaBuffer(secondLayerExpList);
            rows = rows.selectExpr(secondLayerExpSeq);
            
            rows.printSchema();
            //rows.show();
            rows.writeStream().format("console").outputMode(OutputMode.Append())
                    .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES)).start();
            try {
                sparkSession.streams().awaitAnyTermination();
            } catch (StreamingQueryException e) {
                e.printStackTrace();
            }
    
        }
    }
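
    For a local test, an input file under E:\test-structured-streaming-dir might look like this (hypothetical sample rows in the id,cellname,rsrp,rsrq order the parser expects):

    1,cell-001,100,80
    2,cell-002,95,75
    3,cell-003,110,82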

    With dynamicExprLength set to 10 dynamic columns, the job runs and produces output normally.

    Discovering the problem with ds.selectExpr():

    When the number of dynamic columns is raised to 500 or 1000, the following appears in local testing:

    19/07/18 14:18:18 INFO CodeGenerator: Code generated in 105.715218 ms
    19/07/18 14:18:19 WARN CodeGenerator: Error calculating stats of compiled class.
    java.io.EOFException
        at java.io.DataInputStream.readFully(DataInputStream.java:197)
        at java.io.DataInputStream.readFully(DataInputStream.java:169)
        at org.codehaus.janino.util.ClassFile.loadAttribute(ClassFile.java:1509)
        at org.codehaus.janino.util.ClassFile.loadAttributes(ClassFile.java:644)
        at org.codehaus.janino.util.ClassFile.loadFields(ClassFile.java:623)
        at org.codehaus.janino.util.ClassFile.<init>(ClassFile.java:280)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anonfun$recordCompilationStats$1.apply(CodeGenerator.scala:996)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anonfun$recordCompilationStats$1.apply(CodeGenerator.scala:993)
        at scala.collection.Iterator$class.foreach(Iterator.scala:750)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1202)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.recordCompilationStats(CodeGenerator.scala:993)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:961)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1027)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1024)
        at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
        at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
        at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
        at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
        at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
        at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
        at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:906)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:412)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:366)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:32)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:890)
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.extractProjection$lzycompute(ExpressionEncoder.scala:263)
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.extractProjection(ExpressionEncoder.scala:263)
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:287)
        at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:573)
        at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:573)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:235)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    19/07/18 14:18:19 INFO CodeGenerator: Code generated in 1354.475257 ms

    Note that the warning above is only logged at WARN level and code generation still completes (the final INFO line), so the local run continues. When the job is submitted to YARN, however, it hangs in both yarn-client and yarn-cluster mode: the driver and executors are created and resources are allocated, but no tasks are ever assigned.

    No error is logged, either; the job simply stays stuck and can remain that way indefinitely.
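
    The stack trace points at Janino compilation of the generated projection classes, so whole-stage code generation is the first knob to experiment with for very wide rows. I have not verified that this setting fixes the hang; a sketch:

    SparkSession sparkSession = SparkSession.builder()
            .appName("test")
            .master("local[*]")
            // fall back from generated (Janino-compiled) whole-stage code to
            // the interpreted path
            .config("spark.sql.codegen.wholeStage", "false")
            .getOrCreate();

    Relatedly, spark.sql.codegen.maxFields (default 100) controls the field count beyond which Spark deactivates whole-stage codegen on its own for wide schemas.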

Original post: https://www.cnblogs.com/yy3b2007com/p/11207078.html