zoukankan      html  css  js  c++  java
  • 将数据写到kafka的topic

    package test05

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    object WriteDataToKafka {
    def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("ReadS3LogToKafka").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val logData:RDD[String] = sc.textFile("/Users/huiliyang/vwlog/")

    //logData.collect().foreach(println(_))

    writeToKafkaTopic(logData,"192.168.1.112:9092","huiliyang")

    }

    //写入数据到Kafka
    def writeToKafkaTopic(lines: RDD[String], kafkaServer: String, kafkaTopic: String): Unit ={
    val props = new Properties()
    props.put("bootstrap.servers", kafkaServer)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")


    for (line <- lines) {
    val producer = new KafkaProducer[String, String](props)
    val record = new ProducerRecord(kafkaTopic, "key", line)
    producer.send(record)
    //Thread.sleep(10000)
    producer.close()
    }
    }
    }
  • 相关阅读:
    centOS7虚拟机上搭建kvm虚拟平台
    wxpython绘制折线图
    使用Mongodb爬取中国大学排名并写入数据库
    第一个爬虫与测试
    排球比赛规则的程序化
    文件的学习
    科学计算与可视化
    面对对象的学习
    对matplotlib库的运用
    PIL成就你的自信之路
  • 原文地址:https://www.cnblogs.com/yhl-yh/p/7741564.html
Copyright © 2011-2022 走看看