zoukankan      html  css  js  c++  java
  • kafka scala API

    scala kafka api test

    依赖:

      <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.11</artifactId>
                <version>1.1.0</version>
            </dependency>

     Basic Kafka API test

    import java.util
    import java.util.{Collections, Properties}
    import kafka.message.MessageAndMetadata
    import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer}
    import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
    import org.junit.Test
    import scala.actors.threadpool.{ExecutorService, Executors}
    import scala.collection.mutable
    
    /**
      * Minimal producer example: publishes 100 string messages to the topic "test1"
      * on the broker at 192.168.3.70:9092.
      */
    object KakfaDemo {

      /**
        * Builds a string-keyed/string-valued producer, sends messages numbered 1 to 100,
        * and closes the producer before returning.
        *
        * @param args command-line arguments (unused)
        */
      def main(args: Array[String]): Unit = {
        val properties = new Properties
        properties.put("bootstrap.servers", "192.168.3.70:9092")
        // NOTE(review): the Kafka producer config docs state that with acks=0 the
        // "retries" setting does not take effect; confirm whether acks=1/all was intended.
        properties.put("acks", "0")
        properties.put("retries", "3")
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")

        val producer = new KafkaProducer[String, String](properties)
        try {
          for (i <- 1 to 100) {
            val msg = s"${i}  this is test"
            // send() only enqueues the record; actual transmission happens asynchronously.
            producer.send(new ProducerRecord[String, String]("test1", msg))
          }
        } finally {
          // close() waits for previously sent records to complete, so buffered
          // messages are not dropped when main returns.
          producer.close()
        }
      }

    }
    
    
    /**
      * Minimal consumer example: subscribes to the topic "test1" as group "test123"
      * and prints every record it receives.
      */
    object KafkaTest {

      /**
        * Polls the broker in an endless loop and prints offset, key and value of each record.
        * The consumer is closed if the loop is left through an exception.
        *
        * @param args command-line arguments (unused)
        */
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("bootstrap.servers", "192.168.3.70:9092")
        props.put("group.id", "test123")
        props.put("enable.auto.commit", "true")
        props.put("auto.offset.reset", "earliest")
        props.put("auto.commit.interval.ms", "1000")
        props.put("session.timeout.ms", "30000")
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

        val consumer = new KafkaConsumer[String, String](props)
        try {
          consumer.subscribe(Collections.singletonList("test1"))
          while (true) {
            // Wait up to 1000 ms for new records on each iteration.
            val records = consumer.poll(1000)
            val it = records.iterator()
            while (it.hasNext) {
              val msg = it.next()
              // Print each record exactly once, one record per line.
              println("offset=" + msg.offset() + ",--key=" + msg.key() + ",--value=" + msg.value())
            }
          }
        } finally {
          // Release the consumer's network resources when polling is aborted by an exception.
          consumer.close()
        }
      }
    }
  • 相关阅读:
    利用UltraScale和UltraScale+FPGA和MPSOC加速DSP设计生产力
    ARM系列处理器和架构
    Thumb扩展
    使用Redis分布式锁处理并发,解决超卖问题
    idea指定启动参数、环境变量
    Json返回结果为null属性不显示解决
    Spring Cloud Zuul 网关服务的fallback
    记录一次URL中有特殊字符怎么处理?
    logback的使用和logback.xml详解
    通过gitlab的webhook触发Jenkins自动构建设置
  • 原文地址:https://www.cnblogs.com/lshan/p/13984327.html
Copyright © 2011-2022 走看看