import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.Seconds
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.SparkConf
//import org.elasticsearch._
import com.alibaba.fastjson.JSONObject
import com.alibaba.fastjson.JSON._
import com.alibaba.fastjson.parser._
import java.text.SimpleDateFormat
import org.elasticsearch.spark.rdd.EsSpark
import org.apache.kafka.common.TopicPartition
import redis.clients.jedis._
import scala.collection.JavaConverters._
object stu_course_test1 {
/**
 * Converts a timestamp string in `yyyy-MM-dd HH:mm:ss` form to Unix epoch seconds.
 *
 * The string is interpreted in the JVM's default time zone, and parsing is lenient
 * (the `SimpleDateFormat` defaults), so out-of-range fields roll over instead of failing.
 *
 * @param tm timestamp text, e.g. `"2020-01-31 08:15:00"`
 * @return whole seconds since 1970-01-01T00:00:00Z (milliseconds divided by 1000)
 * @throws java.text.ParseException if `tm` does not match the pattern
 */
def tranTimeToLong(tm: String): Long = {
  // SimpleDateFormat is not thread-safe, so a fresh instance is created on every call.
  val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  format.parse(tm).getTime / 1000
}
/**
 * Streaming job entry point.
 *
 * Reads the `stucourse_xes_student_courses` Kafka topic in 2-second micro-batches. For every
 * record whose payload field `a` exceeds 100000 it builds a JSON document, splices in the
 * matching course details fetched from Elasticsearch (`index/all-type`), and writes the
 * result to `test_index/docs` in batches of 100. After the writes, the last seen offset of
 * each partition is stored in a Redis hash named after the topic; those offsets are used as
 * the starting position on the next launch.
 *
 * @param args command-line arguments (unused)
 */
def main(args: Array[String]): Unit = {
  // NOTE(review): this enables fastjson autoType globally. Payloads parsed below come from
  // Kafka; if that data is not fully trusted, autoType is a known deserialization risk.
  ParserConfig.getGlobalInstance().setAutoTypeSupport(true)

  val conf = new SparkConf().setAppName("test").set("es.nodes", ip_list).set("es.port", "9200")
  val ssc = new StreamingContext(conf, Seconds(2))
  println("hello")
  // Single driver-side connection, shared by all batches and closed when the job stops.
  val redis1 = new Jedis("10.10.66.163", 6379)

  // NOTE(review): enable.auto.commit is on while offsets are also tracked in Redis;
  // confirm which of the two is meant to be authoritative.
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> ip,
    "group.id" -> "test",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "sasl.plain.username" -> "name",
    "sasl.plain.password" -> "psw",
    "security.protocol" -> "SASL_PLAINTEXT",
    "sasl.mechanism" -> "PLAIN",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (true: java.lang.Boolean)
  )

  val tops = "stucourse_xes_student_courses"
  val topics = Array(tops)

  // Offsets saved by earlier runs: hash field = partition number, value = offset.
  val storedOffsets: Map[TopicPartition, Long] =
    redis1.hgetAll(tops).asScala.map { case (partition, offset) =>
      new TopicPartition(tops, partition.toInt) -> offset.toLong
    }.toMap

