zoukankan      html  css  js  c++  java
  • Kafka 快速起步(作者:杜亦舒)

    Kafka 快速起步

    主要内容:
    1. kafka 安装、启动
    2. 消息的 生产、消费
    3. 配置启动集群
    4. 集群下的容错测试
    5. 从文件中导入数据,并导出到文件

    单机示例

    安装

    tar -xzf kafka_2.10-0.10.1.1.tgz
    cd kafka_2.10-0.10.1.1

    启动

    > bin/zookeeper-server-start.sh 
    config/zookeeper.properties
    > bin/kafka-server-start.sh 
    config/server.properties

    创建topic

    打开一个新的终端窗口

    bin/kafka-topics.sh --create 
    --zookeeper localhost:2181 
    --replication-factor 1 
    --partitions 1 
    --topic test

    发送消息

    打开一个新的终端窗口

    bin/kafka-console-producer.sh 
    --broker-list localhost:9092 
    --topic test

    进入输入模式,随意输入信息,例如:

    hello world
    hi

    获取消息

    打开一个新的终端窗口

    bin/kafka-console-consumer.sh 
    --bootstrap-server localhost:9092 
    --topic test 
    --from-beginning

    便会显示出刚才发送的两条消息:

    hello world
    hi

    这时可以打开发送消息的终端窗口,输入新的信息,再返回来就可以看到自动接收到了新消息

    配置集群

    新建两个启动配置文件

    > cp config/server.properties 
    config/server-1.properties
    > cp config/server.properties 
    config/server-2.properties

    修改 config/server-1.properties 的以下几项配置:

    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=logs/kafka-logs-1

    修改 config/server-2.properties 的以下几项配置: 

    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=logs/kafka-logs-2

    启动

    > bin/kafka-server-start.sh 
    config/server-1.properties &
    > bin/kafka-server-start.sh 
    config/server-2.properties &

    创建一个topic,设置3个复制

    bin/kafka-topics.sh --create 
    --zookeeper localhost:2181 
    --replication-factor 3 
    --partitions 1 
    --topic my-replicated-topic

    发送消息

    bin/kafka-console-producer.sh 
    --broker-list localhost:9092 
    --topic my-replicated-topic

    输入消息:

    my test message 1
    my test message 2

    获取消息

    bin/kafka-console-consumer.sh 
    --bootstrap-server localhost:9092 
    --from-beginning 
    --topic my-replicated-topic

    可以正常取得消息

    容错测试

    # 取得server1的进程号
    ps aux | grep server-1.properties
    
    
    # 杀掉进程
    kill -9 43116
     

    读取消息

    bin/kafka-console-consumer.sh 
    --bootstrap-server localhost:9092 
    --from-beginning 
    --topic my-replicated-topic

    返回信息:

    my test message 1
    my test message 2

    仍然可以正常取得消息

    Kafka Connect

    Kafka 中的 connecter 可以与外部系统进行连接,例如文件系统、数据库

    下面实验一个简单文件系统交互,从一个文件中导入数据,然后导出到另一个文件中

    创建一个测试文件,用于导入数据使用

    echo -e "foo
    bar" > test.txt

    启动 connect,执行数据的导入导出

    bin/connect-standalone.sh 
    config/connect-standalone.properties 
    config/connect-file-source.properties 
    config/connect-file-sink.properties

    命令执行后,会输出一系列的日志信息,等待执行完毕

    查看导出结果

    cat test.sink.txt

    返回结果:

    foo
    bar 

    成功导出了 test.txt 中的数据 

    过程分析

    执行第2步的命令后,为什么是去读test.txt?为什么写入了test.sink.txt?中间的过程是什么样的?

    原因是在于两个配置文件

    config/connect-file-source.properties (导入配置)

    name=local-file-source
    connector.class=FileStreamSource
    tasks.max=1
    file=test.txt
    topic=connect-test  

    file指定了是从test.txt中导入数据

    topic指定了把数据发送到connect-test这个topic

    connect-file-sink.properties(导出配置)

    name=local-file-sink
    connector.class=FileStreamSink
    tasks.max=1
    file=test.sink.txt
    topics=connect-test

    file指定了把数据导出到test.txt中导入数据

    topic指定从connect-test这个topic中读取数据

    查看一下connect-test这个topic

    bin/kafka-console-consumer.sh 
    --bootstrap-server localhost:9092 
    --topic connect-test 
    --from-beginning

    结果为:

    {"schema":{"type":"string","optional":false},"payload":"foo"}
    {"schema":{"type":"string","optional":false},"payload":"bar"}

    现在向test.txt中添加一条新数据:

    echo "Another line" >> test.txt

    再次执行 cat test.sink.txt 就会看到刚刚添加的数据:

        
  • 相关阅读:
    Linux 命令大全
    MySQL 存储 utf8mb4
    PHP房贷计算器代码,等额本息,等额本金
    laravel 原生 sql
    include_once 问题
    laravel count distinct
    微信小程序显示cms里的html文章
    PHP文件上传
    Ajax做无刷新分页
    PHP封装返回Ajax字符串和JSON数组
  • 原文地址:https://www.cnblogs.com/jun1019/p/6260263.html
Copyright © 2011-2022 走看看