zoukankan      html  css  js  c++  java
  • Kafka 学习笔记之 Producer/Consumer (Scala)

    既然Kafka是使用Scala写的,最近也在慢慢学习Scala的语法,虽然还比较生疏,但是还是想尝试下用Scala实现Producer和Consumer,并且用HashPartitioner实现消息根据key路由到指定的partition。

    Producer:

    import java.util.Properties
    import kafka.producer.ProducerConfig
    import kafka.producer.Producer
    import kafka.producer.KeyedMessage
    
    
    /**
     * Demo producer for the legacy (Kafka 0.8.x) Scala producer API.
     *
     * Sends six keyed string messages to a single topic; keys "1" and "4"
     * exercise [[HashPartitioner]] key-based partition routing.
     */
    object ProducerDemo {
      def main(args: Array[String]): Unit = {

        val brokers = "192.168.1.151:9092,192.168.1.152:9092,192.168.1.153:9092"
        val topic = "ScalaTopic"

        // Old producer API configuration: synchronous sends, one message per
        // batch, string key/value encoding, and our custom partitioner.
        val props = new Properties()
        props.put("metadata.broker.list", brokers)
        props.put("serializer.class", "kafka.serializer.StringEncoder")
        props.put("partitioner.class", classOf[HashPartitioner].getName)
        props.put("producer.type", "sync")
        props.put("batch.num.messages", "1")
        props.put("queue.buffering.max.messages", "1000000")
        props.put("queue.enqueue.timeout.ms", "20000000")

        val producer = new Producer[String, String](new ProducerConfig(props))

        // (key, payload) pairs — same six messages the original sent one by one.
        // NOTE(review): "test 4" appears twice in the original; preserved as-is.
        val messages = Seq(
          ("1", "test 0"),
          ("1", "test 1"),
          ("1", "test 2"),
          ("4", "test 3"),
          ("4", "test 4"),
          ("4", "test 4"))

        // Optional pause between sends, useful when watching the consumer live.
        val sleepFlag = false

        try {
          for ((key, payload) <- messages) {
            producer.send(new KeyedMessage[String, String](topic, key, payload))
            if (sleepFlag) Thread.sleep(5000)
          }
        } finally {
          // Fix: the original never closed the producer, leaking its network
          // resources and potentially dropping queued messages on exit.
          producer.close()
        }
      }
    }
    

    Consumer:

    import java.util.Properties
    import kafka.consumer.ConsumerConfig
    import kafka.consumer.Consumer
    import kafka.message.MessageAndMetadata
    
    /**
     * Demo consumer for the legacy (Kafka 0.8.x) high-level consumer API.
     *
     * Usage: ConsumerDemo &lt;topic&gt; &lt;groupid&gt; &lt;consumerid&gt;
     *
     * Connects through ZooKeeper, opens one stream for the topic, and prints
     * every received message's metadata and payload. The stream iterator
     * blocks for new messages, so the loop runs until the process is killed.
     */
    object ConsumerDemo {
      def main(args: Array[String]): Unit = {
        // Fix: the original match had no fallback case and threw a raw
        // MatchError unless exactly three arguments were supplied.
        val (topic, groupid, consumerid) = args match {
          case Array(arg1, arg2, arg3) => (arg1, arg2, arg3)
          case _ =>
            System.err.println("Usage: ConsumerDemo <topic> <groupid> <consumerid>")
            return
        }

        val props = new Properties()
        props.put("zookeeper.connect", "192.168.1.151:2181,192.168.1.152:2181,192.168.1.153:2181")
        props.put("group.id", groupid)
        props.put("client.id", "test")
        props.put("consumer.id", consumerid)
        // Start from the earliest available offset when no offset is committed.
        props.put("auto.offset.reset", "smallest")
        props.put("auto.commit.enable", "true")
        props.put("auto.commit.interval.ms", "100")

        val consumer = Consumer.create(new ConsumerConfig(props))

        // One stream (thread) for our single topic.
        val consumerMap = consumer.createMessageStreams(Map(topic -> 1))
        // Fix: replaced Option.get (opaque NoSuchElementException) with a
        // descriptive failure if the topic has no stream.
        val streams = consumerMap.getOrElse(topic,
          sys.error(s"no message stream returned for topic '$topic'"))

        for (stream <- streams) {
          val it = stream.iterator()

          // hasNext blocks until the next message arrives.
          while (it.hasNext()) {
            val messageAndMetadata = it.next()

            val message = s"Topic:${messageAndMetadata.topic}, GroupID:$groupid, Consumer ID:$consumerid, PartitionID:${messageAndMetadata.partition}, " +
              s"Offset:${messageAndMetadata.offset}, Message Key:${new String(messageAndMetadata.key())}, Message Payload: ${new String(messageAndMetadata.message())}"

            println(message)
          }
        }
      }
    }
    

    HashPartitioner:

    import kafka.producer.Partitioner
    import scala.math._
    import kafka.utils.VerifiableProperties
    
    /**
     * Routes messages to partitions by hashing the message key.
     */
    class HashPartitioner extends Partitioner {
      // Kafka instantiates partitioners reflectively, passing a
      // VerifiableProperties argument, so this auxiliary constructor is needed.
      // Fix: the original body `{ this }` never invoked another constructor
      // (an auxiliary constructor's first statement must be `this(...)`).
      def this(verifiableProperties: VerifiableProperties) = this()

      /**
       * Maps `key` to a partition index in [0, numPartitions).
       *
       * Fixes vs. the original:
       *  - the `if (key.isInstanceOf[Int])` branch computed a value and
       *    discarded it (an `if` without `else` used as a statement), so every
       *    key actually fell through to the hashCode path; it is now a real
       *    `else` alternative via pattern matching;
       *  - a negative hashCode produced a negative partition index; the result
       *    is normalized with a floor-mod, which is also safe for
       *    Int.MinValue (where `abs` alone stays negative).
       */
      override def partition(key: Any, numPartitions: Int): Int = {
        val raw = key match {
          case i: Int => abs(i)
          case other  => other.hashCode()
        }
        ((raw % numPartitions) + numPartitions) % numPartitions
      }
    }
    

    运行结果:

     所有消息都被路由到了Partition1,测试成功!

  • 相关阅读:
    [译文] 实体与值对象到底是不是一回事?
    实现 WebApi 自托管服务宿主于 WinForms 及其交互
    [译文] C# 8 已成旧闻, 向前, 抵达 C# 9!
    [译文] 为什么你在 C# 里总是应该使用 "var" 关键字
    通过设置iis在局域网中访问网页
    windows 10 安装使用kafka
    ASP.NET Core 2.1 中的 HttpClientFactory (Part 4) 整合Polly实现瞬时故障处理
    ASP.NET Core 2.1 中的 HttpClientFactory (Part 3) 使用Handler实现传出请求中间件
    ASP.NET Core 2.1 中的 HttpClientFactory (Part 2) 定义命名化和类型化的客户端
    Asp.net Core 2.0 OpenId Connect Handler缺失Claims?
  • 原文地址:https://www.cnblogs.com/AK47Sonic/p/7260577.html
Copyright © 2011-2022 走看看