zoukankan      html  css  js  c++  java
  • Flume+Kafka+SparkStreaming+Hbase+可视化(二)

    分布式消息缓存Kafka
     
    1、消息中间件:生产者和消费者 生产者、消费者、数据流(消息)
    • 发布和订阅消息
    • 容错存储消息记录
    • 处理流数据
    Kafka架构:
    procedure:生产者
    consumer:消费者
    broker:容错存储
    topic:分类主题、标签
    consumer group:一个consumer最多消费一个分区的数据 consumer数量=partitions
    磁盘顺序读写,省掉寻道时间,提高性能
    零字节拷贝:内核空间和用户空间不直接拷贝、SendFile
    /opt/bigdata/kafka_2.11-1.0.0/kafka-log2s/logkafka-0/00000000000000000000.index index的序号就是message在日志文件中的相对offset(偏移量)
    offsetIndex是稀疏索引,先根据offset找到对应log文件,计算 offset - (log文件第一个offset -1) 得到相对索引,再到index文件找到消息。如果index找不到,则取最近的,再去log文件对应位置向下查找
    ack: 0 :不等待broker返回确认消息,无阻塞
    1 :partitions 中的leader 保存成功
    -1: partitions 中的leader和follower都成功
     
    启动ZK:
    启动Kafka:kafkaStart.sh
    nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server0.properties &
    nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server1.properties &
    nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server2.properties &
     
    创建Topic:
    kafka-topics.sh --create --zookeeper bigdata:2181,bigdata:2182,bigdata:2183 --replication-factor 1 --partitions 1 --topic logkafka
    --partitions 可以提高消费并发
     
    查看Topic:
    kafka-topics.sh --list --zookeeper bigdata:2181
    kafka-topics.sh --describe --zookeeper bigdata:2181 --topic test (指定Topic,否则查看所有topic的详细信息)
     
    发送消息:
    kafka-console-producer.sh --broker-list localhost:9092 --topic logkafka
     
    接受消息:
    kafka-console-consumer.sh --zookeeper bigdata:2181 --topic logkafka --from-beginning (--from-beginning . 是否从头开始消费消息)
     
    停止Kafka:kafkaStop.sh
    $KAFKA_HOME/bin/kafka-server-stop.sh $KAFKA_HOME/config/server0.properties &
    $KAFKA_HOME/bin/kafka-server-stop.sh $KAFKA_HOME/config/server1.properties &
    $KAFKA_HOME/bin/kafka-server-stop.sh $KAFKA_HOME/config/server2.properties &
     
    两种方式连接Kafka:简单理解为:Receiver方式是通过zookeeper来连接kafka队列,Direct方式是直接连接到kafka的节点上获取数据
    Receiver:
    1、Kafka中topic的partition与Spark中RDD的partition是没有关系的,因此,在KafkaUtils.createStream()中,提高partition的数量,只会增加Receiver的数量,也就是读取Kafka中topic partition的线程数量,不会增加Spark处理数据的并行度。
    2、可以创建多个Kafka输入DStream,使用不同的consumer group和topic,来通过多个receiver并行接收数据。
    3、如果基于容错的文件系统,比如HDFS,启用了预写日志机制,接收到的数据都会被复制一份到预写日志中。因此,在KafkaUtils.createStream()中,设置的持久化级别是StorageLevel.MEMORY_AND_DISK_SER。
    Direct:
    1、简化并行读取:如果要读取多个partition,不需要创建多个输入DStream,然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。
    2、高性能:如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制会对数据复制一份,而这里又会复制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。
    3、一次且仅一次的事务机制:基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。基于direct的方式,使用kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。由于数据消费偏移量是保存在checkpoint中,因此,如果后续想使用kafka高级API消费数据,需要手动的更新zookeeper中的偏移量
     
    2、API操作
    <dependency>  
           <groupId>org.apache.kafka</groupId>  
            <artifactId>kafka_2.11</artifactId>  
            <version>1.0.0</version>     
    </dependency>  
     
    Scala版 Producer :
    package com.kafka
    import java.util.HashMap
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
    object producer {
      def main(args: Array[String]): Unit = {
      // 传参
      if (args.length < 4){
      System.err.println("Usage: producer <metadataBrokerList> <topics> <messageSec> <wordsPerMessage>")
      System.exit(1)
      }
      val Array(brokers, topics, messageSec, wordsPerMessage) = args
      // ZK 配置
      val zkProps = new HashMap[String, Object]()
      zkProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
      zkProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
      zkProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
      // Kafka Producer
      val producer = new KafkaProducer[String, String](zkProps)
      var i = 0
      for ( i <- 1 to 10) {
        (1 to messageSec.toInt).foreach { messageNum =>
          val msg = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString).mkString(" ")
          val msgs = new ProducerRecord[String, String](topics, null, msg)
          producer.send(msgs)
        }
        Thread.sleep(100)
       }
      }
    }
    

      

    3、整合Flume:
    conf1:exec-mem-avro.conf
    # Name the components on this agent
    a1.sources = exec-source
    a1.channels = memory-channel
    a1.sinks = avro-sink
     
    # configure for sources
    a1.sources.exec-source.type = exec
    a1.sources.exec-source.command = tail -F /opt/datas/log-collect-system/log_server.log
     
    # configure for channels
    a1.channels.memory-channel.type = memory
    a1.channels.memory-channel.capacity = 1000
    a1.channels.memory-channel.transactionCapacity = 100
     
    # configure for sinks
    a1.sinks.avro-sink.type = avro
    a1.sinks.avro-sink.hostname = localhost
    a1.sinks.avro-sink.port = 44444
     
    # configure
    a1.sinks.avro-sink.channel = memory-channel
    a1.sources.exec-source.channels = memory-channel
    
    Kafka conf:exec-memory-kafka.cnf
    # Name the components on this agent
    a1.sources = avro-source
    a1.channels = memory-channel
    a1.sinks = logger-sink
     
    # configure for sources
    a1.sources.avro-source.type = avro
    a1.sources.avro-source.bind = localhost
    a1.sources.avro-source.port = 44444
     
    # configure for channels
    a1.channels.memory-channel.type = memory
    a1.channels.memory-channel.capacity = 1000
    a1.channels.memory-channel.transactionCapacity = 100
     
    # configure for sinks
    a1.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
    #a1.sinks.kafka-sink.bootstrap.servers = bigdata:9092,bigdata:9093,bigdata:9094
    a1.sinks.kafka-sink.brokerList = bigdata:9092,bigdata:9093,bigdata:9094
    a1.sinks.kafka-sink.topic = logkafka
     
    # configure
    a1.sinks.kafka-sink.channel = memory-channel
    a1.sources.avro-source.channels = memory-channel
    

      

  • 相关阅读:
    【2018.05.05 C与C++基础】C++中的自动废料收集:概念与问题引入
    【2018.04.27 C与C++基础】关于switch-case及if-else的效率问题
    【2018.04.19 ROS机器人操作系统】机器人控制:运动规划、路径规划及轨迹规划简介之一
    March 11th, 2018 Week 11th Sunday
    March 10th, 2018 Week 10th Saturday
    March 09th, 2018 Week 10th Friday
    March 08th, 2018 Week 10th Thursday
    March 07th, 2018 Week 10th Wednesday
    ubantu之Git使用
    AMS分析 -- 启动过程
  • 原文地址:https://www.cnblogs.com/mlxx9527/p/9367543.html
Copyright © 2011-2022 走看看