zoukankan      html  css  js  c++  java
  • Kafka学习笔记之confluent platform入门

    0x00 下载

    http://www.confluent.io/download,打开后,显示最新版本3.0.0,然后在右边填写信息后,点击Download下载。

     

    之后跳转到下载页面,选择zip 或者 tar都行, 下载完成后上传linux系统,解压即完成安装。

    Confluent 目前还不支持Windows系统。Windows用户可以下载和使用zip 和 tar包,但最好直接运行jar文件 ,而不是使用包装脚本。

    0x01 Requirements

    唯一需要的条件是java 版本>=1.7。

    0x02 Confluent Platform快速入门

    你可以快速的运行Confluent platform在单台服务器上。在这篇quickstart,我们将介绍如何运行ZooKeeper,Kafka,和Schema Registry,然后如何读和写一些Avro数据从/到Kafka。

    (如果你想跑一个数据管道用Kafka Connect和Control Center,参考The Control Center QuickStart Guide.)我们随后也会介绍。

    1.下载和安装Confluent platform。在这篇quickstart 我们使用zip包,也有很多其他安装方式,见上。

    $ wget http://packages.confluent.io/archive/3.0/confluent-3.0.0-2.11.zip
    $ unzip confluent-3.0.0-2.11.zip
    $ cd confluent-3.0.0

     下边展示的是安装目录里上层层级结构:

    confluent-3.0.0/bin/        # Driver scripts for starting/stopping services
    confluent-3.0.0/etc/        # Configuration files
    confluent-3.0.0/share/java/ # Jars

    如果你通过deb或者rpm安装,目录结构如下:

    /usr/bin/                  # Driver scripts for starting/stopping services, prefixed with <package> names
    /etc/<package>/            # Configuration files
    /usr/share/java/<package>/ # Jars

    2.启动Zookeeper。因为这是长期运行的服务,你应该运行它在一个独立的终端(或者在后边运行它,重定向输出到一个文件中)。你需要有写权限到/var/lib在这一步以及之后的步骤里:

    # The following commands assume you exactly followed the instructions above.
    # This means, for example, that at this point your current working directory
    # must be confluent-3.0.0/.
    $ ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties

    3.启动Kafka,同样在一个独立的终端。

    $ ./bin/kafka-server-start ./etc/kafka/server.properties

    4.启动Schema Registry,同样在一个独立的终端。

    $ ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

    5.现在所有需要的服务都已启动,我们发送一些Avro数据到Kafka的topic中。虽然这一步一般会得到一些数据从一些应用里,这里我们使用Kafka提供的例子,不用写代码。我们在本地的Kafka集群里,写数据到topic “test”里,读取每一行Avro信息,校验Schema Registry .

    $ ./bin/kafka-avro-console-producer 
             --broker-list localhost:9092 --topic test 
             --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

    一旦启动,进程等待你输入一些信息,一条一行,会发送到topic中一旦按下enter键。试着输入一些信息:

    {"f1": "value1"}
    {"f1": "value2"}
    {"f1": "value3"}

    输入完成后,可以使用Ctrl+C来终止进程。

    Note:如果一个空行你按下Enter键,会被解释为一个null值,引起错误。然后仅仅需要做的是启动producer进程,接着输入信息。

    6.现在我们可以检查,通过Kafka consumer控制台读取数据从topic。在topic ‘test'中,Zookeeper实例,会告诉consumer解析数据使用相同的schema。最后从开始读取数据(默认consumer只读取它启动之后写入到topic中的数据)

    $ ./bin/kafka-avro-console-consumer --topic test 
             --zookeeper localhost:2181 
             --from-beginning

    你会看到你之前在producer中输入的数据,以同样的格式。

    consumer不会退出,它可以监听写入到topic中的新数据。保持consumer运行,然后重复第5步,输入一些信息,然后按下enter键,你会看到consumer会立即读取到写入到topic中的数据。

    当你完成了测试,可以用Ctrl+C终止进程。

    7.现在让我们尝试写一些不兼容的schema的数据到topic ’test‘中,我们重新运行producer命令,但是改变schema。

    
    
    $ ./bin/kafka-avro-console-producer 
             --broker-list localhost:9092 --topic test 
             --property value.schema='{"type":"int"}'
     

     现在输入一个整数按下enter键,你会看到以下的异常:

    复制代码
    org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: "int"
    Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with the latest schema; error code: 409
           at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.httpRequest(RestUtils.java:146)
           at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.registerSchema(RestUtils.java:174)
           at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:51)
           at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:89)
           at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:49)
           at io.confluent.kafka.formatter.AvroMessageReader.readMessage(AvroMessageReader.java:155)
           at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:94)
           at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)

    复制代码

     当producer试图发送一些信息,它会检查schema用Schema Registry。当返回错误时说明现在的schema无效,因为它不能兼容之前设置的schema。控制台打印出错误信息并退出,但是你自己的应用可以更加人性化处理这类问题。但最重要的是,我们保证不让不兼容的数据写入到Kafka中。

    8.当你完成这一系列测试,你可以使用ctrl+c来关闭服务,以启动时相反的顺序。

    这一简单的教程包含了Kafka和Schema Registry这一些核心的服务。你也可以参考以下document:

  • 相关阅读:
    javaweb消息中间件——rabbitmq入门
    virtual box 桥接模式(bridge adapter)下无法获取ip(determine ip failed)的解决方法
    Apache Kylin本地启动
    git操作
    Java学习总结
    Java中同步的几种实现方式
    hibernate exception nested transactions not supported 解决方法
    vue 中解决移动端使用 js sdk 在ios 上一直报invalid signature 的问题解决
    cookie 的使用
    vue 专门为了解决修改微信标题而生的项目
  • 原文地址:https://www.cnblogs.com/JetpropelledSnake/p/10550637.html
Copyright © 2011-2022 走看看