zoukankan      html  css  js  c++  java
  • spark-kafka-es交互

    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.spark.streaming.Seconds
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.kafka010._
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.SparkConf
    //import org.elasticsearch._
    import com.alibaba.fastjson.JSONObject
    import com.alibaba.fastjson.JSON._
    import java.text.SimpleDateFormat
    import org.elasticsearch.spark.rdd.EsSpark
    import org.apache.kafka.common.TopicPartition
    
    object stu_course_test {
    
        /**
          * Converts a timestamp string into whole seconds since the Unix epoch.
          *
          * The text is parsed with the pattern "yyyy-MM-dd HH:mm:ss" and the
          * resulting millisecond value is integer-divided by 1000. No time zone
          * is set on the formatter, so SimpleDateFormat's default applies.
          *
          * @param tm timestamp text in "yyyy-MM-dd HH:mm:ss" form
          * @return the parsed instant, in seconds
          * @throws java.text.ParseException if `tm` cannot be parsed with the pattern
          */
        def tranTimeToLong(tm: String): Long = {
            // A new formatter per call: SimpleDateFormat instances are not thread-safe.
            val fm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
            fm.parse(tm).getTime / 1000
        }
        /**
          * Streaming job entry point.
          *
          * Reads JSON messages from partition 0 of a Kafka topic starting at a
          * fixed offset, reshapes the "data" payload of each message into a flat
          * document, queries the "index_name/all-type" Elasticsearch resource for
          * matching entries and writes one enriched JSON document per match to
          * "test_index_name/docs", using the composite "_id" field as document id.
          *
          * NOTE(review): `ip`, `usrname` and `psw` are not defined in this
          * listing; they must be provided by surrounding code for this to compile.
          *
          * @param args command-line arguments (unused)
          */
        def main(args: Array[String]): Unit = {
            val conf = new SparkConf()
                .setAppName("stu_live_test5")
                .set("es.nodes", ip)
                .set("es.port", "9200")
            val ssc = new StreamingContext(conf, Seconds(2))
            println("hello")

            // NOTE(review): "sasl.plain.username" / "sasl.plain.password" do not look
            // like standard Java Kafka client properties (credentials are normally
            // passed through "sasl.jaas.config") - confirm authentication works.
            val kafkaParams = Map[String, Object](
                "bootstrap.servers" -> ip,
                "group.id" -> "test_kafka1106",
                "key.deserializer" -> classOf[StringDeserializer],
                "value.deserializer" -> classOf[StringDeserializer],
                "sasl.plain.username" -> usrname,
                "sasl.plain.password" -> psw,
                "security.protocol" -> "SASL_PLAINTEXT",
                "sasl.mechanism" -> "PLAIN"
                // "auto.offset.reset" -> "earliest",
                // "enable.auto.commit" -> (false: java.lang.Boolean)
            )

            val tops = "topic_name"
            // Pin partition 0 of the topic to an explicit starting offset.
            val fromOffsets = Map[TopicPartition, Long](new TopicPartition(tops, 0) -> 20385338L)
            val stream = KafkaUtils.createDirectStream[String, String](
                ssc,
                PreferConsistent,
                ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets))
            println("****************************9999")

            val lines = stream.map(record => record.value)
            val offs = stream.map(off => off.offset)
            offs.print()
            lines.print()

            lines.foreachRDD(batch => {
                // One collect() per batch is enough: iterating an empty array is a
                // no-op, so a separate count() action on the RDD is not needed.
                for (raw <- batch.collect()) {
                    val dict = parseObject(parseObject(raw).get("data").toString)
                    val stu_data = new JSONObject()
                    stu_data.put("a", dict.get("a").toString.toInt)
                    stu_data.put("b", dict.get("b").toString.toInt)
                    stu_data.put("c", dict.get("c").toString)
                    stu_data.put("d", dict.get("d").toString.toInt)
                    // Kept as Long: narrowing epoch seconds to Int wraps for dates past 2038.
                    stu_data.put("time", tranTimeToLong(dict.get("time").toString))
                    stu_data.put("e", dict.get("e").toString.toInt)

                    // stu_data never holds a "keyid" entry, so the lookup key is taken
                    // from the incoming payload instead.
                    // NOTE(review): assumed intent - confirm the payload field name.
                    val keyId = Option(dict.get("keyid")).getOrElse(
                        throw new NoSuchElementException("message payload has no \"keyid\" field: " + raw))
                    // NOTE(review): the value is spliced in unquoted, so it must be a
                    // valid JSON literal (e.g. a number) for the query to be well-formed.
                    val query = """{"query":{"bool":{"must":[{"term":{"key":""" + keyId.toString + """}},{"term":{"status":2}}]}}}"""
                    println(query)

                    val es_result = EsSpark.esRDD(ssc.sparkContext, "index_name/all-type", query)
                    println(es_result)
                    es_result.collect().foreach(course => {
                        val fields = course._2
                        val aa = fields("aa").toString
                        val bb = fields("bb").toString
                        val cc = fields("cc").toString.toInt
                        stu_data.put("aa", aa)
                        stu_data.put("bb", bb)
                        stu_data.put("cc", cc)
                        // Composite document id: "<aa>_<bb>_<cc>".
                        stu_data.put("_id", s"${aa}_${bb}_${cc}")
                        // Serialize immediately: stu_data is reused and overwritten
                        // for the next match of the same record.
                        val rdd = ssc.sparkContext.makeRDD(Seq(stu_data.toString))
                        EsSpark.saveJsonToEs(rdd, "test_index_name/docs", Map("es.mapping.id" -> "_id"))
                    })
                }
            })

            println("dfsdfsdf")
            ssc.start()
            ssc.awaitTermination()
        }
    }
  • 相关阅读:
    如何安装树莓派虚拟机
    树莓派3用create_ap变身无线AP
    Centos 安装golang beego
    Lua中调用C函数
    C++ 用libcurl库进行http通讯网络编程(转)
    SkipList 跳表
    ntohs, ntohl, htons,htonl的比较和详解【转】
    SQLITE3 使用总结(转)
    mysql基础认识1
    mysql 数据类型
  • 原文地址:https://www.cnblogs.com/supermanwx/p/9959723.html
Copyright © 2011-2022 走看看