zoukankan      html  css  js  c++  java
  • SparkStreaming 整合kafka Demo

    这里使用的是低级API,因为高级API非常不好用,需要繁琐的配置,也不够自动化,却和低级API的效果一样,所以这里以低级API做演示

    你得有zookeeper和kafka

    我这里是3台节点主机

    架构图

    与高级API的区别,简单并行(不需要创造多个输入流,它会自动并行读取kafka的数据),高效(不会像receiver数据被copy两次),一次性语义(缺点:无法使用zookeeper的监控工具)

    1.创建maven工程

    首先添加pom依赖,其它运行依赖请参考 sparkStreaming整合WordCount

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        <version>2.0.2</version>
    </dependency>

    2.启动zookeeper集群

    我把zookeeper集群弄成了个脚本,直接执行脚本启动所有zookeeper

    启动成功

    3.启动kafka集群

    我这里是3台主机,三台都需要

    进入目录

    cd /export/servers/kafka/bin/

    启动

    kafka-server-start.sh -daemon /export/servers/kafka/config/server.properties 

    成功

    4.测试kafka

    创建topic

    cd /export/servers/kafka_2.11-0.10.2.1
    bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 1 --partitions 1 --topic kafka_spark

    通过生产者发送消息

    cd /export/servers/kafka_2.11-0.10.2.1
    bin/kafka-console-producer.sh --broker-list node01:9092 --topic  kafka_spark

    想发啥,发啥。此时通过创建AP接收生产者发送的数据

    编写代码

    package SparkStreaming
    
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}
    
    object SparkStreamingKafka {
      def main(args: Array[String]): Unit = {
        // 1.创建SparkConf对象
        val conf: SparkConf = new SparkConf()
          .setAppName("SparkStreamingKafka_Direct")
          .setMaster("local[2]")
    
        // 2.创建SparkContext对象
        val sc: SparkContext = new SparkContext(conf)
        sc.setLogLevel("WARN")
    
        // 3.创建StreamingContext对象
        /**
          * 参数说明:
          *   参数一:SparkContext对象
          *   参数二:每个批次的间隔时间
          */
        val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))
        //设置checkpoint目录
    
        ssc.checkpoint("./Kafka_Direct")
    
        // 4.通过KafkaUtils.createDirectStream对接kafka(采用是kafka低级api偏移量不受zk管理)
        // 4.1.配置kafka相关参数
        val kafkaParams=Map("metadata.broker.list"->"192.168.52.110:9092,192.168.52.120:9092,192.168.52.130:9092","group.id"->"kafka_Direct")
        // 4.2.定义topic
        val topics=Set("kafka_spark")
    
        val dstream: InputDStream[(String, String)] = KafkaUtils
          .createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
    
        // 5.获取topic中的数据
        val topicData: DStream[String] = dstream.map(_._2)
    
        // 6.切分每一行,每个单词计为1
        val wordAndOne: DStream[(String, Int)] = topicData.flatMap(_.split(" ")).map((_,1))
    
        // 7.相同单词出现的次数累加
        val resultDS: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)
    
        // 8.通过Output Operations操作打印数据
        resultDS.print()
    
        // 9.开启流式计算
        ssc.start()
    
        // 阻塞一直运行
        ssc.awaitTermination()
    
    
    
      }
    }

    生产者生产数据

    API接收控制台打印计算结果

  • 相关阅读:
    一起来构建前端工具链吧~(新建项目)
    我的前端故事----高仿支付宝密码输入框
    我的前端故事----疯狂倒计时(requestAnimationFrame)
    Oracle 导入导出SQL 查看登录用户表个数
    Oracle11g使用exp导出空表
    Spring Boot(二)Application events and listeners
    Spring Boot(一)启动方式
    Android BroadcastReceiver
    钱格式化
    Intellij idea 快键键
  • 原文地址:https://www.cnblogs.com/BigDataBugKing/p/11233729.html
Copyright © 2011-2022 走看看