zoukankan      html  css  js  c++  java
  • kafka生产数据,消费数据

    //生产数据
    /**
     * Interactive Kafka producer demo.
     *
     * Reads lines of the form `key,value` from standard input and sends each one as a
     * record to the `xuyi` topic via the broker at `server3:6667`. The loop ends when
     * standard input reaches end-of-file; the producer is then closed so that records
     * still buffered by the client are delivered before the JVM exits.
     *
     * Lines that do not yield both a key and a value after splitting on `,` are reported
     * on stderr and skipped instead of crashing the program.
     */
    object ProducterDemo2 {

      /**
       * Program entry point.
       *
       * @param args command-line arguments (unused)
       */
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("bootstrap.servers", "server3:6667")
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

        // NOTE(review): DicInitializer is defined elsewhere in the project; its purpose is
        // not visible here. The call is kept exactly where the original had it.
        DicInitializer.init()

        val kp = new KafkaProducer[String, String](props)
        try {
          Iterator
            .continually(StdIn.readLine())
            // readLine() returns null once stdin is exhausted; stop instead of hitting an NPE.
            .takeWhile(_ != null)
            .foreach { line =>
              line.split(",") match {
                // The key is used to determine the partition; the value is the payload.
                case Array(key, value, _*) =>
                  kp.send(new ProducerRecord[String, String]("xuyi", key, value))
                case _ =>
                  Console.err.println(s"Skipping malformed input (expected key,value): $line")
              }
            }
        } finally {
          // Always release the client, even if send() or the input loop throws.
          kp.close()
        }
      }
    }
    //消费数据
    package kafka import java.time.Duration import java.util import java.util.Properties import org.apache.kafka.clients.consumer.KafkaConsumer
    /**
     * Kafka consumer API demo.
     *
     * Subscribes to the `xuyi` topic through the broker at `server3:6667` as a member of
     * consumer group `myid1`, then polls forever and prints the key, value, partition and
     * offset of every record received, followed by a blank line.
     *
     * Configuration set below: offsets start from `earliest` when the group has no
     * committed position, and auto-commit is enabled with a 1000 ms interval.
     */
    object Consumer {

      /**
       * Program entry point.
       *
       * @param args command-line arguments (unused)
       */
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("bootstrap.servers", "server3:6667")
        props.put("group.id", "myid1")
        props.put("auto.offset.reset", "earliest")
        props.put("enable.auto.commit", "true")
        props.put("auto.commit.interval.ms", "1000")
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

        val consumer = new KafkaConsumer[String, String](props)
        try {
          consumer.subscribe(util.Arrays.asList("xuyi"))
          while (true) {
            val records = consumer.poll(Duration.ofMillis(100))
            val ri = records.iterator()
            while (ri.hasNext) {
              val record = ri.next()
              // Build one formatted line; passing several arguments to println relied on
              // argument auto-tupling and printed a raw tuple instead of readable text.
              println(
                s"key: ${record.key()} value: ${record.value()} " +
                  s"partition: ${record.partition()} offset: ${record.offset()}"
              )
              println()
            }
          }
        } finally {
          // The loop only exits via an exception; make sure the client is released then.
          consumer.close()
        }
      }
    }
  • 相关阅读:
    C++语法注意点
    T4+VSIX 打造Visual Studio 2010 中的Entity代码生成自定义工具
    如何构建安全的.net web应用系统
    在VS2010项目中引用Lib静态库(以Openssl为例)
    通过Handler实现ASP.NET WebForm自定义控件局部刷新
    一个简单的AOP框架学习
    sql 学习
    SQL Server 2005新特性之使用with关键字
    Memcache的资料
    Net通用基础框架学习
  • 原文地址:https://www.cnblogs.com/shiji7/p/12132540.html
Copyright © 2011-2022 走看看