  • Spark Streaming: simple usage examples

    >A simple Spark Streaming example reading from nc (netcat) on Linux

     Dependency to add:

     <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-streaming_2.11</artifactId>
             <version>${spark.version}</version>    <!-- version 2.1.1 -->
    </dependency>

    package SparkStream

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object streamDome {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("streamDome")
        val stream = new StreamingContext(conf, Seconds(5))
        val text = stream.socketTextStream("192.168.188.131", 6666)
        text.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()  // word count for each 5-second batch
        text.print()
        stream.start()
        stream.awaitTermination()  // block until the streaming computation terminates
      }
    }
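    To try it, first open a listening socket on the target host (e.g. `nc -lk 6666` on 192.168.188.131), then submit the job and type lines into nc; each 5-second batch prints its own word counts. Those counts reset every batch. Below is a minimal sketch, my own extension rather than part of the original post, of keeping a running total across batches with updateStateByKey (the object name and checkpoint path are assumptions):

    package SparkStream

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object StatefulStreamDome {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("statefulStreamDome")
        val stream = new StreamingContext(conf, Seconds(5))
        stream.checkpoint("D://stsocket")  // hypothetical checkpoint dir; required for stateful operations
        val text = stream.socketTextStream("192.168.188.131", 6666)

        // merge this batch's counts into the running total kept per word
        val updateFunc = (newCounts: Seq[Int], total: Option[Int]) =>
          Some(newCounts.sum + total.getOrElse(0))

        text.flatMap(_.split(" "))
          .map((_, 1))
          .updateStateByKey[Int](updateFunc)
          .print()

        stream.start()
        stream.awaitTermination()
      }
    }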

     >Working with spark-streaming-kafka

    Dependency:

     <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
                <version>${spark.version}</version> <!-- 2.1.1 -->
     </dependency>

     >DStream (receiver-based)

    Code:

    package SparkStream
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object StreamKafka {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("kafka")
        val stream = new StreamingContext(conf,Seconds(5))
        stream.checkpoint("D://stkafka")  // checkpoint directory for fault tolerance
        val zk = "192.168.188.130:2181,192.168.188.131:2181,192.168.188.132:2181"  // ZooKeeper quorum
        val topic = Map("test" -> 3)  // "test" is the Kafka topic name (value = receiver threads); several topics can be consumed at once
        val groupId = "spark_kafka"   // consumer group id, any name will do
        val text = KafkaUtils.createStream(stream,zk,groupId,topic)
        text.print()
        stream.start()
        stream.awaitTermination()
        stream.stop()
      }
    }
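
    createStream returns a DStream of (key, message) pairs, so text.print() above prints tuples; the receiver also occupies one of the local[2] cores, which is why at least two are needed. A minimal sketch, my own addition rather than part of the original post, of counting words in the message values instead:

    // would replace text.print() before stream.start() in the program above
    text.map(_._2)            // keep only the message value
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()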

      >DirectStream

    package SparkStream
    
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object StreamKafkaDirectStream {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("kafkaRedict")
        val stream = new StreamingContext(conf,Seconds(10))
        val topic = Set("test")  // topics to subscribe to
        val brokers = "192.168.188.130:9092,192.168.188.131:9092,192.168.188.132:9092"  // Kafka broker list
        val kafkaParams = Map[String,String]("metadata.broker.list" -> brokers,
          "serializer.class" -> "kafka.serializer.StringDecoder")
        val text = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](stream,kafkaParams,topic)  // no receiver: reads partitions from the brokers directly
        text.print()
        stream.start()
        stream.awaitTermination()
        stream.stop()
      }
    }
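
    Unlike the receiver-based approach, the direct stream talks to the brokers in metadata.broker.list itself and tracks offsets inside Spark rather than in ZooKeeper. A minimal sketch, my own addition rather than part of the original post, of inspecting the per-partition offsets of each batch via HasOffsetRanges:

    // would go before stream.start() in the program above
    import org.apache.spark.streaming.kafka.HasOffsetRanges

    text.foreachRDD { rdd =>
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges  // offsets covered by this batch
      ranges.foreach(r => println(s"${r.topic} partition ${r.partition}: ${r.fromOffset} -> ${r.untilOffset}"))
    }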
  • Original post: https://www.cnblogs.com/han-guang-xue/p/10057276.html