Spark Streaming integration with Kafka

Kafka was originally developed at LinkedIn as a distributed messaging system and later became an Apache project. It is written in Scala and is widely used for its horizontal scalability and high throughput. A growing number of open-source distributed processing systems, such as Cloudera, Apache Storm, and Spark, support integration with Kafka.

Integrating Spark Streaming with Kafka is one of the most common scenarios in enterprise applications.

I. Installing Kafka

Reference:

    http://kafka.apache.org/quickstart#quickstart_createtopic

1. Install Java

2. Install a ZooKeeper cluster

Reference: http://www.cnblogs.com/wcwen1990/p/6652105.html

3. Install Scala

4. Install Kafka

Download the Kafka release:

    https://archive.apache.org/dist/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz

Extract the Kafka archive:

    # tar -zxvf kafka_2.10-0.8.2.1.tgz -C /opt/cdh-5.3.6/

    # chown -R hadoop:hadoop /opt/cdh-5.3.6/kafka_2.10-0.8.2.1/

Remove the bundled ZooKeeper jar from Kafka's libs directory and copy in the jar from your own ZooKeeper cluster installation:

$ rm -rf libs/zookeeper-3.4.6.jar

    $ cp /opt/cdh-5.3.6/zookeeper-3.4.5-cdh5.3.6/zookeeper-3.4.5-cdh5.3.6.jar libs/

5. Configure Kafka

5.1) Edit server.properties:

    host.name=chavin.king

    log.dirs=/opt/cdh-5.3.6/kafka_2.10-0.8.2.1/kafka-logs

    zookeeper.connect=chavin.king:2181

Edit producer.properties:

    metadata.broker.list=chavin.king:9092

Edit consumer.properties:

    zookeeper.connect=chavin.king:2181

5.2) Start the Kafka server

    $ bin/kafka-server-start.sh config/server.properties
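
This starts the broker in the foreground. To keep it running while you continue working in the same terminal, you can background it; a sketch using plain nohup (the log file name here is just an illustration):

$ nohup bin/kafka-server-start.sh config/server.properties > kafka-server.log 2>&1 &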

    $ jps

    14020 NameNode

    57749 Jps

    14776 QuorumPeerMain

    57690 Kafka

    14507 NodeManager

    14235 ResourceManager

    14093 DataNode

    14686 JobHistoryServer

    57663 ZooKeeperMain

    [zk: localhost:2181(CONNECTED) 3] ls /

    [controller, controller_epoch, brokers, zookeeper, admin, consumers, config, hbase]

5.3) Create a topic

    $ bin/kafka-topics.sh --create --zookeeper chavin.king:2181 --replication-factor 1 --partitions 1 --topic test

    $ bin/kafka-topics.sh --list --zookeeper chavin.king:2181
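
You can also verify the topic's partition count and replica assignment with kafka-topics.sh --describe:

$ bin/kafka-topics.sh --describe --zookeeper chavin.king:2181 --topic test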

5.4) Start a producer to generate data

    $ bin/kafka-console-producer.sh --broker-list chavin.king:9092 --topic test

5.5) Start a consumer to consume the data

    $ bin/kafka-console-consumer.sh --zookeeper chavin.king:2181 --topic test --from-beginning

Type data into the producer shell window; it is printed in the consumer window.

II. Spark Streaming integration with Kafka

Reference: http://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html

1) Preparation

1. Build Spark to obtain the Kafka integration jar

Reference: http://www.cnblogs.com/wcwen1990/p/7688027.html

Note: integrating Spark Streaming with Flume or Kafka requires supporting jars. These jars are generated under the external directory when Spark is built, so we build Spark here to obtain them.

For Kafka, the main jar required is spark-streaming-kafka_2.10-1.3.0.jar.
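
If you would rather not build Spark yourself, the same artifact is published to Maven Central, so a standalone project can declare it as a dependency instead. A minimal sbt sketch (assuming Scala 2.10, matching the artifact name above):

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "1.3.0" % "provided",
  "org.apache.spark" % "spark-streaming_2.10" % "1.3.0" % "provided",
  "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.3.0"
)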

2. Collect the required jars

    $ cp external/kafka/target/spark-streaming-kafka_2.10-1.3.0.jar /opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/

    $ cp libs/kafka_2.10-0.8.2.1.jar libs/kafka-clients-0.8.2.1.jar libs/zkclient-0.3.jar libs/metrics-core-2.2.0.jar /opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/

    [externalLibs]$ ls

    kafka_2.10-0.8.2.1.jar

    kafka-clients-0.8.2.1.jar

    metrics-core-2.2.0.jar

    spark-streaming-kafka_2.10-1.3.0.jar

    zkclient-0.3.jar

2) Approach 1: Receiver-based Approach

1. Write a Spark Streaming + Kafka word count

    import java.util.HashMap

    import org.apache.spark._

    import org.apache.spark.streaming._

    import org.apache.spark.streaming.StreamingContext._

    import org.apache.spark.streaming.kafka._

    val ssc = new StreamingContext(sc, Seconds(5))

    val topicMap = Map("test" -> 1)

    // read data

    val lines = KafkaUtils.createStream(ssc, "chavin.king:2181", "testWordCountGroup", topicMap).map(_._2)

    val words = lines.flatMap(_.split(" "))

    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

    wordCounts.print()

    ssc.start() // Start the computation

    ssc.awaitTermination() // Wait for the computation to terminate
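
Two things about createStream are easy to misread: the value in topicMap ("test" -> 1) is the number of consumer threads inside the single receiver, not the job's parallelism, and the receiver stores data at the default level MEMORY_AND_DISK_SER_2. For higher ingest throughput the integration guide suggests creating several receivers and unioning them; a sketch under those assumptions (numReceivers is an illustrative value):

import org.apache.spark.storage.StorageLevel

// Each createStream call creates one receiver; union the resulting streams
// so the rest of the pipeline sees a single DStream.
val numReceivers = 2 // illustrative; tune to your cluster
val kafkaStreams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, "chavin.king:2181", "testWordCountGroup",
    Map("test" -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)
}
val unionedLines = ssc.union(kafkaStreams).map(_._2)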

2. Start spark-shell in local mode and run the program from step 1

bin/spark-shell --master local[2] --jars \
/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/spark-streaming-kafka_2.10-1.3.0.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/kafka_2.10-0.8.2.1.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/kafka-clients-0.8.2.1.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/zkclient-0.3.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/metrics-core-2.2.0.jar

    scala> import java.util.HashMap

    import java.util.HashMap

    scala> import org.apache.spark._

    import org.apache.spark._

    scala> import org.apache.spark.streaming._

    import org.apache.spark.streaming._

    scala> import org.apache.spark.streaming.StreamingContext._

    import org.apache.spark.streaming.StreamingContext._

    scala> import org.apache.spark.streaming.kafka._

    import org.apache.spark.streaming.kafka._

    scala> val ssc = new StreamingContext(sc, Seconds(5))

    ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@1a28f9a0

    scala> val topicMap = Map("test" -> 1)

    topicMap: scala.collection.immutable.Map[String,Int] = Map(test -> 1)

    scala> val lines = KafkaUtils.createStream(ssc, "chavin.king:2181", "testWordCountGroup", topicMap).map(_._2)

    lines: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@27267641

    scala>

    scala> val words = lines.flatMap(_.split(" "))

    words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@169b0639

    scala> val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

    wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@14f2b1ba

    scala> wordCounts.print()

    scala> ssc.start()

scala> ssc.awaitTermination()

3. Test

Type the following in the Kafka producer shell:

    hadoop oracle mysql mysql mysql

At this point the Kafka consumer prints the following:

    hadoop oracle mysql mysql mysql

Meanwhile the Spark Streaming side prints:

    -------------------------------------------

    Time: 1500021590000 ms

    -------------------------------------------

    (mysql,3)

    (oracle,1)

    (hadoop,1)

3) Approach 2: Direct Approach (No Receivers)

1. Write a Spark Streaming + Kafka word count

    import kafka.serializer.StringDecoder

    import org.apache.spark._

    import org.apache.spark.streaming._

    import org.apache.spark.streaming.StreamingContext._

    import org.apache.spark.streaming.kafka._

    val ssc = new StreamingContext(sc, Seconds(5))

    val kafkaParams = Map[String, String]("metadata.broker.list" -> "chavin.king:9092")

    val topicsSet = Set("test")

    // read data

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

    val lines = messages.map(_._2)

    val words = lines.flatMap(_.split(" "))

    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

    wordCounts.print()

    ssc.start() // Start the computation

    ssc.awaitTermination() // Wait for the computation to terminate
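
Because the direct approach has no receiver and maps each Kafka partition one-to-one onto an RDD partition, every batch also carries the exact offset ranges it consumed. A minimal sketch of reading them, using the HasOffsetRanges interface described in the 1.3.0 integration guide:

import org.apache.spark.streaming.kafka.HasOffsetRanges

messages.foreachRDD { rdd =>
  // The cast is valid only on the RDD produced directly by createDirectStream,
  // before any transformation changes its partitioning.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"topic=${o.topic} partition=${o.partition} offsets=[${o.fromOffset}, ${o.untilOffset})")
  }
}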

2. Start spark-shell in local mode and run the program from step 1

bin/spark-shell --master local[2] --jars \
/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/spark-streaming-kafka_2.10-1.3.0.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/kafka_2.10-0.8.2.1.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/kafka-clients-0.8.2.1.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/zkclient-0.3.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/metrics-core-2.2.0.jar

    scala> import kafka.serializer.StringDecoder

    import kafka.serializer.StringDecoder

    scala> import org.apache.spark._

    import org.apache.spark._

    scala> import org.apache.spark.streaming._

    import org.apache.spark.streaming._

    scala> import org.apache.spark.streaming.StreamingContext._

    import org.apache.spark.streaming.StreamingContext._

    scala> import org.apache.spark.streaming.kafka._

    import org.apache.spark.streaming.kafka._

    scala>

    scala> val ssc = new StreamingContext(sc, Seconds(5))

    ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@2d05daca

    scala>

    scala> val kafkaParams = Map[String, String]("metadata.broker.list" -> "chavin.king:9092")

    kafkaParams: scala.collection.immutable.Map[String,String] = Map(metadata.broker.list -> chavin.king:9092)

    scala> val topicsSet = Set("test")

    topicsSet: scala.collection.immutable.Set[String] = Set(test)

    scala>

    scala> // read data

    scala> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

    17/07/14 16:59:31 INFO VerifiableProperties: Verifying properties

    17/07/14 16:59:31 INFO VerifiableProperties: Property group.id is overridden to

    17/07/14 16:59:31 INFO VerifiableProperties: Property zookeeper.connect is overridden to

    messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@375c2870

    scala>

    scala> val lines = messages.map(_._2)

    lines: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@1dda179e

    scala> val words = lines.flatMap(_.split(" "))

    words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@996294c

    scala> val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

    wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@19cd9e6a

    scala> wordCounts.print()

    scala> ssc.start()

scala> ssc.awaitTermination()

3. Test

Type the following in the Kafka producer shell:

    hadoop oracle mysql mysql mysql

At this point the Kafka consumer prints the following:

    hadoop oracle mysql mysql mysql

Meanwhile the Spark Streaming side prints:

    -------------------------------------------

    Time: 1500021590000 ms

    -------------------------------------------

    (mysql,3)

    (oracle,1)

    (hadoop,1)

Both Spark Streaming + Kafka integration approaches have now been demonstrated. Notice, however, that in the examples above Spark Streaming computes over each batch of input independently, while enterprise applications often need to accumulate values across batches. How can that be done? See the following example.

4) Accumulating input across batches with updateStateByKey

1. Create a file udsb.scala with the following content:

    $ cat udsb.scala

    import kafka.serializer.StringDecoder

    import org.apache.spark._

    import org.apache.spark.streaming._

    import org.apache.spark.streaming.StreamingContext._

    import org.apache.spark.streaming.kafka._

    val ssc = new StreamingContext(sc, Seconds(5))

    ssc.checkpoint(".")

    val kafkaParams = Map[String, String]("metadata.broker.list" -> "chavin.king:9092")

    val topicsSet = Set("test")

    val updateFunc = (values: Seq[Int], state: Option[Int]) => {

    val currentCount = values.sum

    val previousCount = state.getOrElse(0)

    Some(currentCount + previousCount)

    }

    // read data

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

    val lines = messages.map(_._2)

    val words = lines.flatMap(_.split(" "))

    val wordDstream = words.map(x => (x, 1))

    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)

    stateDstream.print()

    ssc.start()

    ssc.awaitTermination()
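
The update function runs once per key per batch: values holds that key's new counts from the current batch, and state holds the running total from earlier batches (updateStateByKey requires a checkpoint directory, which is why the script calls ssc.checkpoint(".")). Its behavior can be checked in plain Scala, without Spark (hypothetical inputs for illustration):

val f = (values: Seq[Int], state: Option[Int]) => Some(values.sum + state.getOrElse(0))
f(Seq(1, 1, 1), None)  // Some(3): first batch for this key, no prior state
f(Seq(1), Some(3))     // Some(4): a later batch adds to the running total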

2. Start spark-shell in local mode and run the program from step 1

bin/spark-shell --master local[2] --jars \
/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/spark-streaming-kafka_2.10-1.3.0.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/kafka_2.10-0.8.2.1.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/kafka-clients-0.8.2.1.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/zkclient-0.3.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/metrics-core-2.2.0.jar

    scala> :load /opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/udsb.scala

3. Test

Type input in the Kafka producer shell:

3.1) First input: hadoop oracle mysql

The Spark Streaming side prints:

    -------------------------------------------

    Time: 1500023985000 ms

    -------------------------------------------

    (mysql,1)

    (oracle,1)

    (hadoop,1)

3.2) Second input: hadoop oracle mysql

The Spark Streaming side prints:

    -------------------------------------------

    Time: 1500023985000 ms

    -------------------------------------------

    (mysql,2)

    (oracle,2)

    (hadoop,2)

3.3) Third input: hadoop oracle mysql

The Spark Streaming side prints:

    -------------------------------------------

    Time: 1500023985000 ms

    -------------------------------------------

    (mysql,3)

    (oracle,3)

    (hadoop,3)

Original article: https://www.cnblogs.com/wcwen1990/p/7899184.html