zoukankan      html  css  js  c++  java
  • IDEA Spark Streaming Kafka数据源-Producer

    import java.util
    
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
    /**
      * Created by soyo on 17-10-17.
      * Demo producer that continuously publishes batches of random-digit lines
      * to a local Kafka topic, for consumption by a Spark Streaming job.
      *
      * To run: 1. start Zookeeper: ./bin/zookeeper-server-start.sh config/zookeeper.properties
      *  2. start the Kafka broker: ./bin/kafka-server-start.sh config/server.properties
      *  3. run DStream_Kafa_Producer
      *  4. run DStream_Kafa_Consumer
      */
    object DStream_Kafa_Producer {
      def main(args: Array[String]): Unit = {
        val brokers = "localhost:9092"
        val topic = "wordsender"
        val messagePerSec = 5    // number of random lines per batch
        val wordsPerMessage = 7  // number of words (digits) per line
        // Producer configuration: broker address plus String serializers
        // for both key and value.
        val props = new util.HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

        val producer = new KafkaProducer[String, String](props)
        // Send messagePerSec batches every second, forever (demo runs until killed).
        while (true) {
          for (_ <- 1 to messagePerSec) {
            // Build one line of wordsPerMessage random digits separated by spaces.
            val line = (1 to wordsPerMessage)
              .map(_ => scala.util.Random.nextInt(10).toString)
              .mkString(" ")
            println(line)
            producer.send(new ProducerRecord[String, String](topic, null, line))
            // Extra fixed payload sent alongside each random line (not printed locally).
            val extra = "ni hao 我的 测试 的 字符串"
            producer.send(new ProducerRecord[String, String](topic, null, extra))
          }
          Thread.sleep(1000)
        }
      }
    }

    结果:

    4 6 9 7 6 8 3
    0 0 8 3 9 3 4
    2 2 1 9 2 2 3
    6 2 5 8 1 0 7
    6 9 6 8 5 8 0
    7 8 6 5 3 4 4
    3 7 9 1 3 1 9
    9 0 0 9 0 6 9
    2 5 2 8 3 6 5
    9 3 2 6 2 6 8
    2 1 2 7 2 7 3

    /**
    * Created by soyo on 17-10-17.
    * 运行kafka程序 1.需要启动Zookeeper服务:./bin/zookeeper-server-start.sh config/zookeeper.properties
    * 2.启动Kafka服务:./bin/kafka-server-start.sh config/server.properties
    * 3.执行 DStream_Kafa_Producer
    * 4.执行 DStream_Kafa_Consumer
    */
  • 相关阅读:
    堆排序
    jdk8 永久代变更
    oracle 区分大小写遇到的坑
    日志统计分析
    zookeeper 服务挂掉重启后,dubbo 服务是不会自动重新注册上的
    代码质量管理
    快速排序算法
    python flask 项目结构
    项目架构
    JS中的循环---最全的循环总结
  • 原文地址:https://www.cnblogs.com/soyo/p/7683387.html
Copyright © 2011-2022 走看看