  // Stored offsets win. The hard-coded position is only a bootstrap for an empty Redis hash;
  // previously it overwrote the restored offsets unconditionally, so the job never resumed.
  // NOTE(review): 20900000 on partition 0 looks like a debugging value - confirm it is wanted.
  val fromOffsets: Map[TopicPartition, Long] =
    if (storedOffsets.nonEmpty) storedOffsets
    else Map(new TopicPartition(tops, 0) -> 20900000L)

  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams, fromOffsets))

  // Wrap each Kafka record with its position so the offset travels with the payload.
  val origin = stream.map { record =>
    val wrapped = new JSONObject()
    wrapped.put("offset", record.offset)
    wrapped.put("partition", record.partition)
    wrapped.put("value", record.value)
    wrapped
  }

  origin.foreachRDD { rdd =>
    // Collect once and reuse the result instead of running count() and collect() separately.
    val rows = rdd.collect()
    println(rows.length)
    // saveAsTextFile returns Unit, so the second println emits "()".
    val record_ans = rdd.saveAsTextFile("test_log")
    println(record_ans)

    if (rows.nonEmpty) {
      // Per record: the numeric `a` value (used for the course lookup) and, when `a` > 100000,
      // the document to index.
      val parsed = rows.toVector.map { row =>
        val datas = parseObject(row.toString)
        val offset = datas.get("offset")
        val partition = datas.get("partition")
        val dict = parseObject(parseObject(datas.get("value").toString).get("data").toString)

        val stu_data = new JSONObject()
        stu_data.put("a", dict.get("a").toString.toInt)
        stu_data.put("b", dict.get("b").toString.toInt)
        val courseId = stu_data.get("a")

        val studentDoc =
          if (dict.get("a").toString.toInt > 100000) {
            stu_data.put("a", dict.get("a").toString)
            stu_data.put("b", dict.get("b").toString.toInt)
            stu_data.put("c", tranTimeToLong(dict.get("c").toString).toInt)
            // NOTE(review): "d" is read from the payload and then immediately blanked;
            // this looks like a field-name mix-up - confirm the intended keys.
            stu_data.put("d", dict.get("d").toString)
            stu_data.put("e", dict.get("e").toString)
            stu_data.put("d", "")
            stu_data.put("f", 0)
            stu_data.put("modify_time", System.currentTimeMillis / 1000)
            stu_data.put("r", dict.get("r").toString.toInt)
            stu_data.put("offset", offset)
            stu_data.put("partition", partition)
            stu_data.put("_id", stu_data.get("a").toString + "_" + stu_data.get("a") + "_" + stu_data.get("d").toString)
            Some(stu_data)
          } else {
            None
          }
        (courseId, studentDoc)
      }

      // Distinct ids in first-seen order.
      val courseIds = parsed.map(_._1).distinct
      val dataBulk = parsed.collect { case (_, Some(doc)) => doc }

      // Fetch course details in chunks of at most 1000 ids per terms query.
      val course_data = new JSONObject()
      courseIds.grouped(1000).foreach { chunk =>
        val coursestr = chunk.mkString(", ")
        val query = """{"query":{"bool":{"must":[{"terms":{"course_id":["""+coursestr+"""]}}]}}}"""
        println(query)
        EsSpark.esRDD(ssc.sparkContext, "index/all-type", query).collect().foreach { course =>
          val detail_set = new JSONObject()
          detail_set.put("a", course._2("a").toString)
          detail_set.put("b", course._2("b").toString)
          detail_set.put("c", course._2("c").toString.toInt)
          detail_set.put("d", course._2("d").toString.toInt)
          // Keyed by the Elasticsearch document id of the course.
          course_data.put(course._1.toString, detail_set)
        }
      }

      if (dataBulk.nonEmpty) {
        // Last offset seen per partition within this batch (later entries overwrite earlier ones).
        val offset_list: Map[String, String] =
          dataBulk.map(doc => doc.get("partition").toString -> doc.get("offset").toString).toMap

        // Splice the course JSON into each document that has a matching course entry.
        val merged = dataBulk.flatMap { doc =>
          // The documents built above carry no "course_id" key, so the former unconditional
          // `.toString` on it threw a NullPointerException. Fall back to "a", the field whose
          // values feed the course_id terms query.
          // NOTE(review): assumes the course document id equals that value - confirm.
          val courseKey =
            Option(doc.get("course_id")).orElse(Option(doc.get("a"))).map(_.toString)
          courseKey.filter(key => course_data.containsKey(key)).map { key =>
            val datastr = doc.toString
            val course_set = course_data.get(key).toString
            // "{...}" + "{...}" -> "{...,...}": drop the closing and the opening brace.
            datastr.substring(0, datastr.length - 1) + "," + course_set.substring(1)
          }.toList
        }

        merged.grouped(100).foreach { batch =>
          val es_rdd = ssc.sparkContext.makeRDD(batch)
          val up_ans = EsSpark.saveJsonToEs(es_rdd, "test_index/docs", Map("es.mapping.id" -> "_id"))
          println(("up_ans:", up_ans))
        }

        // Persist offsets only after the Elasticsearch writes have been issued.
        if (offset_list.nonEmpty) {
          val up_ans = redis1.hmset(tops, offset_list.asJava)
          println(up_ans)
          // Read back for logging. The connection stays open because later batches reuse it.
          val redis_ans = redis1.hgetAll(tops)
          println(redis_ans)
          println(redis_ans.getClass.getSimpleName)
        }
      }
    }
  }

  try {
    ssc.start()
    ssc.awaitTermination()
  } finally {
    redis1.close()
  }
}
}