zoukankan      html  css  js  c++  java
  • Spark Streaming + Kafka：以 MySQL 数据库作为数据来源

    package spark
    
    import java.util.Properties
    
    import java.util.HashMap
    import org.apache.kafka.clients.producer._
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.{ SparkContext, SparkConf }
    import spark.bean.orders
    
    object SelectFromOneTable {

      /** Reads all rows of the MySQL table `flow` through Spark SQL's JDBC
        * reader and publishes each row (as its `toString` form) to the Kafka
        * topic `sun_test` on a local broker.
        *
        * Fixes vs. original: the producer is now closed (flushing async-buffered
        * messages) and the SparkContext is stopped, so the JVM exits cleanly.
        */
      def main(args: Array[String]): Unit = {
        // Broker address, target topic, and a messages-per-batch knob.
        // NOTE(review): `wordsPerMessage` is never used — kept for parity.
        val Array(brokers, topic, wordsPerMessage) = Array("localhost:9092", "sun_test", "1")

        val props = new HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        // Legacy (old Scala producer) settings; the new-API producer ignores
        // `serializer.class`, and `producer.type=async` maps to its default
        // batching behavior. Left in place for compatibility with the original.
        props.put("serializer.class", "kafka.serializer.StringEncoder")
        props.put("producer.type", "async")

        val producer = new KafkaProducer[String, String](props)

        val sparkConf = new SparkConf().setAppName("Spark SQL Test Case").setMaster("local")
        val sparkContext = new SparkContext(sparkConf)
        val sqlContext = new SQLContext(sparkContext)

        // NOTE(review): credentials are hard-coded in the JDBC URL; move user
        // and password into the Properties object / external config for real use.
        val url = "jdbc:mysql://localhost:3306/sun_test?user=root&password=Sun@123"
        val prop = new Properties()

        try {
          // collect() pulls the entire table onto the driver — fine for a small
          // test table, not for large data sets.
          val rows = sqlContext.read.jdbc(url, "flow", prop).collect()
          for (row <- rows) {
            println(row)
            // Null key: the broker assigns partitions round-robin/sticky.
            producer.send(new ProducerRecord[String, String](topic, null, row.toString()))
          }
        } finally {
          // Fix: close() flushes any buffered async messages before releasing
          // resources; stop() shuts Spark down so the process can terminate.
          producer.close()
          sparkContext.stop()
        }
      }
    }
    
  • 相关阅读:
    System 类的使用
    StringBuffer 与 StringBuilder类的使用
    String 类的使用
    多线程
    音乐播放
    数据库
    表示图编辑
    UITextView(2)
    UITextView
    tarBar
  • 原文地址:https://www.cnblogs.com/sunyaxue/p/6547168.html
Copyright © 2011-2022 走看看