  • Using a custom Spark Partitioner to improve es-hadoop bulk efficiency

    Both http://www.jianshu.com/p/cccc56e39429/comments/2022782 and https://github.com/elastic/elasticsearch-hadoop/issues/745 mention using a custom Spark Partitioner to improve es-hadoop bulk efficiency, but neither provides runnable code, so I implemented the idea myself in spark-shell.

    The idea:

    Spark Streaming monitors /tmp/data for new files and stores every line of each file into the web/blog index in ES.

    Note: the code uses the doc ID to customize routing, and that ID is an auto-generated UUID. Therefore, after starting ES, you need to run:

    curl -s -XPUT localhost:9200/web -d '
    {
        "mappings": {
            "blog": {
                "_id": {
                    "path": "uuid"
                },
                "properties": {
                    "title": {
                        "type":   "string",
                        "index":  "analyzed"
                    }
                }
            }
        }
    }'

    This tells ES to use the uuid field of the blog document as its _id. For ES 2.0 and later, see http://stackoverflow.com/questions/32334709/how-to-set-id-in-elasticsearch-2-0.
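    Since the _id path mapping was removed in ES 2.0, a minimal sketch of an alternative (not from the original post) is to let es-hadoop pull the document _id out of the uuid field via its es.mapping.id option, so no special mapping is needed:

    // Hedged sketch for ES 2.0+: es-hadoop extracts the _id from the "uuid" field.
    // "web/blog" and the "uuid" field name are taken from this post's example;
    // es.mapping.id is a standard es-hadoop write setting.
    import org.elasticsearch.spark._

    val docs = sc.makeRDD(Seq(
      Map("uuid" -> java.util.UUID.randomUUID.toString, "content" -> "a sad story"),
      Map("uuid" -> java.util.UUID.randomUUID.toString, "content" -> "another sad story")
    ))
    docs.saveToEs("web/blog", Map("es.mapping.id" -> "uuid"))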

    Here is the spark-shell code:

    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.elasticsearch.spark._
    import org.apache.spark.Partitioner
    import org.elasticsearch.hadoop.cfg.PropertiesSettings
    import org.elasticsearch.spark.cfg.SparkSettingsManager
    import org.elasticsearch.hadoop.cfg.Settings
    import org.elasticsearch.hadoop.rest.RestRepository
    import scala.collection.JavaConversions._
    
    
    // For easier testing, below is my own Scala implementation of the ES hash function.
    // Note: in production, use the function shipped in the ES jar instead (see the
    // commented sketch after this object); its source is at:
    // https://github.com/elastic/elasticsearch/blob/master/core/src/main/java/org/elasticsearch/cluster/routing/Murmur3HashFunction.java
    object Murmur3HashFunction {
      def hash(routing: String): Int = {
        val bytesToHash = Array.ofDim[Byte](routing.length * 2)
        for (i <- 0 until routing.length) {
          val c = routing.charAt(i)
          val b1 = c.toByte
          val b2 = (c >>> 8).toByte
          assert(((b1 & 0xFF) | ((b2 & 0xFF) << 8)) == c)
          bytesToHash(i * 2) = b1
          bytesToHash(i * 2 + 1) = b2
        }
        hash(bytesToHash, 0, bytesToHash.length)
      }
    
      def hash(bytes: Array[Byte], offset: Int, length: Int): Int = {
        murmurhash3_x86_32(bytes, offset, length, 0)
      }
    
      def murmurhash3_x86_32(data: Array[Byte], 
          offset: Int, 
          len: Int, 
          seed: Int): Int = {
        val c1 = 0xcc9e2d51
        val c2 = 0x1b873593
        var h1 = seed
        val roundedEnd = offset + (len & 0xfffffffc)
        var i = offset
        while (i < roundedEnd) {
          var k1 = (data(i) & 0xff) | ((data(i + 1) & 0xff) << 8) | ((data(i + 2) & 0xff) << 16) | 
            (data(i + 3) << 24)
          k1 *= c1
          k1 = (k1 << 15) | (k1 >>> 17)
          k1 *= c2
          h1 ^= k1
          h1 = (h1 << 13) | (h1 >>> 19)
          h1 = h1 * 5 + 0xe6546b64
          i += 4
        }
        // The original Java switch falls through from case 3 down to case 1;
        // a Scala match does not fall through, so handle the 1-3 tail bytes with ifs.
        var k1 = 0
        val rem = len & 0x03
        if (rem == 3) k1 = (data(roundedEnd + 2) & 0xff) << 16
        if (rem >= 2) k1 |= (data(roundedEnd + 1) & 0xff) << 8
        if (rem >= 1) {
          k1 |= (data(roundedEnd) & 0xff)
          k1 *= c1
          k1 = (k1 << 15) | (k1 >>> 17)
          k1 *= c2
          h1 ^= k1
        }
        h1 ^= len
        h1 ^= h1 >>> 16
        h1 *= 0x85ebca6b
        h1 ^= h1 >>> 13
        h1 *= 0xc2b2ae35
        h1 ^= h1 >>> 16
        h1
      }
    }
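    
    // A hedged sketch (not in the original post): in production, call the hash function
    // shipped inside the ES jar rather than the Scala port above, e.g. something like
    //
    //   import org.elasticsearch.cluster.routing.Murmur3HashFunction
    //   val h = Murmur3HashFunction.hash(docId)  // recent ES versions expose a static hash(String)
    //
    // The exact signature differs between ES versions (older releases use an instance
    // method on a HashFunction implementation), so check the class for your ES version.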
    
    // Custom Partitioner: routes each (docID, doc) pair to the Spark partition matching its ES shard
    class ESShardPartitioner(settings: String) extends Partitioner {
          protected var _numPartitions = -1
          
          override def numPartitions: Int = {   
            val newSettings = new PropertiesSettings().load(settings)
            // In production, set the index/type of your own index here; web/blog is the index used in this experiment
            newSettings.setResourceRead("web/blog") // ******************** !!! modify it !!! ******************** 
            newSettings.setResourceWrite("web/blog") // ******************** !!! modify it !!! ******************** 
            val repository = new RestRepository(newSettings)
            val targetShards = repository.getWriteTargetPrimaryShards(newSettings.getNodesClientOnly())
            repository.close()
            _numPartitions = targetShards.size()
            _numPartitions
          } 
                
          override def getPartition(docID: Any): Int = {
            var shardId = Murmur3HashFunction.hash(docID.toString()) % _numPartitions;
            if (shardId < 0) {
                shardId += _numPartitions;
            }
            shardId
          }
    }
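    
    // Usage (kept as a comment so the listing stays pasteable as-is): pair each doc with
    // its id, shuffle with the partitioner, then write the values, e.g.
    //   pairRdd.partitionBy(new ESShardPartitioner(settings)).map(_._2).saveToEs("web/blog")
    // which is exactly what the streaming job below does.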
    
    sc.getConf.setMaster("local").setAppName("RDDTest").set("es.nodes", "127.0.0.1").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("es.index.auto.create", "true");
    val ssc = new StreamingContext(sc, Seconds(2));
    val fileStream = ssc.textFileStream("/tmp/data");
    
    fileStream.foreachRDD { rdd => {
        def makeItem(content: String) : (String, Map[String,String]) = {
            val uuid = java.util.UUID.randomUUID.toString();
            (uuid, Map("content"->content, "uuid"->uuid))     
        }
        println("********************start*************************");
        var r2 = rdd.map(makeItem);
        val sparkCfg = new SparkSettingsManager().load(rdd.sparkContext.getConf)
        val settings = sparkCfg.save();
        var r3 = r2.partitionBy(new ESShardPartitioner(settings));    
        r3.map(x=>x._2).saveToEs("web/blog")
        println("data count: " + rdd.count.toString);
        println("*********************end************************");
    }};
    
    ssc.start();
    ssc.awaitTermination();

    How to run:

     ./spark-shell --jars ../lib/elasticsearch-spark-1.2_2.10-2.1.2.jar

    Then run the code above in the spark shell.

    Generate some fake data from a shell script:

    mkdir -p /tmp/data
    rm -f /tmp/data/*
    for ((j=0;j<30;j++)); do
            {
            for ((i=0;i<20;i++)); do
            file_name=`python -c 'import random;print random.random()'`
            echo "$j $i is sad story." >"/tmp/data/$file_name.log"
            done
            sleep 1
            }
    done
    echo "OK, waiting..."
    echo "done"

    Run the script above; in the spark shell you should see the start/end markers and the per-batch "data count" lines printed by the code.

    For the low-level implementation of ES routing, see http://www.cnblogs.com/bonelee/p/6078956.html.
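
    As a quick recap (a sketch of the standard formula, not code from the linked post), ES places a document on shard hash(_routing) % number_of_primary_shards, where _routing defaults to the document _id; the partitioner above mirrors that computation so that each Spark partition bulks into a single shard:

    // Hedged sketch: reproduce the routing decision for one doc id, assuming the
    // Scala Murmur3HashFunction defined earlier and an index with 5 primary shards
    // (the classic default number of primary shards).
    val numberOfPrimaryShards = 5
    val docId = java.util.UUID.randomUUID.toString
    val shard = {
      val h = Murmur3HashFunction.hash(docId) % numberOfPrimaryShards
      if (h < 0) h + numberOfPrimaryShards else h
    }
    println(s"doc $docId -> shard $shard")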

  • Original post: https://www.cnblogs.com/bonelee/p/6057211.html