zoukankan      html  css  js  c++  java
  • kafka生产数据,消费数据

    //生产数据
    /**
     * Interactive Kafka producer demo.
     *
     * Reads lines of the form `key,value` from standard input and publishes each one as a
     * record to the `xuyi` topic via the broker at `server3:6667`. The loop ends when standard
     * input reaches end-of-file, at which point the producer is closed.
     */
    object ProducterDemo2 {

      /**
       * Program entry point.
       *
       * @param args command-line arguments (unused)
       */
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("bootstrap.servers", "server3:6667")
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

        // NOTE(review): DicInitializer is defined outside this snippet; its purpose is not
        // visible here. The call is kept in its original position, before the producer is built.
        DicInitializer.init()

        val kp = new KafkaProducer[String, String](props)
        try {
          Iterator
            .continually(StdIn.readLine())
            // readLine() yields null once stdin is exhausted; stop instead of crashing.
            .takeWhile(_ != null)
            .foreach { line =>
              line.split(",") match {
                // The key is used to determine the partition; the value is the payload.
                // Only the first two fields are used; any further fields are ignored.
                case Array(key, value, _*) =>
                  kp.send(new ProducerRecord[String, String]("xuyi", key, value))
                case _ =>
                  // A line without both a key and a value used to abort the program with
                  // ArrayIndexOutOfBoundsException; report it and keep reading instead.
                  Console.err.println(s"Skipping malformed input (expected key,value): $line")
              }
            }
        } finally {
          // close() blocks until previously sent records have completed, then frees the client.
          kp.close()
        }
      }
    }
    //消费数据
    package kafka import java.time.Duration import java.util import java.util.Properties import org.apache.kafka.clients.consumer.KafkaConsumer
    /**
     * Kafka consumer API demo.
     *
     * Subscribes to the `xuyi` topic via the broker at `server3:6667` as part of consumer group
     * `myid1` and prints every record it receives to standard output, polling indefinitely.
     */
    object Consumer {

      /**
       * Program entry point.
       *
       * @param args command-line arguments (unused)
       */
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("bootstrap.servers", "server3:6667")
        props.put("group.id", "myid1")
        props.put("auto.offset.reset", "earliest")
        props.put("enable.auto.commit", "true")
        props.put("auto.commit.interval.ms", "1000")
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

        val consumer = new KafkaConsumer[String, String](props)
        try {
          consumer.subscribe(util.Arrays.asList("xuyi"))
          while (true) {
            val records = consumer.poll(Duration.ofMillis(100))
            val ri = records.iterator()
            while (ri.hasNext) {
              val record = ri.next()
              // Build one formatted line. Passing several arguments to println relies on
              // deprecated argument auto-tupling and prints a raw tuple.
              println(
                s"key: ${record.key()}, value: ${record.value()}, " +
                  s"partition: ${record.partition()}, offset: ${record.offset()}"
              )
              println()
            }
          }
        } finally {
          // The poll loop never ends normally; this runs when it is aborted by an exception.
          consumer.close()
        }
      }
    }
  • 相关阅读:
    二叉搜索树
    【树】List Leaves
    模板——dijkstra单源最短路
    余数求和——除法分块
    倍增——ST表
    线段树——内存池
    线段树——模板
    洛谷 P1498 南蛮图腾
    洛谷 P2199 最后的迷宫
    洛谷 P1495 中国剩余定理
  • 原文地址:https://www.cnblogs.com/shiji7/p/12132540.html
Copyright © 2011-2022 走看看