  • Spark 2.x (54): In Spark Structured Streaming, ds.selectExpr() hangs when the expression list returns many columns

    Business requirement: some of the fields are dynamic, so their expressions have to be loaded and evaluated at runtime:

    Option 1: evaluate the expressions with FelEngine inside a MapFunction / MapPartitionsFunction:

        // Fel (com.greenpineyu.fel) is a lightweight Java expression engine;
        // the imports below assume the standard Fel artifact.
        import com.greenpineyu.fel.FelEngine;
        import com.greenpineyu.fel.context.FelContext;

        FelEngine fel = FelEngine.instance;
        FelContext ctx = fel.getContext();
        ctx.set("rsrp", 100);
        ctx.set("rsrq", 80);

        double expValue = Double.valueOf(String.valueOf(fel.eval("rsrp*10-rsrq*8")));
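
    A minimal sketch of how option 1 plugs into a Dataset follows; the column order, the output schema, and its encoder (outEncoder) are illustrative assumptions, not the production layout:

        // Assumed wiring for option 1: evaluate a Fel expression per row inside
        // mapPartitions. "rows" has columns id, cellname, rsrp, rsrq; "outEncoder"
        // is a RowEncoder for the output schema (id, cellname, expValue).
        Dataset<Row> evaluated = rows.mapPartitions(new MapPartitionsFunction<Row, Row>() {
            @Override
            public Iterator<Row> call(Iterator<Row> input) {
                FelEngine fel = FelEngine.instance;
                FelContext ctx = fel.getContext();
                List<Row> out = new ArrayList<>();
                while (input.hasNext()) {
                    Row row = input.next();
                    ctx.set("rsrp", Double.valueOf(row.getString(2)));
                    ctx.set("rsrq", Double.valueOf(row.getString(3)));
                    double expValue = Double.valueOf(String.valueOf(fel.eval("rsrp*10-rsrq*8")));
                    out.add(RowFactory.create(row.getString(0), row.getString(1), expValue));
                }
                return out.iterator();
            }
        }, outEncoder);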

    Option 2: use the selectExpr() function:

    package com.dx.streaming.drivers.test;
    
    import org.apache.spark.api.java.function.MapPartitionsFunction;
    import org.apache.spark.sql.*;
    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
    import org.apache.spark.sql.catalyst.encoders.RowEncoder;
    import org.apache.spark.sql.streaming.OutputMode;
    import org.apache.spark.sql.streaming.StreamingQueryException;
    import org.apache.spark.sql.streaming.Trigger;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;
    import scala.collection.JavaConversions;
    import scala.collection.Seq;
    
    import java.util.*;
    import java.util.concurrent.TimeUnit;
    
    public class MrsExpressionDoWithSelectExp {
        public static void main(String[] args) {
            SparkSession sparkSession = SparkSession.builder().appName("test").master("local[*]").getOrCreate();
            
            StructType type = new StructType();
            type = type.add("id", DataTypes.StringType);
            type = type.add("cellname", DataTypes.StringType);
            type = type.add("rsrp", DataTypes.StringType);
            type = type.add("rsrq", DataTypes.StringType);
            ExpressionEncoder<Row> encoder = RowEncoder.apply(type);
    
            Dataset<String> ds = sparkSession.readStream().textFile("E:\\test-structured-streaming-dir\\*");
            Dataset<Row> rows = ds.mapPartitions(new MapPartitionsFunction<String, Row>() {
                private static final long serialVersionUID = -1988302292518096148L;
    
                @Override
                public Iterator<Row> call(Iterator<String> input) throws Exception {
                    List<Row> rows = new ArrayList<>();
                    while (input.hasNext()) {
                        String line = input.next();
                        String[] items = line.split(",");
                        rows.add(RowFactory.create(items));
                    }
                    return rows.iterator();
                }
            }, encoder);
            rows.printSchema();
    
            int dynamicExprLength=10;
            Map<String, String> expMap = new LinkedHashMap<>();
            // In production these formulas are loaded from a configuration file
            expMap.put("rsrpq_count", "rsrp+rsrp");
            expMap.put("rsrpq_sum", "rsrp*10+rsrq*10");
            for(int i=0;i<dynamicExprLength;i++){
                expMap.put("rsrpq_sum"+i, "rsrp*10+rsrq*10");    
            }
                    
            expMap.put("$rsrpq_avg", "rsrpq_sum/rsrpq_count");
    
            List<String> firstLayerExpList = new ArrayList<>();
            List<String> secondLayerExpList = new ArrayList<>();
            firstLayerExpList.add("*");
            secondLayerExpList.add("*");
    
            for (Map.Entry<String, String> kv : expMap.entrySet()) {
                if (kv.getKey().startsWith("$")) {
                    secondLayerExpList.add("(" + kv.getValue() + ") as " + kv.getKey().replace("$", ""));
                } else {
                    firstLayerExpList.add("(" + kv.getValue() + ") as " + kv.getKey());
                }
            }
    
            // First layer: select *, (rsrp+rsrp) as rsrpq_count, (rsrp*10+rsrq*10) as rsrpq_sum, ...
            //rows = rows.selectExpr(firstLayerExpList.toArray(new String[firstLayerExpList.size()] ));
            Seq<String> firstLayerExpSeq = JavaConversions.asScalaBuffer(firstLayerExpList);
            rows = rows.selectExpr(firstLayerExpSeq);
            //rows.show();
    
            // Second layer: select *, (rsrpq_sum/rsrpq_count) as rsrpq_avg
            //rows = rows.selectExpr(secondLayerExpList.toArray(new String[secondLayerExpList.size()] ));
            Seq<String> secondLayerExpSeq = JavaConversions.asScalaBuffer(secondLayerExpList);
            rows = rows.selectExpr(secondLayerExpSeq);
            
            rows.printSchema();
            //rows.show();
            rows.writeStream().format("console").outputMode(OutputMode.Append()).trigger(Trigger.ProcessingTime(1,TimeUnit.MINUTES)).start();
            try {
                sparkSession.streams().awaitAnyTermination();
            } catch (StreamingQueryException e) {
                e.printStackTrace();
            }
    
        }
    }

    With dynamicExprLength set to 10, the query runs and prints output normally. (The "$" prefix marks expressions that reference first-layer aliases, which is why they must be evaluated in a second selectExpr() pass.)
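
    For reference, each file under the watched directory is plain comma-separated text matching the four-column schema; the values below are made up for illustration:

        1,cell-01,100,80
        2,cell-02,95,70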

    Observing the ds.selectExpr() problem:

    When dynamicExprLength is raised to 500 or 1000, local testing produces the following:

    19/07/18 14:18:18 INFO CodeGenerator: Code generated in 105.715218 ms
    19/07/18 14:18:19 WARN CodeGenerator: Error calculating stats of compiled class.
    java.io.EOFException
        at java.io.DataInputStream.readFully(DataInputStream.java:197)
        at java.io.DataInputStream.readFully(DataInputStream.java:169)
        at org.codehaus.janino.util.ClassFile.loadAttribute(ClassFile.java:1509)
        at org.codehaus.janino.util.ClassFile.loadAttributes(ClassFile.java:644)
        at org.codehaus.janino.util.ClassFile.loadFields(ClassFile.java:623)
        at org.codehaus.janino.util.ClassFile.<init>(ClassFile.java:280)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anonfun$recordCompilationStats$1.apply(CodeGenerator.scala:996)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anonfun$recordCompilationStats$1.apply(CodeGenerator.scala:993)
        at scala.collection.Iterator$class.foreach(Iterator.scala:750)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1202)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.recordCompilationStats(CodeGenerator.scala:993)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:961)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1027)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1024)
        at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
        at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
        at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
        at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
        at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
        at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
        at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:906)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:412)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:366)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:32)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:890)
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.extractProjection$lzycompute(ExpressionEncoder.scala:263)
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.extractProjection(ExpressionEncoder.scala:263)
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:287)
        at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:573)
        at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:573)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:235)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    19/07/18 14:18:19 INFO CodeGenerator: Code generated in 1354.475257 ms

    When the job is submitted to YARN, in either yarn-client or yarn-cluster mode, it hangs outright: the driver and executors come up and are allocated resources, but no tasks are ever assigned.

    No error is logged at all; the job simply stalls, apparently indefinitely.
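
    The symptoms point at code generation over a very wide projection, so two mitigations seem worth trying (neither was verified in this test): disabling whole-stage code generation, or splitting the expression list across several smaller selectExpr() passes. A sketch, assuming a batch size tuned by experiment:

        // Untested mitigation 1: stop Catalyst from compiling one huge generated class.
        sparkSession.conf().set("spark.sql.codegen.wholeStage", "false");

        // Untested mitigation 2: apply the first-layer expressions in smaller batches.
        // firstLayerExpList.get(0) is "*", so batching starts at index 1.
        int batchSize = 100; // assumed value; tune experimentally
        for (int start = 1; start < firstLayerExpList.size(); start += batchSize) {
            int end = Math.min(start + batchSize, firstLayerExpList.size());
            List<String> batch = new ArrayList<>(firstLayerExpList.subList(start, end));
            batch.add(0, "*"); // keep all previously computed columns
            rows = rows.selectExpr(batch.toArray(new String[0]));
        }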

  • Original article: https://www.cnblogs.com/yy3b2007com/p/11207078.html