  • Kafka Learning (1) -- Quickstart

    Reference: the official quickstart at http://kafka.apache.org/quickstart

    1. Download Kafka

    Official download page: http://kafka.apache.org/downloads

    As of July 8, 2019, the latest release is 2.3.0. In the download artifact name (e.g. kafka_2.12-2.3.0), 2.12 is the Scala version it was built with and 2.3.0 is the Kafka version.

    2. Start the Servers

    Start ZooKeeper first. Kafka ships with a bundled single-node ZooKeeper, but you can also use an existing installation:

    > bin/zookeeper-server-start.sh config/zookeeper.properties
    [2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
    ...
    
    > bin/kafka-server-start.sh config/server.properties
    [2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
    [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
    ...
    

    3. Create a Topic

    Create a topic with a replication factor of 1 and a single partition:
    > bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
    List topics to verify:
    > bin/kafka-topics.sh --list --bootstrap-server localhost:9092
    test
    

    Instead of creating topics by hand, you can also configure the brokers to auto-create a topic the first time something is published to it.
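    If you rely on auto-creation, the relevant settings live in config/server.properties; a sketch (auto.create.topics.enable already defaults to true, and num.partitions and default.replication.factor shape the auto-created topics):

        # Broker creates unknown topics the first time they are used
        auto.create.topics.enable=true
        # Defaults applied to auto-created topics
        num.partitions=1
        default.replication.factor=1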

    4. Send Messages

    Test with the command-line client; each line you type is sent as a separate message:

    > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    This is a message
    This is another message
    
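    The console producer is a thin wrapper around the Java producer API. A minimal sketch that sends the same two messages (the topic name and localhost:9092 come from the commands above; the rest is standard kafka-clients usage):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class SimpleProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // same broker as the console tools
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());

            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                // Each record corresponds to one line typed into the console producer
                producer.send(new ProducerRecord<>("test", "This is a message"));
                producer.send(new ProducerRecord<>("test", "This is another message"));
            } // close() flushes buffered records before exiting
        }
    }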

    5. Start a Consumer

    The command-line consumer receives the messages:

    > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    This is a message
    This is another message
    
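    The Java equivalent is a plain consumer loop. A minimal sketch (the group id is a hypothetical name; auto.offset.reset=earliest approximates --from-beginning for a group with no committed offsets):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class SimpleConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "quickstart-group"); // hypothetical group name
            props.put("auto.offset.reset", "earliest"); // read from the start like --from-beginning
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("test"));
                while (true) {
                    // Poll for new records and print each message value
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(record.value());
                    }
                }
            }
        }
    }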

    6. Set Up a Multi-Broker Cluster

    A single broker is not very interesting, so let's expand the cluster to three brokers.

    First, copy the config file for each additional broker:

    > cp config/server.properties config/server-1.properties
    > cp config/server.properties config/server-2.properties
    

    Then edit the copies:

    config/server-1.properties:
        broker.id=1
        listeners=PLAINTEXT://:9093
        log.dirs=/tmp/kafka-logs-1
     
    config/server-2.properties:
        broker.id=2
        listeners=PLAINTEXT://:9094
        log.dirs=/tmp/kafka-logs-2
    

    broker.id must be unique; it is the name of each node in the cluster. Because we are running all three brokers on the same machine, we also have to override listeners and log.dirs so the brokers don't conflict over ports and data directories.

    Now create a topic with one partition and a replication factor of 3:

    > bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
    
    Use describe to see how it is laid out:
    > bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
    Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
        Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0
    
    • A few concepts:

    • "leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.

    • "replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.

    • "isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.
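    These fields can also be read programmatically. A sketch using the admin API from kafka-clients (the topic name is the one created above; the DescribeTopic class is a hypothetical utility, not part of the quickstart):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.TopicDescription;

    public class DescribeTopic {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");

            try (AdminClient admin = AdminClient.create(props)) {
                TopicDescription desc = admin
                    .describeTopics(Collections.singletonList("my-replicated-topic"))
                    .all().get()
                    .get("my-replicated-topic");
                // Each partition reports the same leader/replicas/isr that --describe prints
                desc.partitions().forEach(p ->
                    System.out.printf("partition=%d leader=%s replicas=%s isr=%s%n",
                        p.partition(), p.leader(), p.replicas(), p.isr()));
            }
        }
    }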

    Compare with the topic created earlier:
    > bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
    Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:
        Topic: test Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    

    Produce and consume on the new topic:

    > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
    ...
    my test message 1
    my test message 2
    ^C
    
    
    > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
    ...
    my test message 1
    my test message 2
    ^C
    

    Now let's test fault tolerance. Broker 1 is the leader, so kill it:

    > ps aux | grep server-1.properties
    7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
    > kill -9 7564
    
    Check the change: leadership has moved to broker 2, because broker 1 was killed:
    > bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
    Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
        Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 1,2,0 Isr: 2,0
    
    The messages are still available, even though the original leader is down:
    > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
    ...
    my test message 1
    my test message 2
    ^C
    

    7. Use Kafka Connect to Import/Export Data

    So far we have only pushed data from the console. What about other sources and other systems? Use Kafka Connect to stream data between Kafka and external systems.

    Create some seed data:
    > echo -e "foo\nbar" > test.txt
    
    Start Connect in standalone mode with three config files: one for the Connect worker, one for a file source, and one for a file sink:
    > bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
    
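    For reference, the stock config/connect-file-source.properties that ships with the distribution looks roughly like this (connect-file-sink.properties is analogous, writing connect-test back out to test.sink.txt):

        name=local-file-source
        connector.class=FileStreamSource
        tasks.max=1
        file=test.txt
        topic=connect-test
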
    Verify that the sink file received the data:
    > more test.sink.txt
    foo
    bar
    
    The data is also in the connect-test topic, as a console consumer shows:
    > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
    {"schema":{"type":"string","optional":false},"payload":"foo"}
    {"schema":{"type":"string","optional":false},"payload":"bar"}
    ...
    
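    The {"schema":...,"payload":...} envelope comes from the JSON converters configured in config/connect-standalone.properties; the stock settings are roughly:

        key.converter=org.apache.kafka.connect.json.JsonConverter
        value.converter=org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable=true
        value.converter.schemas.enable=true
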
    The connectors keep running, so you can append more lines and watch them flow through the pipeline:
    > echo Another line>> test.txt
    

    8. Use Kafka Streams

    http://kafka.apache.org/22/documentation/streams/quickstart

    WordCountDemo

    https://github.com/apache/kafka/blob/2.2/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java

    Code snippet:

    // Serializers/deserializers (serde) for String and Long types
    final Serde<String> stringSerde = Serdes.String();
    final Serde<Long> longSerde = Serdes.Long();
     
    // Construct a `KStream` from the input topic "streams-plaintext-input", where message values
    // represent lines of text (for the sake of this example, we ignore whatever may be stored
    // in the message keys).
    KStream<String, String> textLines = builder.stream("streams-plaintext-input",
        Consumed.with(stringSerde, stringSerde));
     
    KTable<String, Long> wordCounts = textLines
        // Split each text line, by whitespace, into words.
        .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
     
        // Group the text words as message keys
        .groupBy((key, value) -> value)
     
        // Count the occurrences of each word (message key).
        .count();
     
    // Store the running counts as a changelog stream to the output topic.
    wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
    
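    The excerpt omits the surrounding boilerplate. A minimal sketch of a runnable application around the same topology (the application id "streams-wordcount" is an assumed name; the StreamsConfig keys are the standard Kafka Streams settings):

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Produced;

    public class WordCountApp {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount"); // assumed id
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            StreamsBuilder builder = new StreamsBuilder();
            // Same topology as the excerpt above, condensed into one chain
            builder.stream("streams-plaintext-input",
                    Consumed.with(Serdes.String(), Serdes.String()))
                .flatMapValues(v -> Arrays.asList(v.toLowerCase().split("\\W+")))
                .groupBy((k, v) -> v)
                .count()
                .toStream()
                .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

            KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();
            // Close cleanly on Ctrl+C
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }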

    Create the output topic for the demo. It is configured with log compaction because it carries a changelog of running counts:

    > bin/kafka-topics.sh --create \
        --bootstrap-server localhost:9092 \
        --replication-factor 1 \
        --partitions 1 \
        --topic streams-wordcount-output \
        --config cleanup.policy=compact
    Created topic "streams-wordcount-output".
    
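    The demo also reads from streams-plaintext-input. If the broker does not auto-create topics, create it the same way (a plain topic; no compaction needed for input):

    > bin/kafka-topics.sh --create \
        --bootstrap-server localhost:9092 \
        --replication-factor 1 \
        --partitions 1 \
        --topic streams-plaintext-input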

    Start the WordCount demo application:

    > bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
    

    Start a producer and type some input lines:

    > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
    all streams lead to kafka
    hello kafka streams
    

    Start a consumer to read the output. Note that the output is a changelog stream: each line is the updated count for a word, which is why kafka and streams each appear twice with increasing counts:

    > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
        --topic streams-wordcount-output \
        --from-beginning \
        --formatter kafka.tools.DefaultMessageFormatter \
        --property print.key=true \
        --property print.value=true \
        --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
        --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
        
    all     1
    streams 1
    lead    1
    to      1
    kafka   1
    hello   1
    kafka   2
    streams 2
    