zoukankan      html  css  js  c++  java
  • kafka生产

    package two

    import java.util.HashMap

    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

    /**
    * Created by zhoucw on 上午2:13.
    */
    // Produces some random words between 1 and 100.
    object KafkaWordCountProducer {

    def main(args: Array[String]) {
    if (args.length < 4) {
    System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
    "<messagesPerSec> <wordsPerMessage>")
    System.exit(1)
    }

    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

    // Zookeeper connection properties
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // Send some messages
    while(true) {
    (1 to messagesPerSec.toInt).foreach { messageNum =>
    val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
    .mkString(" ")

    val message = new ProducerRecord[String, String](topic, null, str)
    producer.send(message)
    }

    Thread.sleep(1000)
    }
    }

    }

  • 相关阅读:
    Python 类的特性讲解
    Python 类的式列化过程解剖
    Python 面向对象介绍
    Python subprocess模块
    Python re模块
    Python configparser模块
    Python pyYAML模块
    Python logging模块
    Python hashlib模块
    OAuth2.0 错误码
  • 原文地址:https://www.cnblogs.com/heguoxiu/p/10149708.html
Copyright © 2011-2022 走看看