zoukankan      html  css  js  c++  java
  • spark-kafka-es交互 优化

    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.spark.streaming.Seconds
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.kafka010._
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.SparkConf
    //import org.elasticsearch._
    import com.alibaba.fastjson.JSONObject
    import com.alibaba.fastjson.JSON._
    import com.alibaba.fastjson.parser._
    import java.text.SimpleDateFormat
    import org.elasticsearch.spark.rdd.EsSpark
    import org.apache.kafka.common.TopicPartition
    import redis.clients.jedis._
    import scala.collection.JavaConverters._

    object stu_course_test1 {

    def tranTimeToLong(tm:String) :Long={
    val fm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val dt = fm.parse(tm)
    val aa = fm.format(dt)
    val tim: Long = dt.getTime()/1000
    tim
    }
    def main(args:Array[String]){
    ParserConfig.getGlobalInstance().setAutoTypeSupport(true);
    val conf = new SparkConf().setAppName("test").set("es.nodes",ip_list).set("es.port","9200")
    val ssc = new StreamingContext(conf, Seconds(2))
    println("hello")
    val redis1 = new Jedis("10.10.66.163",6379)
    val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> ip,
    "group.id" -> "test",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "sasl.plain.username" -> "name",
    "sasl.plain.password" -> "psw",
    "security.protocol" -> "SASL_PLAINTEXT",
    "sasl.mechanism" -> "PLAIN",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (true: java.lang.Boolean)

    );
    val tops = "stucourse_xes_student_courses"
    val topics = Array(tops)
    val redis_ans = redis1.hgetAll(tops).asScala
    var fromOffsets:Map[TopicPartition, Long] = Map()
    if (redis_ans.isEmpty != true){
    redis_ans.foreach{i => {fromOffsets+=(new TopicPartition(tops,i._1.toInt) -> i._2.toLong)}}
    }
    fromOffsets = Map[TopicPartition, Long](new TopicPartition(tops,0) -> 20900000L).toMap
    //redis有值
    val stream = if(fromOffsets.isEmpty != true){
    KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams, fromOffsets));//fromOffsets.keys.toList
    }
    else{
    KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams))
    }
    val origin = stream.map(record=>{
    var new_record = new JSONObject()

    new_record.put("offset",record.offset.toString.toLong);
    new_record.put("partition",record.partition.toString.toInt);
    new_record.put("value",record.value)
    new_record
    })
    val offs = stream.map(off => off.offset)
    origin.foreachRDD(record=>{
    val count = record.count()
    var data_bulk: List[String] = List()
    var course_list:List[Any] = List()
    println(count)
    val record_ans = record.saveAsTextFile("test_log")
    println(record_ans)
    if (count>0){
    for (i <- record.collect()){
    val datas = parseObject(i.toString)

    val offset = datas.get("offset")
    val partition = datas.get("partition")
    val dict = parseObject(parseObject(datas.get("value").toString).get("data").toString)
    // println(offset,partition)
    val stu_data = new JSONObject()
    stu_data.put("a",dict.get("a").toString.toInt)
    stu_data.put("b",dict.get("b").toString.toInt)
    if (course_list.exists(tmp=>tmp==stu_data.get("a")) == false){
    course_list = course_list:+stu_data.get("a")
    }
    if (dict.get("a").toString.toInt > 100000){

    stu_data.put("a",dict.get("a").toString)
    stu_data.put("b",dict.get("b").toString.toInt)
    stu_data.put("c",tranTimeToLong(dict.get("c").toString).toInt)
    stu_data.put("d",dict.get("d").toString)
    stu_data.put("e",dict.get("e").toString)
    stu_data.put("d","")
    stu_data.put("d","")
    stu_data.put("f",0)
    stu_data.put("modify_time",System.currentTimeMillis/1000)
    stu_data.put("r",dict.get("r").toString.toInt)
    stu_data.put("offset",offset)
    stu_data.put("partition",partition)
    stu_data.put("_id",stu_data.get("a").toString+"_"+stu_data.get("a")+"_"+stu_data.get("d").toString)
    data_bulk = data_bulk:+stu_data.toString
    }
    }
    val course_data = new JSONObject()
    val course_num = course_list.length
    var course_cnt = ((course_num*1.0)/1000.0).ceil.toInt
    if(course_cnt == 0 && course_num>0){
    course_cnt = 1
    }
    for (i <- 0 to course_cnt){
    var gap = 0
    if (course_list.length > 1000){
    gap = 1000
    }
    else{
    gap = course_list.length
    }
    var coursestr = course_list.take(gap).toString()
    course_list = course_list.takeRight(course_list.length - gap)

    coursestr = coursestr.substring(5,coursestr.length-1)
    if(coursestr.length > 0){
    val query = """{"query":{"bool":{"must":[{"terms":{"course_id":["""+coursestr+"""]}}]}}}"""
    println(query)
    val es_result = EsSpark.esRDD(ssc.sparkContext,"index/all-type",query)
    es_result.collect().foreach(course => {
    var detail_set = new JSONObject()
    detail_set.put("a",course._2("a").toString)
    detail_set.put("b",course._2("b").toString)
    detail_set.put("c",course._2("c").toString.toInt)
    detail_set.put("c",course._2("c").toString.toInt)
    detail_set.put("c",course._2("c").toString.toInt)
    detail_set.put("c",course._2("c").toString.toInt)
    detail_set.put("d",course._2("d").toString.toInt)
    course_data.put(course._1.toString,detail_set)
    })
    }
    }
    var data_seq:Seq[String] = Seq()
    var data_cnt = 0
    if (data_bulk.length > 0){
    var offset_list:Map[String,String] = Map()
    for(data<-data_bulk){
    val datastr = data.toString
    var data_set = parseObject(datastr)
    offset_list += (data_set.get("partition").toString->data_set.get("offset").toString)
    data_set.remove("offset")
    data_set.remove("partition")
    if (course_data.containsKey(data_set.get("course_id").toString)){
    var course_set = course_data.get(data_set.get("course_id").toString).toString
    var all_data = datastr.toString.substring(0,datastr.length-1)+","+course_set.substring(1,course_set.length)
    data_cnt += 1
    data_seq = data_seq :+ all_data
    if (data_cnt == 100){

    val rdd = ssc.sparkContext.makeRDD(data_seq)
    val up_ans = EsSpark.saveJsonToEs(rdd,"test_index/docs",Map("es.mapping.id" -> "_id"))
    println("up_ans:",up_ans)
    data_cnt = 0
    data_seq = Nil
    }
    }
    }
    if (data_cnt >0){
    val rdd = ssc.sparkContext.makeRDD(data_seq)
    val up_ans = EsSpark.saveJsonToEs(rdd,"test_index/docs",Map("es.mapping.id" -> "_id"))
    println("up_ans",up_ans)
    data_cnt = 0
    data_seq = Nil

    }

    if (offset_list.isEmpty != true){
    val up_ans = redis1.hmset(tops,offset_list.asJava)
    println(up_ans)
    redis1.close
    val redis_ans = redis1.hgetAll(tops)
    println(redis_ans)
    println(redis_ans.getClass.getSimpleName)
    }

    }
    }
    data_bulk = Nil
    course_list = Nil

    })
    ssc.start();
    ssc.awaitTermination();

    }
    }

  • 相关阅读:
    数据中台的“自动化数据治理”时代已来
    如何利用缓存机制实现JAVA类反射性能提升30倍
    快速入门开发实现订单类图片识别结果抽象解析
    Nginx专题(1):Nginx之反向代理及配置
    Github 上热门的 Spring Boot 项目实战推荐
    设计模式之命令模式(二)
    设计模式之命令模式(一)
    设计模式之单例模式(二)
    设计模式之单例模式(一)
    好的学习带给我什么
  • 原文地址:https://www.cnblogs.com/supermanwx/p/10186744.html
Copyright © 2011-2022 走看看