  • Kafka 0.11 RoundRobinPartitioner/HashPartitioner (Scala):

    RoundRobinPartitioner/HashPartitioner:

    import java.util
    import java.util.concurrent.atomic.AtomicLong
    
    import org.apache.kafka.clients.producer.Partitioner
    import org.apache.kafka.common.Cluster
    
    
    // Round-robin partitioner: ignores the key and spreads records evenly
    // across all partitions of the topic using a shared counter.
    class SelfRoundRobinPartitioner extends Partitioner {

      // Monotonically increasing counter shared by every send through this producer
      val next = new AtomicLong()

      override def partition(topic: String, key: scala.Any, keyBytes: Array[Byte], value: scala.Any, valueBytes: Array[Byte], cluster: Cluster): Int = {
        val partitionInfo = cluster.partitionsForTopic(topic)
        val numPartitions = partitionInfo.size()
        val nextIndex = next.incrementAndGet()
        // Cycle through partitions 0 .. numPartitions-1
        val partitionNum: Long = nextIndex % numPartitions
        partitionNum.toInt
      }

      override def close(): Unit = {
        // nothing to release
      }

      override def configure(configs: util.Map[String, _]): Unit = {
        // no extra configuration
      }
    }
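
    As a side note, cluster.partitionsForTopic returns every partition, including any whose leader is temporarily offline. Below is a minimal sketch (the class name AvailableRoundRobinPartitioner is illustrative, not part of the original post) of the same round-robin idea restricted to available partitions, similar in spirit to what the built-in DefaultPartitioner does for records without a key:

    import java.util
    import java.util.concurrent.atomic.AtomicLong

    import org.apache.kafka.clients.producer.Partitioner
    import org.apache.kafka.common.Cluster

    class AvailableRoundRobinPartitioner extends Partitioner {

      val next = new AtomicLong()

      override def partition(topic: String, key: scala.Any, keyBytes: Array[Byte], value: scala.Any, valueBytes: Array[Byte], cluster: Cluster): Int = {
        // Prefer partitions that currently have a leader; fall back to all partitions
        val available = cluster.availablePartitionsForTopic(topic)
        val nextIndex = next.incrementAndGet()
        if (!available.isEmpty) {
          val idx = (nextIndex % available.size()).toInt
          available.get(idx).partition()
        } else {
          (nextIndex % cluster.partitionsForTopic(topic).size()).toInt
        }
      }

      override def close(): Unit = {}

      override def configure(configs: util.Map[String, _]): Unit = {}
    }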
    
    import java.util

    import scala.math.abs
    import org.apache.kafka.clients.producer.Partitioner
    import org.apache.kafka.common.Cluster

    // Hash partitioner: records with the same key always land on the same partition.
    class SelfHashPartitioner extends Partitioner {

      override def partition(topic: String, key: scala.Any, keyBytes: Array[Byte], value: scala.Any, valueBytes: Array[Byte], cluster: Cluster): Int = {
        val partitionInfo = cluster.partitionsForTopic(topic)
        val numPartitions = partitionInfo.size()

        // abs() guards against a negative hash code producing a negative partition number
        key match {
          case i: Int => abs(i) % numPartitions
          case _      => abs(key.hashCode()) % numPartitions
        }
      }

      override def close(): Unit = {
        // nothing to release
      }

      override def configure(configs: util.Map[String, _]): Unit = {
        // no extra configuration
      }
    }
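
    For comparison, Kafka's built-in DefaultPartitioner hashes the serialized key bytes with murmur2 rather than calling hashCode() on the deserialized key, and masks the sign bit instead of taking an absolute value. A minimal sketch of that computation (KeyHashDemo is an illustrative name; it only assumes the kafka-clients jar is on the classpath):

    import org.apache.kafka.common.utils.Utils

    object KeyHashDemo {
      def main(args: Array[String]): Unit = {
        val numPartitions = 3
        val keyBytes = "4".getBytes("UTF-8")
        // murmur2 may return a negative int; toPositive masks the sign bit
        val partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions
        println(s"key '4' -> partition $partition")
      }
    }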
    
    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    
    object KafkaProducer {
      def main(args: Array[String]): Unit = {
    
        val brokers = "192.168.1.151:9092,192.168.1.152:9092,192.168.1.153:9092"
        //    val brokers = "192.168.1.151:9092"
        val topic = "ScalaTopic"

        val props = new Properties()
        props.put("bootstrap.servers", brokers)
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    //    props.put("partitioner.class", classOf[SelfHashPartitioner].getName)
        props.put("partitioner.class", classOf[SelfRoundRobinPartitioner].getName)
    //    props.put("producer.type", "sync")   // old Scala-producer setting; the new KafkaProducer ignores it (a synchronous send is sketched after this listing)
        props.put("batch.size", "1")           // batch.size is in bytes, so "1" effectively disables batching
        props.put("acks", "all")

        val producer = new KafkaProducer[String, String](props)

        val sleepFlag = false
        // Seven test records: three with key "1", two with key "4", one each with keys "3" and "2"
        val messages = Seq(
          ("1", "test 1aa"), ("1", "test 1bb"), ("1", "test 1cc"),
          ("4", "test 4dd"), ("4", "test 4aa"),
          ("3", "test 3bb"), ("2", "test 2bb"))

        for ((key, value) <- messages) {
          producer.send(new ProducerRecord[String, String](topic, key, value))
          if (sleepFlag) Thread.sleep(5000)
        }
        producer.close()
      }
    }
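
    Note that the old producer.type=sync setting has no effect on the new KafkaProducer: send() is always asynchronous and returns a java.util.concurrent.Future. A minimal sketch of blocking on that Future when per-record confirmation is wanted (SyncSendSketch and sendSync are illustrative names):

    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

    object SyncSendSketch {
      // Blocking on the Future returned by send() makes the call effectively synchronous.
      def sendSync(producer: KafkaProducer[String, String], topic: String, key: String, value: String): RecordMetadata = {
        val metadata = producer.send(new ProducerRecord[String, String](topic, key, value)).get()
        println(s"wrote to partition ${metadata.partition()} at offset ${metadata.offset()}")
        metadata
      }
    }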
    
    import java.util.Properties

    import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer}
    import scala.collection.JavaConversions._
    
    object KafkaTConsumer {
      def main(args: Array[String]): Unit = {
        var groupid = "ScalaGroup"
        var consumerid = "ScalaConsumer"
        var topic = "ScalaTopic"
    
        //args match {
        //      case Array(arg1, arg2, arg3) => topic = arg1; groupid = arg2; consumerid = arg3
        //}
    
        val props = new Properties()
        props.put("bootstrap.servers", "192.168.1.151:9092,192.168.1.152:9092,192.168.1.153:9092")
        props.put("group.id", groupid)
        props.put("client.id", "test")
        props.put("consumer.id", consumerid)
        //    props.put("auto.offset.reset", "smallest")
        props.put("enable.auto.commit", "true")
        props.put("auto.commit.interval.ms", "100")
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    
    
        val consumer = new KafkaConsumer[String, String](props)
        consumer.subscribe(java.util.Arrays.asList(topic))
    
        while (true) {
          val records: ConsumerRecords[String, String] = consumer.poll(100)
          for (record <- records) {
            println(s"Topic = ${record.topic()}, partition = ${record.partition()}, key = ${record.key()}, value = ${record.value()}")
          }
        }
    
      }
    }
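
    The consumer above relies on enable.auto.commit=true, so offsets are committed on a timer regardless of whether the records were actually processed. For at-least-once handling, offsets can instead be committed manually after each batch; a minimal sketch of that variant (ManualCommitConsumer is an illustrative name; broker addresses, group and topic are reused from the example above):

    import java.util.Properties

    import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
    import scala.collection.JavaConversions._

    object ManualCommitConsumer {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("bootstrap.servers", "192.168.1.151:9092,192.168.1.152:9092,192.168.1.153:9092")
        props.put("group.id", "ScalaGroup")
        props.put("enable.auto.commit", "false")   // commit offsets ourselves
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

        val consumer = new KafkaConsumer[String, String](props)
        consumer.subscribe(java.util.Arrays.asList("ScalaTopic"))

        while (true) {
          val records: ConsumerRecords[String, String] = consumer.poll(100)
          for (record <- records) {
            println(s"partition = ${record.partition()}, key = ${record.key()}, value = ${record.value()}")
          }
          // Commit only after the whole batch has been handled (at-least-once semantics)
          if (!records.isEmpty) consumer.commitSync()
        }
      }
    }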
    

    Round-robin run results (the key is ignored and partitions are assigned in send order, so records sharing a key end up on different partitions):

    Topic = ScalaTopic, partition = 0, key = 1, value = test 1cc
    Topic = ScalaTopic, partition = 0, key = 3, value = test 3bb
    Topic = ScalaTopic, partition = 1, key = 1, value = test 1aa
    Topic = ScalaTopic, partition = 1, key = 4, value = test 4dd
    Topic = ScalaTopic, partition = 2, key = 1, value = test 1bb
    Topic = ScalaTopic, partition = 2, key = 4, value = test 4aa
    Topic = ScalaTopic, partition = 1, key = 2, value = test 2bb

    Hash run results (records sharing a key always land on the same partition; here keys 1 and 4 both hash to partition 1):

    Topic = ScalaTopic, partition = 1, key = 1, value = test 1aa
    Topic = ScalaTopic, partition = 1, key = 1, value = test 1bb
    Topic = ScalaTopic, partition = 0, key = 3, value = test 3bb
    Topic = ScalaTopic, partition = 2, key = 2, value = test 2bb
    Topic = ScalaTopic, partition = 1, key = 1, value = test 1cc
    Topic = ScalaTopic, partition = 1, key = 4, value = test 4dd
    Topic = ScalaTopic, partition = 1, key = 4, value = test 4aa

  • Original article: https://www.cnblogs.com/AK47Sonic/p/8097890.html