zoukankan      html  css  js  c++  java
  • Kafka 快速起步

    Kafka 快速起步

    2017-01-05 杜亦舒 性能与架构 性能与架构

    主要内容:
    1. kafka 安装、启动
    2. 消息的 生产、消费
    3. 配置启动集群
    4. 集群下的容错测试
    5. 从文件中导入数据,并导出到文件

    单机示例

    安装

    tar -xzf kafka_2.10-0.10.1.1.tgz
    cd kafka_2.10-0.10.1.1

    启动

    > bin/zookeeper-server-start.sh 
    config/zookeeper.properties
    > bin/kafka-server-start.sh
    config/server.properties

    创建topic

    打开一个新的终端窗口

    bin/kafka-topics.sh --create 
    --zookeeper localhost:2181
    --replication-factor 1
    --partitions 1
    --topic test

    发送消息

    打开一个新的终端窗口

    bin/kafka-console-producer.sh 
    --broker-list localhost:9092
    --topic test

    进入输入模式,随意输入信息,例如:

    hello world
    hi

    获取消息

    打开一个新的终端窗口

    bin/kafka-console-consumer.sh 
    --bootstrap-server localhost:9092
    --topic test
    --from-beginning

    便会显示出刚才发送的两条消息:

    hello world
    hi

    这时可以打开发送消息的终端窗口,输入新的信息,再返回来就可以看到自动接收到了新消息

    配置集群

    新建两个启动配置文件

    > cp config/server.properties 
    config/server-1.properties
    > cp config/server.properties config/server-2.properties

    修改 config/server-1.properties 的以下几项配置:

    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=logs/kafka-logs-1

    修改 config/server-2.properties 的以下几项配置: 

    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=logs/kafka-logs-2

    启动

    > bin/kafka-server-start.sh 
    config/server-1.properties &
    > bin/kafka-server-start.sh config/server-2.properties &

    创建一个topic,设置3个复制

    bin/kafka-topics.sh --create 
    --zookeeper localhost:2181
    --replication-factor 3
    --partitions 1
    --topic my-replicated-topic

    发送消息

    bin/kafka-console-producer.sh 
    --broker-list localhost:9092
    --topic my-replicated-topic

    输入消息:

    my test message 1
    my test message 2

    获取消息

    bin/kafka-console-consumer.sh 
    --bootstrap-server localhost:9092
    --from-beginning
    --topic my-replicated-topic

    可以正常取得消息

    容错测试

    # 取得server1的进程号
    ps aux | grep server-1.properties
    # 杀掉进程
    kill -9 43116

    读取消息

    bin/kafka-console-consumer.sh 
    --bootstrap-server localhost:9092
    --from-beginning
    --topic my-replicated-topic

    返回信息:

    my test message 1
    my test message 2

    仍然可以正常取得消息

    Kafka Connect

    Kafka 中的 connecter 可以与外部系统进行连接,例如文件系统、数据库

    下面实验一个简单文件系统交互,从一个文件中导入数据,然后导出到另一个文件中

    创建一个测试文件,用于导入数据使用

    echo -e "foo
    bar" > test.txt

    启动 connect,执行数据的导入导出

    bin/connect-standalone.sh 
    config/connect-standalone.properties 
    config/connect-file-source.properties 
    config/connect-file-sink.properties

    命令执行后,会输出一系列的日志信息,等待执行完毕

    查看导出结果

    cat test.sink.txt

    返回结果:

    foo
    bar 

    成功导出了 test.txt 中的数据 

    过程分析

    执行第2步的命令后,为什么是去读test.txt?为什么写入了test.sink.txt?中间的过程是什么样的?

    原因是在于两个配置文件

    config/connect-file-source.properties (导入配置)

    name=local-file-source
    connector.class=FileStreamSource
    tasks.max=1
    file=test.txt
    topic=connect-test  

    file指定了是从test.txt中导入数据

    topic指定了把数据发送到connect-test这个topic

    connect-file-sink.properties(导出配置)

    name=local-file-sink
    connector.class=FileStreamSink
    tasks.max=1
    file=test.sink.txt
    topics=connect-test

    file指定了把数据导出到test.txt中导入数据

    topic指定从connect-test这个topic中读取数据

    查看一下connect-test这个topic

    bin/kafka-console-consumer.sh 
    --bootstrap-server localhost:9092
    --topic connect-test
    --from-beginning

    结果为:

    {"schema":{"type":"string","optional":false},"payload":"foo"}
    {"schema":{"type":"string","optional":false},"payload":"bar"}

    现在向test.txt中添加一条新数据:

    echo "Another line" >> test.txt

    再次执行 cat test.sink.txt 就会看到刚刚添加的数据:

    foo
    bar
    Another line        


    更多介绍: http://www.cnblogs.com/ximengchj/p/6423704.html

    相关文章:

    分布式消息队列 Kafka

    Kafka 消息存储及检索

    Kafka 高可用设计

    Kafka是如何实现高吞吐率的



    点击 “阅读原文” 查看 文章列表

  • 相关阅读:
    2020系统综合实践 第1次实践作业
    软工实践个人总结
    2019 SDN大作业
    HDU 4965 Fast Matrix Calculation (矩阵快速幂取模----矩阵相乘满足结合律)
    HDU 1565 (最大流+黑白染色化二分图求最小割)
    HDU 4289 Control (最大流+拆点)
    HDU 3605 Escape(最大流+缩点转换)
    HDOJ4886(hash+枚举)
    POJ 2446 Chessboard (二分匹配)
    POJ 1469 COURSES (二分匹配,邻接表)
  • 原文地址:https://www.cnblogs.com/xmanblue/p/6498041.html
Copyright © 2011-2022 走看看