参考:jianshu.com/p/9d2d225c1951
监听socket获取数据,代码如下:
这里在IP为10.121.33.44的机器上执行 nc -lk 9999 来发送消息
/**
 * Spark Streaming example that reads text lines from a TCP socket and prints
 * every line containing the keyword "error".
 *
 * The stream connects to 10.121.33.44:9999 and is processed in 10-second batches.
 */
object SocketStream {

  /**
   * Builds the streaming context, wires the socket source to the keyword
   * filter, then blocks until the streaming job terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Local test run with 4 threads. The socket receiver occupies one of them,
    // so the local thread count must stay greater than the number of receivers.
    val conf = new SparkConf().setMaster("local[4]").setAppName("streaming")
    // One micro-batch every 10 seconds.
    val ssc = new StreamingContext(conf, Seconds(10))

    // Receive messages from the socket.
    val dstream = ssc.socketTextStream("10.121.33.44", 9999, StorageLevel.MEMORY_AND_DISK_SER)

    // Watch for the keyword "error" and print each matching line.
    // NOTE: rdd.foreach runs on the executors; with a local master the output
    // appears on this console, on a cluster it goes to the executors' stdout.
    dstream
      .filter(_.contains("error"))
      .foreachRDD { rdd =>
        rdd.foreach(line => println(line))
      }

    ssc.start()
    ssc.awaitTermination()
  }
}
从Kafka读取数据(这种方式比较常用),代码如下:
/**
 * Spark Streaming example that consumes messages from Kafka through the
 * receiver-based (ZooKeeper) consumer and prints every message containing
 * the keyword "error".
 *
 * NOTE(review): `KafkaUtils.createStream` belongs to the Kafka 0.8 integration,
 * which is deprecated in newer Spark releases in favour of the direct stream
 * API. Migrating requires a different dependency, so it is only flagged here.
 */
object KafkaStream {

  /**
   * Builds the streaming context, wires the Kafka source to the keyword
   * filter, then blocks until the streaming job terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Local test run with 4 threads. The Kafka receiver occupies one of them,
    // so the local thread count must stay greater than the number of receivers.
    val conf = new SparkConf().setMaster("local[4]").setAppName("streaming")
    // One micro-batch every 10 seconds.
    val ssc = new StreamingContext(conf, Seconds(10))

    // ZooKeeper connect string: host:port list followed by the chroot path.
    val zkQuorum = "10.22.33.44:6688,10.22.33.45:6688/kafka_cluster"
    val groupId = "realtime_data"

    // Kafka consumer settings.
    val kafkaParams = Map[String, String](
      "zookeeper.connect" -> zkQuorum,
      "group.id" -> groupId,
      "zookeeper.connection.timeout.ms" -> "10000",
      // 10485760 bytes = 10 MiB maximum fetch size.
      "fetch.message.max.bytes" -> "10485760"
    )

    // Topic name -> number of consumer threads the receiver uses for it.
    val topics = Map[String, Int]("test_topic" -> 16)

    // Receive messages; the stream yields (key, message) pairs, keep only the message.
    val dstream = KafkaUtils
      .createStream[String, String, StringDecoder, StringDecoder](
        ssc,
        kafkaParams,
        topics,
        StorageLevel.MEMORY_AND_DISK_SER
      )
      .map(_._2)

    // Watch for the keyword "error" and print each matching message.
    // NOTE: rdd.foreach runs on the executors; with a local master the output
    // appears on this console, on a cluster it goes to the executors' stdout.
    dstream
      .filter(_.contains("error"))
      .foreachRDD { rdd =>
        rdd.foreach(line => println(line))
      }

    ssc.start()
    ssc.awaitTermination()
  }
}