zoukankan      html  css  js  c++  java
  • KafKa

    一、Kafaka 介绍

    Apache Kafka 是分布式发布-订阅消息系统。 它最初由 LinkedIn 公司开发, 之后成为 Apache 项目的一部分。 Kafka 是一种快速、 可扩展的、 设计内在就是 分布式的, 分区的和可复制的提交日志服务 Kafka 是一个消息系统, 原本开发自 LinkedIn, 用作 LinkedIn 的活动流
    ( activity stream) 和运营数据处理管道( pipeline) 的基础。 后贡献给 apache 基金会, 成为 apache 的一个顶级项目。 

    补充:

    1、Kafka是一个分布式发布-订阅消息系统。分布式,意味着Kafka
    可以在集群中运行。
    2、消息的发布者,称为生产者(Producers),负责发布消息。发布消息之前,
    需要发布确定一个主题(topic)。
    3、broker,消息的缓存代理,相当于消息的服务器。生产者发布
    的消息缓存在broker中。
    4、消息的订阅者,称为消费者(Consumers),负责订阅消费消息。它向broker
    中获取订阅的消息进行消费。
    5、Kafka集群的运行需要Zookeeper来协调服务,所以在运行Kafka
    集群之前,必须保证Zookeeper集群能正常运行。
    6、在企业的大数据开发中,
    经常用Flume+Kafka+Spark Streaming(Storm)或者
    Kafka+Spark Streaming+Redis的方式进行开发。

    NoSQL:not only SQL(不仅仅是SQL),
    常用NoSQL数据库:HBase、ES(Elastic Search)、Redis、MongDB

     

    二、安装Kafka步骤

            1、安转zookeeper

            2、安装kafaka

            3、下载解压安装包kafka_2.10-0.9.0.1.tgz

            4、vi config/server.properties

            5、broker.id=0
                host.name=liuiwei3
               zookeeper.connect=liuwei3:2181,liuwei1:2181,liuwei4:2181

            6、将kafka复制到其他节点 scp -r  kafka_2.10-0.9.0.1 hadoop@liuwei2:$PWD                          scp -r  kafka_2.10-0.9.0.1 hadoop@liuwei4:$PWD 

            7、在其他两个salve节点分别修改 

                  broker.id=1
                host.name=liuiwei2
               zookeeper.connect=liuwei3:2181,liuwei1:2181,liuwei4:2181

             8、启动kafka

                 在三台机器上分别启动              ./bin/kafka-server-start.sh -daemon config/server.properties

             9、检验kafka是否安装成功

                 在master节点(liuwei3)操作:创建一个名为 test 的主题

                    bin/kafka-topics.sh --create --zookeeper liuwei3:2181,liuwei2:2181,liuwei4:2181 --replication-factor 1 --partitions 1 --topic test

                   在一个终端上启动一个生产者  ./bin/kafka-console-producer.sh --broker-list liuwei3:9092,liuwei2:9092,liuwei4:9092 --topic test

                  然后再键盘输入信息:hello   

                  在另一个终端中启动消费者(liuwei2,liuwei4),进入交互客户端,命令如下: ./bin/kafka-console-consumer.sh --zookeeper liuwei2:2181 --topic test --from-beginning

                 会在屏幕上显示相同的信息 hello

    三、Kafka+Spark Streaming

    object KafkaDemo {
      def main(args: Array[String]): Unit = {
        val conf=new SparkConf().setAppName("KafkaDemo")
                  .setMaster("local[2]")
        val ssc=new StreamingContext(conf,Seconds(5))
        //创建主题集合
        val topicSet=Map(("tgtest2"->1))
    
        /**
          * 第一个参数:StreamingContext对象
          * 第二个参数:Zookeeper集群,注意端口:2181
          * 第三个参数:消费者Consumer所在小组
          * 第四个参数:运行的主题
          */
        //从Kafka中获取数据
        val lines=KafkaUtils.createStream(ssc,
        "tgmaster:2181,tgslave:2181","DefaultConsumerGroup",topicSet)
        val result=lines.flatMap(x=>{
          x._2.split(" ")
        }).map(word=>(word,1))
          .reduceByKey(_+_)
    
        result.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

    四、kafka的基本操作   

    1. 启动 Kafka:
    bin/kafka-server-start.sh config/server.properties &
    2. 创建 Topics
    bin/kafka-topics.sh --create --zookeeper zookeeper31:2181
    -replication-factor 1 --partitions 3 --topic mydemo1
    3. 查看 Topcis 队列列表
    bin/kafka-topics.sh --list --zookeeper zookeeper31:2181 --topic mydemo1
    4. 查看队列明细
    bin/kafka-topics.sh --describe --zookeeper zookeeper31:2181 --topic
    mydemo1
    5. 停止 Kafka: bin/kafka-server-stop.sh config/server.properties &
    6. 发送消息
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
    mydemo1
    7. 接收消息
    bin/kafka-console-consumer.sh --zookeeper zookeeper31:2181 --topic
    mydemo1

       

    五、spark streaming 操作kafka

       1、基于 Receiver 的方式

    这种方式使用 Receiver 来获取数据。Receiver 是使用 Kafka 的高层次 Consumer API 来实现的。 receiver 从 Kafka 中获取的数据都是存储在 Spark Executor 的 内存中的, 然后 Spark Streaming 启动的 job 会去处理那些数据。

    然而, 在默认的配置下, 这种方式可能会因为底层的失败而丢失数据。 如果要启 用高可靠机制, 让数据零丢失, 就必须启用 Spark Streaming 的预写日志机制 ( Write Ahead Log, WAL) 。 该机制会同步地将接收到的 Kafka 数据写入分布式 文件系统( 比如 HDFS) 上的预写日志中。 所以, 即使底层节点出现了失败, 也 可以使用预写日志中的数据进行恢复。

    如何进行 Kafka 数据源连接?
    (1) 在 maven 添加依赖
    groupId = org.apache.spark
    artifactId = spark-streaming-kafka_2.10
    version = 1.5.1
    (2) 使用第三方工具类创建输入 DStream
    JavaPairReceiverInputDStream<String, String> kafkaStream =
    KafkaUtils.createStream(streamingContext,
    [ZK quorum], [consumer group id], [per-topic number of Kafka
    partitions to consume]);

    需要注意的要点:
    (1) Kafka 中的 topic 的 partition, 与 Spark 中的 RDD 的 partition 是没有 关系的。 所以, 在 KafkaUtils.createStream()中, 提高 partition 的数量, 只 会增加一个 Receiver 中, 读取 partition 的线程的数量。 不会增加 Spark 处理 数据的并行度。
    (2) 可以创建多个 Kafka 输入 DStream,使用不同的 consumer group 和 topic, 来通过多个 receiver 并行接收数据。
    (3) 如果基于容错的文件系统, 比如 HDFS, 启用了预写日志机制, 接收到的 数据都会被复制一份到预写日志中。 因此, 在 KafkaUtils.createStream() 中, 设置的持久化级别是 StorageLevel.MEMORY_AND_DISK_SER。

    Kafka 命令
    bin/kafka-topics.sh --zookeeper tgmaster:2181,tgslave:2181 --topic  test4 --replication-factor 1 --partitions 1 --create

    bin/kafka-console-producer.sh --broker-list tgmaster:9092,tgslave:9092  --topic test4

      2. 基于 Direct 的方式

    这种新的不基于 Receiver 的直接方式, 是在 Spark 1.3 中引入的, 从而能够确 保更加健壮的机制。 替代掉使用 Receiver 来接收数据后, 这种方式会周期性地 查询 Kafka,来获得每个 topic+partition 的最新的 offset,从而定义每个 batch
    的 offset 的范围。 当处理数据的 job 启动时, 就会使用 Kafka 的简单 consumer api 来获取 Kafka 指定 offset 范围的数据。
        

    这种方式有如下优点:
    (1)简化并行读取: 如果要读取多个 partition, 不需要创建多个输入 DStream 然后对它们进行 union 操作。 Spark 会创建跟 Kafka partition 一样多的 RDD partition, 并且会并行从 Kafka 中读取数据。 所以在 Kafka partition 和 RDD
    partition 之间, 有一个一对一的映射关系。
    (2) 高性能: 如果要保证零数据丢失, 在基于 receiver 的方式中, 需要开启 WAL 机制。 这种方式其实效率低下, 因为数据实际上被复制了两份, Kafka 自己 本身就有高可靠的机制, 会对数据复制一份, 而这里又会复制一份到 WAL 中。 而
    基于 direct 的方式, 不依赖 Receiver, 不需要开启 WAL 机制, 只要 Kafka 中作 了数据的复制, 那么就可以通过 Kafka 的副本进行恢复。
    (3) 一次且仅一次的事务机制:
    基于 receiver 的方式, 是使用 Kafka 的高阶 API 来在 ZooKeeper 中保存消费过的 offset 的。 这是消费 Kafka 数据的传统方式。 这种方式配合着 WAL 机制可以保证数据零丢失的高可靠性, 但是却无法保证数据被处理一次且仅一
    次, 可能会处理两次。 因为 Spark 和 ZooKeeper 之间可能是不同步的。 基于 direct 的方式, 使用 kafka 的简单 api, Spark Streaming 自己就 负责追踪消费的 offset, 并保存在 checkpoint 中。 Spark 自己一定是同步的, 因此可以保证数据是消费一次且仅消费一次。

    JavaPairReceiverInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(streamingContext,
    [key class], [value class], [key decoder class], [value decoder
    class], [map of Kafka parameters], [set of topics to consume]);

    Kafka 命令:
    bin/kafka-topics.sh --zookeeper 192.168.1.107:2181,192.168.1.108:2181  --topic TestTopic --replication-factor 1 --partitions 1 --create

    bin/kafka-console-producer.sh --broker-list
    192.168.1.107:9092,192.168.1.108:9092 --topic TestTopic
    192.168.1.191:2181,192.168.1.192:2181,192.168.1.193:2181
    metadata.broker.list



    补充:  kafka相关网址

    http://www.cnblogs.com/yinchengzhe/p/5111635.html

    http://chengjianxiaoxue.iteye.com/blog/2190488

    http://blog.csdn.net/u014373825/article/details/42711191
    http://www.cnblogs.com/mengyou0304/p/4836555.html

    http://www.tuicool.com/articles/ieUZne     kafka常用命令说明

    http://1028826685.iteye.com/blog/2326601  kafka参数详解

                       

  • 相关阅读:
    Unix命令大全
    vs2008 与 IE8出现的兼容性问题
    Java 创建文件、文件夹以及临时文件
    如何修改Wamp中mysql默认空密码
    PAT 乙级真题 1003.数素数
    Tags support in htmlText flash as3
    DelphiXE4 FireMonkey 试玩记录,开发IOS应用 还是移植
    10 Great iphone App Review sites to Promote your Apps!
    HTML tags in textfield
    Delphi XE4 IOS 开发, "No eligible applications were found“
  • 原文地址:https://www.cnblogs.com/liuwei6/p/6648515.html
Copyright © 2011-2022 走看看