zoukankan      html  css  js  c++  java
  • 如何应对SparkRedis行海量数据插入、查询作业时碰到的问题

    摘要:由于redis是基于内存的数据库,稳定性并不是很高,尤其是standalone模式下的redis。于是工作中在使用Spark-Redis时也会碰到很多问题,尤其是执行海量数据插入与查询的场景中。

    海量数据查询

    Redis是基于内存读取的数据库,相比其它的数据库,Redis的读取速度会更快。但是当我们要查询上千万条的海量数据时,即使是Redis也需要花费较长时间。这时候如果我们想要终止select作业的执行,我们希望的是所有的running task立即killed。

    Spark是有作业调度机制的。SparkContext是Spark的入口,相当于应用程序的main函数。SparkContext中的cancelJobGroup函数可以取消正在运行的job。

    /**
      * Cancel active jobs for the specified group. See `org.apache.spark.SparkContext.setJobGroup`
      * for more information.
      */
     def cancelJobGroup(groupId: String) {
       assertNotStopped()
       dagScheduler.cancelJobGroup(groupId)
     }

    按理说取消job之后,job下的所有task应该也终止。而且当我们取消select作业时,executor会throw TaskKilledException,而这个时候负责task作业的TaskContext在捕获到该异常之后,会执行killTaskIfInterrupted。

     // If this task has been killed before we deserialized it, let's quit now. Otherwise,
     // continue executing the task.
     val killReason = reasonIfKilled
     if (killReason.isDefined) {
       // Throw an exception rather than returning, because returning within a try{} block
       // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
       // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
       // for the task.
       throw new TaskKilledException(killReason.get)
     }

     

    /**
     * If the task is interrupted, throws TaskKilledException with the reason for the interrupt.
     */
     private[spark] def killTaskIfInterrupted(): Unit

    但是Spark-Redis中还是会出现终止作业但是task仍然running。因为task的计算逻辑最终是在RedisRDD中实现的,RedisRDD的compute会从Jedis中取获取keys。所以说要解决这个问题,应该在RedisRDD中取消正在running的task。这里有两种方法:

    方法一:参考Spark的JDBCRDD,定义close(),结合InterruptibleIterator。

    def close() {
       if (closed) return
       try {
         if (null != rs) {
           rs.close()
         }
       } catch {
         case e: Exception => logWarning("Exception closing resultset", e)
       }
       try {
         if (null != stmt) {
           stmt.close()
         }
       } catch {
         case e: Exception => logWarning("Exception closing statement", e)
       }
       try {
         if (null != conn) {
           if (!conn.isClosed && !conn.getAutoCommit) {
             try {
               conn.commit()
             } catch {
               case NonFatal(e) => logWarning("Exception committing transaction", e)
             }
           }
           conn.close()
         }
         logInfo("closed connection")
       } catch {
         case e: Exception => logWarning("Exception closing connection", e)
       }
       closed = true
     }
     
     context.addTaskCompletionListener{ context => close() } 
    CompletionIterator[InternalRow, Iterator[InternalRow]](
       new InterruptibleIterator(context, rowsIterator), close())

    方法二:异步线程执行compute,主线程中判断task isInterrupted

    try{
       val thread = new Thread() {
         override def run(): Unit = {
           try {
              keys = doCall
           } catch {
             case e =>
               logWarning(s"execute http require failed.")
           }
           isRequestFinished = true
         }
       }
     
       // control the http request for quite if user interrupt the job
       thread.start()
       while (!context.isInterrupted() && !isRequestFinished) {
         Thread.sleep(GetKeysWaitInterval)
       }
       if (context.isInterrupted() && !isRequestFinished) {
         logInfo(s"try to kill task ${context.getKillReason()}")
         context.killTaskIfInterrupted()
       }
       thread.join()
       CompletionIterator[T, Iterator[T]](
         new InterruptibleIterator(context, keys), close)

    我们可以异步线程来执行compute,然后在另外的线程中判断是否task isInterrupted,如果是的话就执行TaskContext的killTaskIfInterrupted。防止killTaskIfInterrupted无法杀掉task,再结合InterruptibleIterator:一种迭代器,以提供任务终止功能。通过检查[TaskContext]中的中断标志来工作。

    海量数据插入

    我们都已经redis的数据是保存在内存中的。当然Redis也支持持久化,可以将数据备份到硬盘中。当插入海量数据时,如果Redis的内存不够的话,很显然会丢失部分数据。这里让使用者困惑的点在于: 当Redis已使用内存大于最大可用内存时,Redis会报错:command not allowed when used memory > ‘maxmemory’。但是当insert job的数据大于Redis的可用内存时,部分数据丢失了,并且还没有任何报错。

    因为不管是Jedis客户端还是Redis服务器,当插入数据时内存不够,不会插入成功,但也不会返回任何response。所以目前能想到的解决办法就是当insert数据丢失时,扩大Redis内存。

    总结

    Spark-Redis是一个应用还不是很广泛的开源项目,不像Spark JDBC那样已经商业化。所以Spark-Redis还是存在很多问题。相信随着commiter的努力,Spark-Redis也会越来越强大。

     

    点击关注,第一时间了解华为云新鲜技术~

  • 相关阅读:
    【LeetCode】17. Letter Combinations of a Phone Number
    【LeetCode】16. 3Sum Closest
    【LeetCode】15. 3Sum 三个数和为0
    【LeetCode】14. Longest Common Prefix 最长前缀子串
    【LeetCode】13. Roman to Integer 罗马数字转整数
    【LeetCode】12. Integer to Roman 整型数转罗马数
    【LeetCode】11. Container With Most Water
    【LeetCode】10. Regular Expression Matching
    Models of good programmer
    RSA Algorithm
  • 原文地址:https://www.cnblogs.com/huaweiyun/p/14052493.html
Copyright © 2011-2022 走看看