zoukankan      html  css  js  c++  java
  • Kafka学习笔记之confluent platform入门

    0x00 下载

    http://www.confluent.io/download,打开后,显示最新版本3.0.0,然后在右边填写信息后,点击Download下载。

     

    之后跳转到下载页面,选择zip 或者 tar都行, 下载完成后上传linux系统,解压即完成安装。

    Confluent 目前还不支持Windows系统。Windows用户可以下载和使用zip 和 tar包,但最好直接运行jar文件 ,而不是使用包装脚本。

    0x01 Requirements

    唯一需要的条件是java 版本>=1.7。

    0x02 Confluent Platform快速入门

    你可以快速的运行Confluent platform在单台服务器上。在这篇quickstart,我们将介绍如何运行ZooKeeper,Kafka,和Schema Registry,然后如何读和写一些Avro数据从/到Kafka。

    (如果你想跑一个数据管道用Kafka Connect和Control Center,参考The Control Center QuickStart Guide.)我们随后也会介绍。

    1.下载和安装Confluent platform。在这篇quickstart 我们使用zip包,也有很多其他安装方式,见上。

    $ wget http://packages.confluent.io/archive/3.0/confluent-3.0.0-2.11.zip
    $ unzip confluent-3.0.0-2.11.zip
    $ cd confluent-3.0.0

     下边展示的是安装目录里上层层级结构:

    confluent-3.0.0/bin/        # Driver scripts for starting/stopping services
    confluent-3.0.0/etc/        # Configuration files
    confluent-3.0.0/share/java/ # Jars

    如果你通过deb或者rpm安装,目录结构如下:

    /usr/bin/                  # Driver scripts for starting/stopping services, prefixed with <package> names
    /etc/<package>/            # Configuration files
    /usr/share/java/<package>/ # Jars

    2.启动Zookeeper。因为这是长期运行的服务,你应该运行它在一个独立的终端(或者在后边运行它,重定向输出到一个文件中)。你需要有写权限到/var/lib在这一步以及之后的步骤里:

    # The following commands assume you exactly followed the instructions above.
    # This means, for example, that at this point your current working directory
    # must be confluent-3.0.0/.
    $ ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties

    3.启动Kafka,同样在一个独立的终端。

    $ ./bin/kafka-server-start ./etc/kafka/server.properties

    4.启动Schema Registry,同样在一个独立的终端。

    $ ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

    5.现在所有需要的服务都已启动,我们发送一些Avro数据到Kafka的topic中。虽然这一步一般会得到一些数据从一些应用里,这里我们使用Kafka提供的例子,不用写代码。我们在本地的Kafka集群里,写数据到topic “test”里,读取每一行Avro信息,校验Schema Registry .

    $ ./bin/kafka-avro-console-producer 
             --broker-list localhost:9092 --topic test 
             --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

    一旦启动,进程等待你输入一些信息,一条一行,会发送到topic中一旦按下enter键。试着输入一些信息:

    {"f1": "value1"}
    {"f1": "value2"}
    {"f1": "value3"}

    输入完成后,可以使用Ctrl+C来终止进程。

    Note:如果一个空行你按下Enter键,会被解释为一个null值,引起错误。然后仅仅需要做的是启动producer进程,接着输入信息。

    6.现在我们可以检查,通过Kafka consumer控制台读取数据从topic。在topic ‘test'中,Zookeeper实例,会告诉consumer解析数据使用相同的schema。最后从开始读取数据(默认consumer只读取它启动之后写入到topic中的数据)

    $ ./bin/kafka-avro-console-consumer --topic test 
             --zookeeper localhost:2181 
             --from-beginning

    你会看到你之前在producer中输入的数据,以同样的格式。

    consumer不会退出,它可以监听写入到topic中的新数据。保持consumer运行,然后重复第5步,输入一些信息,然后按下enter键,你会看到consumer会立即读取到写入到topic中的数据。

    当你完成了测试,可以用Ctrl+C终止进程。

    7.现在让我们尝试写一些不兼容的schema的数据到topic ’test‘中,我们重新运行producer命令,但是改变schema。

    
    
    $ ./bin/kafka-avro-console-producer 
             --broker-list localhost:9092 --topic test 
             --property value.schema='{"type":"int"}'
     

     现在输入一个整数按下enter键,你会看到以下的异常:

    复制代码
    org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: "int"
    Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with the latest schema; error code: 409
           at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.httpRequest(RestUtils.java:146)
           at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.registerSchema(RestUtils.java:174)
           at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:51)
           at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:89)
           at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:49)
           at io.confluent.kafka.formatter.AvroMessageReader.readMessage(AvroMessageReader.java:155)
           at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:94)
           at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)

    复制代码

     当producer试图发送一些信息,它会检查schema用Schema Registry。当返回错误时说明现在的schema无效,因为它不能兼容之前设置的schema。控制台打印出错误信息并退出,但是你自己的应用可以更加人性化处理这类问题。但最重要的是,我们保证不让不兼容的数据写入到Kafka中。

    8.当你完成这一系列测试,你可以使用ctrl+c来关闭服务,以启动时相反的顺序。

    这一简单的教程包含了Kafka和Schema Registry这一些核心的服务。你也可以参考以下document:

  • 相关阅读:
    微信小程序组件loading
    微信小程序组件toast
    微信小程序组件modal
    Thread was being aborted.
    Linux(Contos7.5)环境搭建之Linux远程登录(一)
    Method 'ExecuteAsync' in type 'System.Data.Entity.SqlServer.DefaultSqlExecutionStrategy' does not have an implementation
    Cannot find class [org.springframework.http.converter.json.MappingJacksonHttpMessageConverter]
    Visual Studio 调试时无法命中断点
    springjdbc使用c3p0连接池报错 java.lang.NoClassDefFoundError: com/mchange/v2/ser/Indirector
    JUnit initializationError错误
  • 原文地址:https://www.cnblogs.com/JetpropelledSnake/p/10550637.html
Copyright © 2011-2022 走看看