  • Fixing the Spark "Task not serializable" error: org.apache.spark.SparkException: Task not serializable

    import org.elasticsearch.cluster.routing.Murmur3HashFunction;
    import org.elasticsearch.common.math.MathUtils;
    
    // Custom Partitioner
    class ESShardPartitioner(settings: String) extends org.apache.spark.Partitioner {
      protected var _numPartitions = -1;  
      protected var _hashFunction = new org.elasticsearch.cluster.routing.Murmur3HashFunction; // the serialization error occurs here: this field is not serializable
      override def numPartitions: Int = {
        val newSettings = new org.elasticsearch.hadoop.cfg.PropertiesSettings().load(settings);
        // In production, set your own index/type; web/blog is the index used for this experiment
        newSettings.setResourceRead("web/blog"); // ******************** !!! modify it !!! ******************** 
        newSettings.setResourceWrite("web/blog"); // ******************** !!! modify it !!! ******************** 
        val repository = new org.elasticsearch.hadoop.rest.RestRepository(newSettings);
        val targetShards = repository.getWriteTargetPrimaryShards(newSettings.getNodesClientOnly());
        repository.close();
        // targetShards ??? data structure
        _numPartitions = targetShards.size();
        println("********************numPartitions*************************");
        println(_numPartitions);
        _numPartitions;
      }
    
      override def getPartition(docID: Any): Int = {    
        val r = _hashFunction.hash(docID.toString());
        val shardId = org.elasticsearch.common.math.MathUtils.mod(r, _numPartitions);
        println("********************shardId*************************");
        println(shardId)
        shardId;
      }
    }

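    Root cause: the "Task not serializable" error usually appears when the function passed to map, filter, or a similar operation uses a variable from outside the function, and that variable cannot be serialized. In particular, when the function references a member function or field of a class (often the current class), every member of that class, that is, the whole instance, must support serialization. In the code above, the partitioner itself is sent to the workers, so its _hashFunction field has to be serializable, and Murmur3HashFunction is not.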
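The "whole class gets dragged in" effect can be reproduced without Spark. The following is a minimal sketch using only plain Java serialization; `ClosureCaptureDemo`, `Job`, `SerFunction`, and `canSerialize` are hypothetical names introduced for illustration, and `SerFunction` merely stands in for the serializable function types Spark expects.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class ClosureCaptureDemo {
    // Serializable function type (hypothetical stand-in for a Spark closure).
    interface SerFunction<T, R> extends Function<T, R>, Serializable {}

    // Hypothetical job class that does NOT implement Serializable.
    static class Job {
        private final String prefix;

        Job(String prefix) {
            this.prefix = prefix;
        }

        // Reads the field directly, so the lambda captures `this` (the whole Job).
        SerFunction<String, String> capturesThis() {
            return s -> prefix + s;
        }

        // Copies the field into a local first, so only the String is captured.
        SerFunction<String, String> capturesLocal() {
            final String localPrefix = prefix;
            return s -> localPrefix + s;
        }
    }

    // Returns true if Java serialization accepts the object.
    static boolean canSerialize(Object o) {
        try {
            new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Job job = new Job("id-");
        System.out.println(canSerialize(job.capturesThis()));  // false
        System.out.println(canSerialize(job.capturesLocal())); // true
    }
}
```

Copying the needed value into a local variable before building the function is often enough to avoid the error, because the enclosing object is then no longer part of what gets serialized.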

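    Solution: create the hash function inside getPartition instead of keeping it as a field, so the serialized partitioner no longer carries it: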

    class ESShardPartitioner(settings: String) extends org.apache.spark.Partitioner {
      protected var _numPartitions = -1;  
    
      override def numPartitions: Int = {
        val newSettings = new org.elasticsearch.hadoop.cfg.PropertiesSettings().load(settings);
        // In production, set your own index/type; web/blog is the index used for this experiment
        newSettings.setResourceRead("web/blog"); // ******************** !!! modify it !!! ******************** 
        newSettings.setResourceWrite("web/blog"); // ******************** !!! modify it !!! ******************** 
        val repository = new org.elasticsearch.hadoop.rest.RestRepository(newSettings);
        val targetShards = repository.getWriteTargetPrimaryShards(newSettings.getNodesClientOnly());
        repository.close();
        // targetShards ??? data structure
        _numPartitions = targetShards.size();
        println("********************numPartitions*************************");
        println(_numPartitions);
        _numPartitions;
      }
    
      override def getPartition(docID: Any): Int = {
        // Created locally on each call, so the partitioner has no non-serializable field
        val _hashFunction = new org.elasticsearch.cluster.routing.Murmur3HashFunction;
        val r = _hashFunction.hash(docID.toString());
        val shardId = org.elasticsearch.common.math.MathUtils.mod(r, _numPartitions);
        println("********************shardId*************************");
        println(shardId)
        shardId;
      }
    }
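The fix above builds a new hash function on every getPartition call. A common alternative, not the one used in this post, is to keep the helper as a transient field and rebuild it lazily after deserialization (in Scala this is usually written as `@transient lazy val`). Below is a minimal sketch in plain Java; `TransientFieldDemo`, `Hasher`, `ShardPartitioner`, and `roundTrip` are hypothetical names, and `Hasher` only stands in for a non-serializable class such as Murmur3HashFunction.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldDemo {
    // Hypothetical non-serializable helper.
    static class Hasher {
        int hash(String s) {
            return s.hashCode();
        }
    }

    // Hypothetical partitioner-like class that must be serializable.
    static class ShardPartitioner implements Serializable {
        private static final long serialVersionUID = 1L;
        private final int numPartitions;
        // transient: skipped when writing the object, null after reading it back.
        private transient Hasher hasher;

        ShardPartitioner(int numPartitions) {
            this.numPartitions = numPartitions;
        }

        // Rebuilds the helper on first use in whichever JVM the object ends up in.
        private Hasher hasher() {
            if (hasher == null) {
                hasher = new Hasher();
            }
            return hasher;
        }

        int getPartition(Object key) {
            return Math.floorMod(hasher().hash(key.toString()), numPartitions);
        }
    }

    // Serializes and deserializes an object, as happens when it is shipped to a worker.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ShardPartitioner original = new ShardPartitioner(5);
        int before = original.getPartition("doc-42"); // helper already created here
        ShardPartitioner copy = roundTrip(original);  // still serializes fine
        System.out.println(before == copy.getPartition("doc-42")); // true
    }
}
```

The trade-off is one helper per deserialized partitioner instead of one per call, at the cost of a null check on each access.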

    Job aborted due to stage failure: Task not serializable:

    If you see this error:

    org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: ...
    

    The above error can be triggered when you initialize a variable on the driver (master) but then try to use it on one of the workers. In that case, Spark will try to serialize the object to send it over to the worker, and fail if the object is not serializable. Consider the following code snippet:

    NotSerializable notSerializable = new NotSerializable();
    JavaRDD<String> rdd = sc.textFile("/tmp/myfile");
    
    rdd.map(s -> notSerializable.doSomething(s)).collect();
    

    This will trigger that error. Here are some ideas to fix this error:

    • Make the class Serializable.
    • Declare the instance only within the lambda function passed to map.
    • Make the NotSerializable object static, so that it is created once per JVM on each machine instead of being shipped from the driver.
    • Call rdd.foreachPartition and create the NotSerializable object in there, like this:
    rdd.foreachPartition(iter -> {
      NotSerializable notSerializable = new NotSerializable();
    
      // ...Now process iter
    });
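The "static" idea from the list can be sketched without Spark as well. This is only an illustration in plain Java: `StaticHolderDemo`, `Holder`, `SerFunction`, and `serializedSize` are hypothetical names, and the body of `doSomething` is made up. Static state is not written by Java serialization, so a function that reaches the helper through a static accessor captures nothing that needs to be serialized.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class StaticHolderDemo {
    // Serializable function type (hypothetical stand-in for a Spark closure).
    interface SerFunction<T, R> extends Function<T, R>, Serializable {}

    // Non-serializable helper, named after the class in the snippet above.
    static class NotSerializable {
        String doSomething(String s) {
            return "<" + s + ">";
        }
    }

    // Static holder: each JVM builds its own instance on first use.
    static class Holder {
        static int created = 0;
        private static NotSerializable instance;

        static synchronized NotSerializable get() {
            if (instance == null) {
                instance = new NotSerializable();
                created++;
            }
            return instance;
        }
    }

    // The lambda only calls a static method, so it captures no object at all.
    static SerFunction<String, String> mapper() {
        return s -> Holder.get().doSomething(s);
    }

    // Serializes the object and returns the number of bytes written.
    static int serializedSize(Object o) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bytes);
            out.writeObject(o);
            out.flush();
            return bytes.size();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        SerFunction<String, String> f = mapper();
        System.out.println(serializedSize(f) > 0);        // serialization succeeds
        System.out.println(f.apply("a") + f.apply("b"));  // <a><b>
        System.out.println(Holder.created);               // 1
    }
}
```

The holder's accessor is synchronized because several tasks may run in the same JVM at once; whether the helper itself is safe to share between threads depends on the class being wrapped.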

    Reference: https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html
  • Original article: https://www.cnblogs.com/bonelee/p/6120539.html