zoukankan      html  css  js  c++  java
  • scala_spark实践2

    参考:jianshu.com/p/9d2d225c1951 

    监听socket获取数据,代码如下:
    这里使用nc -lk 9999 在ip为10.121.33.44的机器上发送消息

    object SocketStream {
      def main(args: Array[String]): Unit = {
        //本地测试,设置4核
        val conf = new SparkConf().setMaster("local[4]").setAppName("streaming")
        //以10秒为一个批次
        val ssc = new StreamingContext(conf,Seconds(10))
        //接收消息
        val dstream = ssc.socketTextStream("10.121.33.44",9999,StorageLevel.MEMORY_AND_DISK_SER)
        //监测关键字error,出现则print
        dstream.filter(_.contains("error")).foreachRDD(rdd=>{
          rdd.foreach(println(_))
        })
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

      从kafka读取数据,比较常用

    object KafkaStream {
    
      def main(args: Array[String]): Unit = {
        //本地测试,设置4核
        val conf = new SparkConf().setMaster("local[4]").setAppName("streaming")
        //以10秒为一个批次
        val ssc = new StreamingContext(conf,Seconds(10))
    
        val zkQuorum = "10.22.33.44:6688,10.22.33.45:6688/kafka_cluster"
        val group_id = "realtime_data"
    
        //kafka相关参数
        val kafka_param = Map[String,String](
          "zookeeper.connect" ->zkQuorum,
          "group.id" -> group_id,
          "zookeeper.connection.timeout.ms" -> "10000",
          "fetch.message.max.bytes" -> "10485760"
        )
        val topic = Map[String,Int]("test_topic" -> 16)
        //接收消息
        val dstream = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,kafka_param,topic,StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
        //监测关键字error,出现则print
        dstream.filter(_.contains("error")).foreachRDD(rdd=>{
          rdd.foreach(println(_))
        })
        
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

      

  • 相关阅读:
    SSM框架——详细整合教程(Spring+SpringMVC+MyBatis)
    简单的使用SLF4J门面和log4j
    详细的Log4j使用教程
    java的Mybatis动态代理方式(二)
    java的一个基础的Mybatis例子(一)
    java的注解学习
    ArrayList输出的几种方法
    自己写的一个java链接数据库的类
    ipv4的TCP的几个状态 (SYN, FIN, ACK, PSH, RST, URG)
    tomcat的CATALINA_HOME环境变量可以不用设置
  • 原文地址:https://www.cnblogs.com/shaozhiqi/p/12172113.html
Copyright © 2011-2022 走看看