zoukankan      html  css  js  c++  java
  • kafka复习(1)

      一:flume复习

    0.JMS(java message service )java消息服务

    --------------------------------------------------------------

      queue(队列模式):点对点服务只能有一个消费者。也叫做点对点模式

      publish-subscribe(发布-订阅模式,也叫做主题模式):

    1.flume是收集,聚合,移动日志的框架

    2.agent:  

      source:  //接受数据的,生产者

           //netcat

             //ExecSource实时收集:tail -F xxx.txt

           //spooldir监控文件夹

           //seq

           //Stress  压力测试

            //avroSource

      channel  //暂存数据,相当于缓冲区

            //非永久性的:MemoryChannel

            //永久性:FileChannel磁盘

            //SpillableMemoryChannel:是内存通道和文件通道的一个重组

      sink    //输出数据,消费者,从通道中提取数据

            //HdfsSink  //

            //HBaseSink //

            //HiveSink  //   

            //avroSink  

    kafka

    -------------------------------------------------------

    一、kafka简介

      1.JMS:java message service :java消息服务

      2.kafka:是分布式流处理平台,在系统之间构建实时数据流管道

      3.kafka以集群的形式运行有一个或者多个主机,kafka以主题来分类存储记录,每个记录都有key ,value和timestamp

      4.Producer:生产者;Consumer:消费者;consumer group消费者组;kafka server 包括broker,kafka服务器

       topic:消息以topic为类别记录,每一类的消息称为一个主题

       broker:以集群的方式运行,可以由一个或者多个服务组成,每个服务叫做一个broker,消费者可以订阅一个或者多个主题,并从broker拉数据,从而消费这些已经发布的消息

       每个消息是由:key+value+timestamp组成

      5.kafka:每秒钟百万数据吞吐量

    二、安装kafka

      0.选择s202 ~ s204三台主机安装kafka
      1.准备zk
        略
      2.jdk
        略
      3.tar文件
      4.环境变量
        略
      5.配置kafka
        [kafka/config/server.properties]
        ...
        broker.id=202
        ...
        listeners=PLAINTEXT://:9092
        ...
        log.dirs=/home/centos/kafka/logs
        ...
        zookeeper.connect=s201:2181,s202:2181,s203:2181

        6.分发server.properties,同时修改每个文件的broker.id
        7.启动kafka服务器
          a)先启动zk  
          b)启动kafka
          [s202 ~ s204]
        $>bin/kafka-server-start.sh config/server.properties

          c)验证kafka服务器是否启动
          $>netstat -anop | grep 9092

        8.创建主题

          $>bin/kafka-topics.sh --create --zookeeper s202:2181 --replication-factor 3 --partitions 3 --topic test  //创建主题,分区数为3

        9.查看主题列表
          $>bin/kafka-topics.sh --list --zookeeper s202:2181

        10.启动控制台生产者
          $>bin/kafka-console-producer.sh --broker-list s202:9092 --topic test

        11.启动控制台消费者
          $>bin/kafka-console-consumer.sh --bootstrap-server s202:9092 --topic test --from-beginning --zookeeper s202:2181  //从头开始消费数据

        12.在生产者控制台输入hello world

    三、

      1.副本:broker存放消息以消息到达的顺序进行存放,生产和消费都是副本感知的,支持n-1个故障,每个分区都有leader

        新leader选举的过程是通过isr进行的,第一个注册的follower称为新的leader

       2.kafka支持的副本模式:

        [同步复制]:

        1.producer联系zk识别leader

        2.向leader发送消息

        3.leader收入消息,写入本地log

        4.follower从leader pull消息

        5.follower向本地写入log

        6.follower向leader发送ack确认消息

        7.leader收到所有的ack消息

        8.leader向producer回传ack消息

        

        [异步复制]:

      合同步复制的区别在与leader写入本地log后,直接向client回传ack消息,不需要等待所有的follower复制完成,但是这种模式不能保证消息被生产者分发

     API方式进行访问

    ------------------------------------------

      1.消息生产者

    package com.it18zhang.kafkaDemo.test;
    
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    
    import kafka.producer.ProducerConfig;
    import org.junit.Test;
    
    import java.util.HashMap;
    import java.util.Properties;
    
    /**
     * Created by stone on 2018/8/17.
     */
    public class TestProducer {
        @Test
        public void testSend(){
            Properties props = new Properties();
            props.put("metadata.broker.list","s202:9092");
            props.put("serializer.class","kafka.serializer.StringEncoder");
            props.put("request.required.acks","1");
            //配置生产值配置对象
            ProducerConfig config = new ProducerConfig(props);
            //创建生产者
            Producer<String,String> producer = new Producer<String,String>(config);
            KeyedMessage<String,String> msg =new KeyedMessage<String, String>("test2","100","hello world jack");
            producer.send(msg);
            System.out.println("send over");
        }
    }

    2.消息消费者

    @Test
        public void testConsumer(){
            Properties prop = new Properties();
            prop.put("zookeeper.connect","s202:2181");
            prop.put("group.id","g1");
            prop.put("zookeeper.session.timeout.ms","500");
            prop.put("zookeeper.sync.time.ms","1000");
            //创建消费者配置
            Map<String,Integer> map = new HashMap<String, Integer>();
            map.put("test2",new Integer(1));
            Map<String, List<KafkaStream<byte[], byte[]>>>  msgs= Consumer.createJavaConsumerConnector(new ConsumerConfig(prop) ).createMessageStreams(map);
            List<KafkaStream<byte[], byte[]>> msgList = msgs.get("test2");
            for(KafkaStream<byte[],byte[]> stream : msgList){
                ConsumerIterator<byte[],byte[]> it = stream.iterator();
                while(it.hasNext()){
                    byte[] message = it.next().message();
                    System.out.println(new String(message));
                }
            }
    
        }

    flume与kafka集成的方式

    ----------------------------------------

      1.flume数据sink到kafka

        flume充当生产者

      a1.sources=r1

      a1.sinks=k1

      a1.channels=c1

      a1.sources.r1.type=netcat

      a1.sources.r1.port=8888

      a1.sources.r1.bind=localhsot

    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = test2
    a1.sinks.k1.kafka.bootstrap.servers = s202:9092
    a1.sinks.k1.kafka.flumeBatchSize = 20
    a1.sinks.k1.kafka.producer.acks = 1
    a1.sinks.k1.channel = c1
    a1.channels.c1.type=memory
    a1.sources.r1.channels=c1
    a1.sinks.k1.channel=c1

     

     

      2.kafka充当source

        flume充当消费者

        

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r1.batchSize = 5000
    a1.sources.r1.batchDurationMillis = 2000
    a1.sources.r1.kafka.bootstrap.servers = s202:9092
    a1.sources.r1.kafka.topics = test3
    a1.sources.r1.kafka.consumer.group.id = g4

    a1.sinks.k1.type = logger

    a1.channels.c1.type=memory

    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

      3.channels通道临时数据存放地缓冲区 ,flume通道有内存通道,文件通道,同时数据也可以存放进入kafka中去,把消息放在kafka里面,flume充当生产者

      

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    a1.sources.r1.type = avro
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 8888

    a1.sinks.k1.type = logger

    a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.c1.kafka.bootstrap.servers = s202:9092
    a1.channels.c1.kafka.topic = test3
    a1.channels.c1.kafka.consumer.group.id = g6

    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

  • 相关阅读:
    组件化的使用
    MacOS 升级后pod 出现的问题
    协议(Protocol) 和代理(Delegate)
    分类(Category)的本质 及其与类扩展(Extension) /继承(Inherit)的区别
    KVC
    KVO的使用及底层实现
    OC对象的本质及分类
    大端小端
    为什么一个指针在32位系统中占4个字节,在64位系统中占8个字节?
    quarts之Cron表达式示例
  • 原文地址:https://www.cnblogs.com/bigdata-stone/p/9495398.html
Copyright © 2011-2022 走看看