zoukankan      html  css  js  c++  java
  • 使用sparksql往kafka推送数据

    一、相关配置参数

    1.同级目录resource文件夹下配置

    brokers_list=kafkaxxx02broker01:9092,kafkaxxx02broker02:9092,kafkaxxx02broker03:9092

    2.topic:

    last_member_info

    3.流程

    从hive表中读取相关字段,封装成json格式,抛kafka

    二、相关代码(scala)

    package kafka
    
    import java.io.InputStream
    import java.text.SimpleDateFormat
    import java.util.{Date, HashMap, Properties}
    
    import com.google.gson.JsonObject
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    
    object DakaMemProducer {
      // NOTE: this object's fields are initialised once per JVM. Because the foreach
      // below runs on Spark executors, each executor JVM re-initialises the object and
      // gets its own KafkaProducer — the driver-side producer is mostly idle.
      val prop = new Properties()
      val is: InputStream = this.getClass().getResourceAsStream("/conf.properties")
      // Close the classpath stream after loading; Properties.load does not close it.
      try prop.load(is)
      finally if (is != null) is.close()
      // Key under which the broker list is stored in conf.properties,
      // e.g. brokers_list=host1:9092,host2:9092,host3:9092
      val environment_broker_list = "brokers_list";
      private val brokers = prop.getProperty(environment_broker_list)
      // Kafka producer properties: bootstrap servers plus String serializers for key/value.
      private val props = new HashMap[String, Object]()
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer")
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer")
      private val producer = new KafkaProducer[String, String](this.props)

      /**
       * Sends one JSON payload to the `last_member_info` topic.
       * A null payload is silently ignored; the record key is null
       * (round-robin partition assignment).
       */
      private def kafkaProducerSend(args: String): Unit = {
        if (args != null) {
          val topic = "last_member_info"
          val message = new ProducerRecord[String, String](topic, null, args)
          producer.send(message)
        }
      }

      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("DakaMemProducer")
        val spark = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
        // Yesterday's partition, formatted as yyyyMMdd.
        val date = new Date(new Date().getTime - 86400000L)
        val dateFormat = new SimpleDateFormat("yyyyMMdd")
        val statisDate = dateFormat.format(date)
        // NOTE(review): statis_date is concatenated unquoted; if the Hive column is STRING
        // this relies on implicit numeric casting — confirm, or quote the value.
        val querySql1 = "select member_id,flag,nick_name,nick_type from xxx_db.xxx_table where statis_date = " + statisDate  // offline data
        val resultDF1 = spark.sql(querySql1)
        resultDF1.rdd.foreach(row => {
          // Option(...) guards against NULL columns; the original .toString() threw NPE on null.
          val member_id: String = Option(row.getAs[String]("member_id")).getOrElse("")
          val flag: String = Option(row.getAs[String]("flag")).getOrElse("")
          val nick_name: String = Option(row.getAs[String]("nick_name")).getOrElse("")
          val nick_type: String = Option(row.getAs[String]("nick_type")).getOrElse("")
          val json = new JsonObject()
          // addProperty mutates json in place; the original's trailing .toString calls
          // discarded their results and have been removed.
          json.addProperty("memberId", member_id)
          json.addProperty("flag", flag)
          json.addProperty("nickName", nick_name)
          json.addProperty("nickType", nick_type)
          kafkaProducerSend(json.toString)
        })

        // BUG FIX: the original tested environment_broker_list (the constant key string
        // "brokers_list") for "prd", which can never match. The intent is to show sample
        // output only outside production, so test the configured broker addresses instead.
        if (brokers == null || !brokers.contains("prd")) {
          resultDF1.show(100)
        }
        // Flush the driver-side producer; executor-side producers flush on JVM shutdown.
        producer.flush()
      }
    }
  • 相关阅读:
    MVC 5 Scaffolder + EntityFramework+UnitOfWork Pattern 代码生成工具集成Visual Studio 2013
    Asp.Net MVC +EntityFramework主从表新增编辑操作的实现(删除操作怎么实现?)
    asp.net MVC 5 Scaffolding多层架构代码生成向导开源项目(邀请你的参与)
    Asp.net mvc 5 CRUD代码自动生成工具- vs.net 2013 Saffolding功能扩展
    Asp.net webform scaffolding结合Generic Unit of Work & (Extensible) Repositories Framework代码生成向导
    MVC中的默认Model绑定者DefaultModelBinder
    MVC中Action参数绑定的过程
    MVC中Action的执行过程
    MVC的控制器的激活过程,我们从MvcHandler开始讲,前面的事情以后再讲
    d
  • 原文地址:https://www.cnblogs.com/yin-fei/p/10748505.html
Copyright © 2011-2022 走看看