Kafka Study Notes: Producer/Consumer (Scala)

    Since Kafka itself is written in Scala, and I have recently been slowly picking up Scala's syntax (still a bit rusty), I wanted to try implementing a Producer and a Consumer in Scala, and to use a HashPartitioner so that messages are routed to a specific partition based on their key.

    Producer:

    import java.util.Properties
    import kafka.producer.ProducerConfig
    import kafka.producer.Producer
    import kafka.producer.KeyedMessage
    
    
    object ProducerDemo {
      def main(args: Array[String]): Unit = {
        
        val brokers = "192.168.1.151:9092,192.168.1.152:9092,192.168.1.153:9092"
        val topic = "ScalaTopic"
        
        val props = new Properties()
        props.put("metadata.broker.list", brokers)
        props.put("serializer.class", "kafka.serializer.StringEncoder")
        // Partition messages by key using the custom HashPartitioner defined below
        props.put("partitioner.class", classOf[HashPartitioner].getName)
        // Send synchronously, one message per batch, with generous queue limits
        props.put("producer.type", "sync")
        props.put("batch.num.messages", "1")
        props.put("queue.buffering.max.messages", "1000000")
        props.put("queue.enqueue.timeout.ms", "20000000")
    
        val config = new ProducerConfig(props)
        val producer = new Producer[String, String](config)

        // Set to true to pause between sends and watch the messages arrive one by one
        val sleepFlag = false

        // Three messages keyed "1" followed by three keyed "4";
        // the HashPartitioner chooses the target partition from the key
        val messages = Seq(
          ("1", "test 0"), ("1", "test 1"), ("1", "test 2"),
          ("4", "test 3"), ("4", "test 4"), ("4", "test 5"))

        for ((key, payload) <- messages) {
          producer.send(new KeyedMessage[String, String](topic, key, payload))
          if (sleepFlag) Thread.sleep(5000)
        }

        producer.close()
      }
    }
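
    Note: this code uses the old Scala producer API (kafka.producer._), and the consumer below uses the matching high-level consumer API (kafka.consumer._); both ship inside the Kafka broker jar itself. As a rough sketch of the build setup (the artifact version here is my assumption; the original post does not name one), an sbt dependency along these lines pulls in those classes:

    // build.sbt (sketch): the kafka_2.10 artifact bundles the old Scala producer/consumer API.
    // 0.8.2.2 is an assumed version; use whichever 0.8.x build matches your brokers.
    libraryDependencies += "org.apache.kafka" % "kafka_2.10" % "0.8.2.2"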
    

    Consumer:

    import java.util.Properties
    import kafka.consumer.ConsumerConfig
    import kafka.consumer.Consumer
    import kafka.message.MessageAndMetadata
    
    object ConsumerDemo {
      def main(args: Array[String]): Unit = {
        var groupid = ""
        var consumerid = ""
        var topic = ""

        // Expect exactly three arguments: topic, consumer group id, consumer id
        args match {
          case Array(arg1, arg2, arg3) => topic = arg1; groupid = arg2; consumerid = arg3
          case _ =>
            System.err.println("Usage: ConsumerDemo <topic> <groupid> <consumerid>")
            sys.exit(1)
        }
    
        val props = new Properties()
        props.put("zookeeper.connect", "192.168.1.151:2181,192.168.1.152:2181,192.168.1.153:2181")
        props.put("group.id", groupid)
        props.put("client.id", "test")
        props.put("consumer.id", consumerid)
        // Start from the earliest available offset and auto-commit offsets every 100 ms
        props.put("auto.offset.reset", "smallest")
        props.put("auto.commit.enable", "true")
        props.put("auto.commit.interval.ms", "100")
    
        val consumerConfig = new ConsumerConfig(props)
        val consumer = Consumer.create(consumerConfig)
    
        val topicCountMap = Map(topic -> 1)
        val consumerMap = consumer.createMessageStreams(topicCountMap)
        val streams = consumerMap.get(topic).get
        for (stream <- streams) {
          val it = stream.iterator()
    
          while (it.hasNext()) {
            val messageAndMetadata = it.next()
    
            val message = s"Topic:${messageAndMetadata.topic}, GroupID:$groupid, Consumer ID:$consumerid, PartitionID:${messageAndMetadata.partition}, " +
              s"Offset:${messageAndMetadata.offset}, Message Key:${new String(messageAndMetadata.key())}, Message Payload: ${new String(messageAndMetadata.message())}"
    
            println(message)
    
          }
    
        }
    
      }
    
    }
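
    The consumer reads its topic, group id, and consumer id from the command line. A quick way to run it (the values here are only placeholders that match the producer above):

    // From sbt, passing topic, group id and consumer id (illustrative values):
    //   sbt "runMain ConsumerDemo ScalaTopic group1 consumer1"
    // or invoke the main method directly from other Scala code:
    ConsumerDemo.main(Array("ScalaTopic", "group1", "consumer1"))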
    

    HashPartitioner:

    import kafka.producer.Partitioner
    import scala.math._
    import kafka.utils.VerifiableProperties
    
    class HashPartitioner extends Partitioner {

      // Kafka instantiates the partitioner reflectively and passes in the producer's
      // VerifiableProperties, so this auxiliary constructor must delegate to the primary one.
      def this(verifiableProperties: VerifiableProperties) = this()

      override def partition(key: Any, numPartitions: Int): Int = {
        // Int keys partition by their value, everything else by hashCode;
        // abs guards against negative values producing an invalid partition id.
        if (key.isInstanceOf[Int])
          abs(key.toString.toInt) % numPartitions
        else
          abs(key.hashCode()) % numPartitions
      }

    }
    

    Result:

    All messages were routed to Partition 1; the test succeeded!
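
    Why both keys end up on the same partition: the keys are the Strings "1" and "4", so the partitioner falls through to the hashCode branch. Assuming the topic was created with 3 partitions (an assumption; the original post does not state the partition count), both keys map to partition 1, which matches the output above:

    // Quick check of the routing, assuming numPartitions = 3 (not stated in the original post).
    // "1".hashCode == 49 and "4".hashCode == 52; both 49 % 3 and 52 % 3 equal 1,
    // so every message with key "1" or key "4" lands on partition 1.
    val numPartitions = 3
    Seq("1", "4").foreach { key =>
      println(s"key=$key -> partition ${math.abs(key.hashCode) % numPartitions}")
    }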

    Original post: https://www.cnblogs.com/AK47Sonic/p/7260577.html