  • Custom Spark Partitioner to Improve es-hadoop Bulk Efficiency (Continued)

    For ES 2.4, to be able to plug in a custom Spark partitioner, start spark-shell with the ES jars on the classpath:

    spark-2.0.0-bin-hadoop2.6/bin/spark-shell --jars elasticsearch-hadoop-5.0.1/dist/elasticsearch-spark-20_2.11-5.0.1.jar,elasticsearch-2.4.1/lib/elasticsearch-2.4.1.jar,elasticsearch-2.4.1/lib/lucene-core-5.5.2.jar

    This is needed because ES 2.4's document routing relies on the Murmur3 hash function shipped inside the ES jar:

    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.elasticsearch.spark._
    import org.apache.spark.Partitioner
    import org.elasticsearch.hadoop.cfg.PropertiesSettings
    import org.elasticsearch.spark.cfg.SparkSettingsManager
    import org.elasticsearch.hadoop.cfg.Settings
    import org.elasticsearch.hadoop.rest.RestRepository
    import scala.collection.JavaConversions._
    
    import org.elasticsearch.cluster.routing.Murmur3HashFunction;
    import org.elasticsearch.common.math.MathUtils;
    
    // Custom partitioner that maps each document to its target ES shard
    class ESShardPartitioner(settings: String) extends org.apache.spark.Partitioner {
      protected var _numPartitions = -1;  
    
      override def numPartitions: Int = {
        if (_numPartitions > 0) return _numPartitions; // computed once; avoids re-opening a RestRepository per call
        val newSettings = new org.elasticsearch.hadoop.cfg.PropertiesSettings().load(settings);
        // In production, set the index/type for your own index; web/blog is the index used in this experiment
        newSettings.setResourceRead("web/blog"); // ******************** !!! modify it !!! ******************** 
        newSettings.setResourceWrite("web/blog"); // ******************** !!! modify it !!! ******************** 
        val repository = new org.elasticsearch.hadoop.rest.RestRepository(newSettings);
        val targetShards = repository.getWriteTargetPrimaryShards(newSettings.getNodesClientOnly());
        repository.close();
        // targetShards is a map keyed by primary shard, so its size equals the number of primary shards
        _numPartitions = targetShards.size();
        println("********************numPartitions*************************");
        println(_numPartitions);
        _numPartitions;
      }
    
      override def getPartition(docID: Any): Int = {
        val _hashFunction = new org.elasticsearch.cluster.routing.Murmur3HashFunction;
        val r = _hashFunction.hash(docID.toString());
        val shardId = org.elasticsearch.common.math.MathUtils.mod(r, _numPartitions);
        println("********************shardId*************************");
        println(shardId)
        shardId;
      }
    }
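
    // Design note: getPartition reproduces ES 2.x document routing
    // (murmur3 hash of the doc id, mod the primary shard count), so all
    // documents in a given Spark partition route to the same ES shard,
    // and each task's bulk requests target a single primary shard.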
    
    // NOTE: sc.getConf returns a copy of the configuration, so the setters below
    // do not affect the running shell context; pass these at launch instead,
    // e.g. --conf spark.es.nodes=127.0.0.1 --conf spark.es.index.auto.create=true
    sc.getConf.setMaster("local").setAppName("RDDTest").set("es.nodes", "127.0.0.1").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("es.index.auto.create", "true");
    val ssc = new StreamingContext(sc, Seconds(2));
    val fileStream = ssc.textFileStream("/tmp/data");
    
    fileStream.foreachRDD { rdd => {
        def makeItem(content: String) : (String, Map[String,String]) = {
            val uuid = java.util.UUID.randomUUID.toString();
            (uuid, Map("content"->content, "uuid"->uuid))     
        }
        println("********************start*************************");
        println("********************default partition size*************************");
        println(rdd.partitions.size);
    
        val r2 = rdd.map(makeItem);
        val sparkCfg = new org.elasticsearch.spark.cfg.SparkSettingsManager().load(rdd.sparkContext.getConf)
        val settings = sparkCfg.save();
        val r3 = r2.partitionBy(new ESShardPartitioner(settings));
        // r3.map(x=>x._2).saveToEs("web/blog")
        println("********************changed partition size*************************");
        println(r3.partitions.size);
        r3.saveToEsWithMeta("web/blog")
        println("data count: " + rdd.count.toString);
        println("*********************end************************");
    }};
    
    ssc.start();
    ssc.awaitTermination();
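
    As a sanity check, the shard computation used in ESShardPartitioner.getPartition can be exercised on its own in the same spark-shell session. This is a minimal sketch, assuming the elasticsearch-2.4.1 jar is on the classpath as in the launch command above; the document ids are illustrative:

    import org.elasticsearch.cluster.routing.Murmur3HashFunction
    import org.elasticsearch.common.math.MathUtils

    val hashFunction = new Murmur3HashFunction
    val numShards = 3 // matches index.number_of_shards in the config below
    for (id <- Seq(java.util.UUID.randomUUID.toString, "doc-1", "doc-2")) {
      // same computation as ESShardPartitioner.getPartition above
      val shard = MathUtils.mod(hashFunction.hash(id), numShards)
      println(s"$id -> shard $shard") // always falls in [0, numShards)
    }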

    The elasticsearch.yml of one of the ES nodes:

    cluster.name: es_xxx
    #cluster.name: es_single888
    discovery.zen.ping.multicast.enabled: false
    discovery.zen.ping.unicast.hosts: ["127.0.0.1:9300", "127.0.0.1:9301", "127.0.0.1:9302"]
    #discovery.zen.ping.unicast.hosts: ["10.178.206.190:9300", "10.178.204.225:9300", "10.178.207.88:9300", "10.178.209.161:9300", "10.178.208.230:9300"]
    network.host: 127.0.0.1
    transport.tcp.port: 9300
    http.port: 9200
    index.refresh_interval: 30s
    indices.memory.index_buffer_size: 30%
    index.store.type: mmapfs
    index.translog.flush_threshold_ops: 50000
    indices.store.throttle.type: none
    index.legacy.routing.use_type: false
    index.number_of_shards: 3
    index.number_of_replicas: 0
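
    Because the partitioner derives its partition count from the index's primary shards, the index should already exist with the intended shard layout when the job first writes to it (with es.index.auto.create the node defaults above apply). Below is a minimal sketch using only the JDK HTTP API, assuming ES is reachable on 127.0.0.1:9200 and the index name web from the resource setting above:

    import java.net.{HttpURLConnection, URL}

    // Create the "web" index with 3 primary shards and no replicas,
    // mirroring index.number_of_shards / index.number_of_replicas above.
    val conn = new URL("http://127.0.0.1:9200/web").openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("PUT")
    conn.setDoOutput(true)
    conn.setRequestProperty("Content-Type", "application/json")
    val body = """{"settings":{"number_of_shards":3,"number_of_replicas":0}}"""
    conn.getOutputStream.write(body.getBytes("UTF-8"))
    println("create index: HTTP " + conn.getResponseCode)
    conn.disconnect()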
  • Original article: https://www.cnblogs.com/bonelee/p/6125866.html