Redis on Spark: Task not serializable

    We use Redis on Spark to cache our key-value pairs. This is the code:

    import com.redis.RedisClient
    val r = new RedisClient("192.168.1.101", 6379)
    val perhit = perhitFile.map(x => {
        val arr = x.split(" ")
        val readId = arr(0).toInt
        val refId = arr(1).toInt
        val start = arr(2).toInt
        val end = arr(3).toInt
        val refStr = r.hmget("refStr", refId).get(refId).split(",")(1)
        val readStr = r.hmget("readStr", readId).get(readId)
        val realend = if (end > refStr.length - 1) refStr.length - 1 else end
        val refOneStr = refStr.substring(start, realend)
        (readStr, refOneStr, refId, start, realend, readId)
    })

    But Spark gave me this error:

    Exception in thread "main" org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
        at org.apache.spark.rdd.RDD.map(RDD.scala:270)
        at com.ynu.App$.main(App.scala:511)
        at com.ynu.App.main(App.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: java.io.NotSerializableException: com.redis.RedisClient
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
        ... 12 more

    Could somebody tell me how to serialize the data I get from Redis? Thanks a lot.


    2 Answers

    In Spark, the functions on RDDs (like the map here) are serialized and sent to the executors for processing. This implies that everything referenced within those functions must be serializable.

    The Redis connection here is not serializable: it opens a TCP connection to the target DB that is bound to the machine where it's created.

    The solution is to create those connections on the executors, in the local execution context. There are a few ways to do that. Two that come to mind are:

    • rdd.mapPartitions: lets you process a whole partition at once, and therefore amortize the cost of creating connections
    • Singleton connection managers: Create the connection once per executor

    mapPartitions is easier as all it requires is a small change to the program structure:

    val perhit = perhitFile.mapPartitions{ partition =>
        val r = new RedisClient("192.168.1.101", 6379) // create the connection in the context of the mapPartitions operation
        val res = partition.map{ x =>
            ...
            val refStr = r.hmget(...) // use r to process the local data
        }.toList // materialize the results: partition.map is lazy, so closing the connection first would break it
        r.close // take care of resources
        res.iterator
    }

    A singleton connection manager can be modeled with an object that holds a lazy reference to a connection (note: a mutable ref will also work).

    object RedisConnection extends Serializable {
       lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379)
    }

    This object can then be used to instantiate one connection per worker JVM, and it can be referenced as a Serializable object in an operation closure.

    val perhit = perhitFile.map{ x =>
        val param = f(x)
        val refStr = RedisConnection.conn.hmget(...) // use RedisConnection to get a connection to the local data
        ...
    }
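
    The mutable-ref variant mentioned in the note above could be sketched like this (illustrative only; the synchronized block only guards creation, and the client itself is still shared by all tasks in the JVM):

    object RedisConnection extends Serializable {
       private var client: RedisClient = _
       def conn: RedisClient = synchronized { // synchronize lazy creation of the single client
         if (client == null) client = new RedisClient("192.168.1.101", 6379)
         client
       }
    }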

    The advantage of using the singleton object is less overhead, as connections are created only once per JVM (as opposed to one per RDD partition).

    There are also some disadvantages:

    • cleanup of connections is tricky (shutdown hook/timers; see the sketch after this list)
    • one must ensure thread-safety of shared resources
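
    For the cleanup point, one possible approach (a sketch only, assuming the scala-redis client whose close method is called disconnect, per the comments below) is to register a JVM shutdown hook when the lazy connection is first created:

    object RedisConnection extends Serializable {
       lazy val conn: RedisClient = {
         val client = new RedisClient("192.168.1.101", 6379)
         sys.addShutdownHook { client.disconnect } // close the connection when the executor JVM exits
         client
       }
    }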

    (*) code provided for illustration purposes. Not compiled or tested.

    Thank you for answering! I use this library: github.com/debasishg/scala-redis. It doesn't have a method named "close"; instead it's "disconnect". I've no idea if it works. Could you tell me which library you are using now to deal with Redis data? – fanhk Jan 20 '15 at 4:33
        
    Plus 1 for the Singleton solution. Can you give an example of how to manage the closing of the connection? – Sohaib Dec 4 '15 at 11:11
        
    @Sohaib given this is a VM-bound object, you'll need to register a shutdown hook to cleanly close connections. – maasg Dec 11 '15 at 9:06
     

    You're trying to serialize the client. You have one RedisClient, r, that you're trying to use inside the map that will be run across different cluster nodes. Either get the data you want out of Redis separately before doing a cluster task, or create the client individually for each cluster task inside your map block (perhaps by using mapPartitions rather than map, as creating a new Redis client for each individual row is probably a bad idea).
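
    The first option, fetching the data out of Redis up front, could be sketched as follows (untested; it assumes both hashes fit in driver memory, that sc is the SparkContext, and that hgetall is the scala-redis call returning the whole hash as an Option[Map[String, String]] with the default parsers):

    // On the driver: read both hashes once, then broadcast them to the executors.
    val r = new RedisClient("192.168.1.101", 6379)
    val refStrs = r.hgetall[String, String]("refStr").getOrElse(Map.empty)
    val readStrs = r.hgetall[String, String]("readStr").getOrElse(Map.empty)
    r.disconnect

    val refStrsBc = sc.broadcast(refStrs)
    val readStrsBc = sc.broadcast(readStrs)

    val perhit = perhitFile.map{ x =>
        val arr = x.split(" ")
        val refStr = refStrsBc.value(arr(1)).split(",")(1) // hash keys come back as strings
        val readStr = readStrsBc.value(arr(0))
        ...
    }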

        
    Thank you for answering, but could you tell me how to use mapPartitions in this situation? – fanhk Jan 18 '15 at 11:49
        
    Call mapPartitions passing a block that accepts an iterable (as you can see from the signature of mapPartitions), creates the RedisClient inside the block, and then uses it to map the iterable as you were doing. The point is that the RedisClient gets created inside the processing for a single partition. What did you try and where did you get stuck? – lmm Jan 19 '15 at 14:57
        
    Problem solved, thank you! – fanhk Jan 20 '15 at 4:42
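
    For completeness, lmm's suggestion applied to the question's code would look roughly like this (an untested sketch; disconnect is the close method of the scala-redis client mentioned above):

    val perhit = perhitFile.mapPartitions{ iter =>
        val r = new RedisClient("192.168.1.101", 6379) // one client per partition, created on the executor
        val res = iter.map{ x =>
            val arr = x.split(" ")
            val readId = arr(0).toInt
            val refId = arr(1).toInt
            val start = arr(2).toInt
            val end = arr(3).toInt
            val refStr = r.hmget("refStr", refId).get(refId).split(",")(1)
            val readStr = r.hmget("readStr", readId).get(readId)
            val realend = if (end > refStr.length - 1) refStr.length - 1 else end
            (readStr, refStr.substring(start, realend), refId, start, realend, readId)
        }.toList // force evaluation before the connection is closed
        r.disconnect
        res.iterator
    }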