zoukankan      html  css  js  c++  java
  • scala_spark实践2

    参考:jianshu.com/p/9d2d225c1951 

    监听socket获取数据,代码如下:
    这里使用nc -lk 9999 在ip为10.121.33.44的机器上发送消息

    object SocketStream {
      def main(args: Array[String]): Unit = {
        //本地测试,设置4核
        val conf = new SparkConf().setMaster("local[4]").setAppName("streaming")
        //以10秒为一个批次
        val ssc = new StreamingContext(conf,Seconds(10))
        //接收消息
        val dstream = ssc.socketTextStream("10.121.33.44",9999,StorageLevel.MEMORY_AND_DISK_SER)
        //监测关键字error,出现则print
        dstream.filter(_.contains("error")).foreachRDD(rdd=>{
          rdd.foreach(println(_))
        })
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

      从kafka读取数据,比较常用

    object KafkaStream {
    
      def main(args: Array[String]): Unit = {
        //本地测试,设置4核
        val conf = new SparkConf().setMaster("local[4]").setAppName("streaming")
        //以10秒为一个批次
        val ssc = new StreamingContext(conf,Seconds(10))
    
        val zkQuorum = "10.22.33.44:6688,10.22.33.45:6688/kafka_cluster"
        val group_id = "realtime_data"
    
        //kafka相关参数
        val kafka_param = Map[String,String](
          "zookeeper.connect" ->zkQuorum,
          "group.id" -> group_id,
          "zookeeper.connection.timeout.ms" -> "10000",
          "fetch.message.max.bytes" -> "10485760"
        )
        val topic = Map[String,Int]("test_topic" -> 16)
        //接收消息
        val dstream = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,kafka_param,topic,StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
        //监测关键字error,出现则print
        dstream.filter(_.contains("error")).foreachRDD(rdd=>{
          rdd.foreach(println(_))
        })
        
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

      

  • 相关阅读:
    使用node.js搭建一个简单的后台服务
    node.js连接MySQL数据库
    js将date对象转换为指定格式
    配置angular2运行环境
    简单AJAX请求JSon数据
    正则表达式
    javascript typeof 和 instanceof 的区别和联系
    map和reduce函数的使用
    Github 上管理项目
    微服务资料
  • 原文地址:https://www.cnblogs.com/shaozhiqi/p/12172113.html
Copyright © 2011-2022 走看看