zoukankan      html  css  js  c++  java
  • streaming+kafka 数据来源mysql数据库

    package spark
    
    import java.util.Properties
    
    import java.util.HashMap
    import org.apache.kafka.clients.producer._
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.{ SparkContext, SparkConf }
    import spark.bean.orders
    
    object SelectFromOneTable {
      def main(args: Array[String]) {
        val Array(brokers, topic, wordsPerMessage) = Array("localhost:9092", "sun_test", "1")
        val props = new HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put("serializer.class", "kafka.serializer.StringEncoder")
        props.put("producer.type", "async")
    
        val producer = new KafkaProducer[String, String](props)
    
        val sparkConf = new SparkConf().setAppName("Spark SQL Test Case").setMaster("local")
        val sparkContext = new SparkContext(sparkConf)
    
        val sqlContext = new SQLContext(sparkContext)
        val url = "jdbc:mysql://localhost:3306/sun_test?user=root&password=Sun@123";
        val prop = new Properties();
        val df = sqlContext.read.jdbc(url, "flow", prop).collect()
    
        for (a <- df) {
          println(a)
          val message = new ProducerRecord[String, String](topic, null, a.toString())
          producer.send(message)
        }
      }
    }  
    
  • 相关阅读:
    js——正则整理
    纯css改变select默认样式
    CSS3——瀑布流,多列(Multi-column)
    angularjs + ionic 实现项目的按需加载
    jquery的deferred对象
    Nginx 配置反向代理
    docker部署vue项目总结
    模糊查询json数组
    LocalStorage存储JSON对象、存储数组
    iview中遇到table的坑(已经修改了table的数据,但是界面没有更新)
  • 原文地址:https://www.cnblogs.com/sunyaxue/p/6547168.html
Copyright © 2011-2022 走看看