  • kafka-connect-hive sink plugin

    kafka-connect-hive is a plugin, built on the kafka-connect platform, for reading data from and writing data to hive. It consists of two parts: source and sink. The source part reads data from hive tables so that kafka-connect can write it into other storage layers, for example streaming data from hive into ES. The sink part writes data into hive tables: kafka-connect reads data from a third-party data source (such as MySQL) and writes it into a hive table.

    Here I use the kafka-connect-hive plugin developed by Landoop; the project documentation is at Hive Sink. Let's look at how to use the sink part of this plugin.

    Prerequisites

    • Apache Kafka 2.11-2.1.0
    • Confluent-5.1.0
    • Apache Hadoop 2.6.3
    • Apache Hive 1.2.1
    • Java 1.8

    Features

    • Supports KCQL routing queries, so all or only some fields of a kafka topic can be written into a hive table
    • Supports dynamic partitioning by a chosen field
    • Supports full and incremental data synchronization; partial updates are not supported

    Getting started

    Start the dependencies

    1. Start kafka

    cd kafka_2.11-2.1.0
    bin/kafka-server-start.sh config/server.properties &
    
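    Note that Kafka 2.1.0 still depends on ZooKeeper. If no ZooKeeper instance is running yet, start the single-node one bundled with Kafka before the broker (a sketch using the bundled default configuration):

    cd kafka_2.11-2.1.0
    bin/zookeeper-server-start.sh config/zookeeper.properties &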

    2. Start schema-registry

    cd confluent-5.1.0
    bin/schema-registry-start etc/schema-registry/schema-registry.properties &
    

    The schema-registry component provides schema management for kafka topics: it stores each evolved version of a schema and helps us handle compatibility between old and new data schemas. Here we use the apache avro library to serialize the key and value of kafka messages, so we depend on the schema-registry component; schema-registry runs with its default configuration.
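    To check that schema-registry is reachable, you can query its REST interface; after the producer shown later has run, the key and value schema subjects for the topic should show up here (8081 is the default port):

    # list the schema subjects registered so far
    curl http://localhost:8081/subjects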

    3. Start kafka-connect

    Edit the connect-avro-distributed.properties file in the confluent-5.1.0/etc/schema-registry directory; after the changes its content is as follows:

    # Sample configuration for a distributed Kafka Connect worker that uses Avro serialization and
    # integrates with the Schema Registry. This sample configuration assumes a local installation of
    # Confluent Platform with all services running on their default ports.
    
    # Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
    bootstrap.servers=localhost:9092
    
    # The group ID is a unique identifier for the set of workers that form a single Kafka Connect
    # cluster
    group.id=connect-cluster
    
    # The converters specify the format of data in Kafka and how to translate it into Connect data.
    # Every Connect user will need to configure these based on the format they want their data in
    # when loaded from or stored into Kafka
    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://localhost:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    
    # Internal Storage Topics.
    #
    # Kafka Connect distributed workers store the connector and task configurations, connector offsets,
    # and connector statuses in three internal topics. These topics MUST be compacted.
    # When the Kafka Connect distributed worker starts, it will check for these topics and attempt to create them
    # as compacted topics if they don't yet exist, using the topic name, replication factor, and number of partitions
    # as specified in these properties, and other topic-specific settings inherited from your brokers'
    # auto-creation settings. If you need more control over these other topic-specific settings, you may want to
    # manually create these topics before starting Kafka Connect distributed workers.
    #
    # The following properties set the names of these three internal topics for storing configs, offsets, and status.
    config.storage.topic=connect-configs
    offset.storage.topic=connect-offsets
    status.storage.topic=connect-statuses
    
    # The following properties set the replication factor for the three internal topics, defaulting to 3 for each
    # and therefore requiring a minimum of 3 brokers in the cluster. Since we want the examples to run with
    # only a single broker, we set the replication factor here to just 1. That's okay for the examples, but
    # ALWAYS use a replication factor of AT LEAST 3 for production environments to reduce the risk of 
    # losing connector offsets, configurations, and status.
    config.storage.replication.factor=1
    offset.storage.replication.factor=1
    status.storage.replication.factor=1
    
    # The config storage topic must have a single partition, and this cannot be changed via properties. 
    # Offsets for all connectors and tasks are written quite frequently and therefore the offset topic
    # should be highly partitioned; by default it is created with 25 partitions, but adjust accordingly
    # with the number of connector tasks deployed to a distributed worker cluster. Kafka Connect records
    # the status less frequently, and so by default the topic is created with 5 partitions.
    #offset.storage.partitions=25
    #status.storage.partitions=5
    
    # The offsets, status, and configurations are written to the topics using converters specified through
    # the following required properties. Most users will always want to use the JSON converter without schemas. 
    # Offset and config data is never visible outside of Connect in this format.
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    
    # Confluent Control Center Integration -- uncomment these lines to enable Kafka client interceptors
    # that will report audit data that can be displayed and analyzed in Confluent Control Center
    # producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
    # consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
    
    # These are provided to inform the user about the presence of the REST host and port configs
    # Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
    #rest.host.name=0.0.0.0
    #rest.port=8083
    
    # The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
    #rest.advertised.host.name=0.0.0.0
    #rest.advertised.port=8083
    
    # Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
    # (connectors, converters, transformations). The list should consist of top level directories that include
    # any combination of:
    # a) directories immediately containing jars with plugins and their dependencies
    # b) uber-jars with plugins and their dependencies
    # c) directories immediately containing the package directory structure of classes of plugins and their dependencies
    # Examples:
    # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
    # Replace the relative path below with an absolute path if you are planning to start Kafka Connect from within a
    # directory other than the home directory of Confluent Platform.
    plugin.path=/kafka/confluent-5.1.0/plugins/lib
    

    The plugin.path parameter must be set here; it specifies the directory where kafka-connect plugin packages are stored.

    Download kafka-connect-hive-1.2.1-2.1.0-all.tar.gz, extract it, and place kafka-connect-hive-1.2.1-2.1.0-all.jar in the directory specified by plugin.path.
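    A minimal sketch of those steps, assuming the tarball is in the current directory, the jar sits at the top level of the archive, and plugin.path is /kafka/confluent-5.1.0/plugins/lib as configured above:

    mkdir -p /kafka/confluent-5.1.0/plugins/lib
    tar -xzf kafka-connect-hive-1.2.1-2.1.0-all.tar.gz
    cp kafka-connect-hive-1.2.1-2.1.0-all.jar /kafka/confluent-5.1.0/plugins/lib/

    Then run the following commands to start kafka-connect: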

    cd confluent-5.1.0
    bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties
    
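    Once the worker is running, you can confirm over its REST interface that the hive sink plugin was loaded from plugin.path (localhost:8083 is the default REST port):

    # list the connector plugins the worker has loaded
    curl localhost:8083/connector-plugins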

    Prepare the test data

    1. On the hive server, run the following statements with beeline:

    -- create the hive_connect database
    create database hive_connect;
    -- create the cities_orc table
    use hive_connect;
    create table cities_orc (city string, state string, population int, country string) stored as orc;
    
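    If topic auto-creation is disabled on the broker, the hive_sink_orc topic used below must also be created before the connector starts; with Kafka 2.1.0 this goes through ZooKeeper (a sketch assuming ZooKeeper on localhost:2181):

    cd kafka_2.11-2.1.0
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hive_sink_orc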

    2. Use postman to submit the kafka-connect-hive sink configuration to kafka-connect.

    URL: localhost:8083/connectors/

    Request type: POST

    The request body is as follows (an equivalent curl command is shown after the JSON):

    {
        "name": "hive-sink-example",
        "config": {
            "name": "hive-sink-example",
            "connector.class": "com.landoop.streamreactor.connect.hive.sink.hiveSinkConnector",
            "tasks.max": 1,
            "topics": "hive_sink_orc",
            "connect.hive.kcql": "insert into cities_orc select * from hive_sink_orc AUTOCREATE PARTITIONBY state STOREAS ORC WITH_FLUSH_INTERVAL = 10 WITH_PARTITIONING = DYNAMIC",
            "connect.hive.database.name": "hive_connect",
            "connect.hive.hive.metastore": "thrift",
            "connect.hive.hive.metastore.uris": "thrift://quickstart.cloudera:9083",
            "connect.hive.fs.defaultFS": "hdfs://quickstart.cloudera:9001",
            "connect.hive.error.policy": "NOOP",
            "connect.progress.enabled": true
        }
    }
    
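    If you prefer the command line to postman, the same registration can be done with curl (a sketch assuming the JSON above is saved as hive-sink-example.json):

    curl -X POST -H "Content-Type: application/json" \
         --data @hive-sink-example.json \
         localhost:8083/connectors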

    Run the test and check the results

    3. Start a kafka producer and write the test data; the scala test code is shown below. Note that the inner loop sends each record 101 times, which is why the query results later contain many duplicate rows:

    import java.util.Properties
    
    import org.apache.avro.Schema
    import org.apache.avro.generic.GenericData
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
    import org.junit.Test
    
    class AvroTest {
    
      /**
        * Tests producing data to kafka with avro serialization.
        * See https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html
        */
      @Test
      def testProducer: Unit = {
        // kafka broker address, serializers, and the schema-registry address
        val props = new Properties()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[io.confluent.kafka.serializers.KafkaAvroSerializer])
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[io.confluent.kafka.serializers.KafkaAvroSerializer])
        props.put("schema.registry.url", "http://localhost:8081")
    
        // define the avro schema (its fields match the cities_orc table columns)
        val citySchema = "{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":[{\"name\":\"city\",\"type\":\"string\"},{\"name\":\"state\",\"type\":\"string\"},{\"name\":\"population\",\"type\":\"int\"},{\"name\":\"country\",\"type\":\"string\"}]}"
        val parser = new Schema.Parser()
        val schema = parser.parse(citySchema)
    
        // build the test records
        val avroRecord1 = new GenericData.Record(schema)
        avroRecord1.put("city", "Philadelphia")
        avroRecord1.put("state", "PA")
        avroRecord1.put("population", 1568000)
        avroRecord1.put("country", "USA")
    
        val avroRecord2 = new GenericData.Record(schema)
        avroRecord2.put("city", "Chicago")
        avroRecord2.put("state", "IL")
        avroRecord2.put("population", 2705000)
        avroRecord2.put("country", "USA")
    
        val avroRecord3 = new GenericData.Record(schema)
        avroRecord3.put("city", "New York")
        avroRecord3.put("state", "NY")
        avroRecord3.put("population", 8538000)
        avroRecord3.put("country", "USA")
    
        // produce the data: each record is sent 101 times
        val producer = new KafkaProducer[String, GenericData.Record](props)
        try {
          val recordList = List(avroRecord1, avroRecord2, avroRecord3)
          val key = "key1"
    
          for (elem <- recordList) {
            val record = new ProducerRecord("hive_sink_orc", key, elem)
            for (i <- 0 to 100) {
              val ack = producer.send(record).get()
              println(s"${ack.toString} written to partition ${ack.partition.toString}")
            }
          }
        } catch {
          case e: Throwable => e.printStackTrace()
        } finally {
          // When you're finished producing records, flush the producer to make sure everything
          // has been written to Kafka, then close it to free its resources.
          producer.flush()
          producer.close()
        }
      }
    
    }
    

    4. Query the hive data with beeline:

    use hive_connect;
    select * from cities_orc;
    

    Part of the output is shown below:

    +------------------+------------------------+---------------------+-------------------+--+
    | cities_orc.city  | cities_orc.population  | cities_orc.country  | cities_orc.state  |
    +------------------+------------------------+---------------------+-------------------+--+
    | Chicago          | 2705000                | USA                 | IL                |
    | Chicago          | 2705000                | USA                 | IL                |
    | Chicago          | 2705000                | USA                 | IL                |
    | Chicago          | 2705000                | USA                 | IL                |
    | Chicago          | 2705000                | USA                 | IL                |
    | Chicago          | 2705000                | USA                 | IL                |
    | Chicago          | 2705000                | USA                 | IL                |
    | Chicago          | 2705000                | USA                 | IL                |
    | Chicago          | 2705000                | USA                 | IL                |
    | Chicago          | 2705000                | USA                 | IL                |
    | Chicago          | 2705000                | USA                 | IL                |
    | Chicago          | 2705000                | USA                 | IL                |
    | Chicago          | 2705000                | USA                 | IL                |
    | Chicago          | 2705000                | USA                 | IL                |
    | Chicago          | 2705000                | USA                 | IL                |
    | Chicago          | 2705000                | USA                 | IL                |
    | Philadelphia     | 1568000                | USA                 | PA                |
    | Philadelphia     | 1568000                | USA                 | PA                |
    | Philadelphia     | 1568000                | USA                 | PA                |
    | Philadelphia     | 1568000                | USA                 | PA                |
    | Philadelphia     | 1568000                | USA                 | PA                |
    | Philadelphia     | 1568000                | USA                 | PA                |
    | Philadelphia     | 1568000                | USA                 | PA                |
    | Philadelphia     | 1568000                | USA                 | PA                |
    | Philadelphia     | 1568000                | USA                 | PA                |
    | Philadelphia     | 1568000                | USA                 | PA                |          
    

    Configuration reference

    KCQL options

    The options available in connect.hive.kcql are described below; an illustrative statement follows the list:

    • WITH_FLUSH_INTERVAL: long type; the time interval, in milliseconds, between file commits
    • WITH_FLUSH_SIZE: long type; the size a file must reach in HDFS before a commit is performed
    • WITH_FLUSH_COUNT: long type; the number of records not yet committed to HDFS that triggers a commit
    • WITH_SCHEMA_EVOLUTION: string type, default MATCH; the compatibility policy between the hive schema and the schema of records in the kafka topic; the hive connector uses this policy to add or remove fields
    • WITH_TABLE_LOCATION: string type; the storage location of the hive table in HDFS; if not specified, hive's default configuration is used
    • WITH_OVERWRITE: boolean type; whether to overwrite existing records in the hive table; with this policy, the existing table is dropped first and then recreated
    • PARTITIONBY: List<String> type; the partition fields; when specified, partition values are taken from the named columns
    • WITH_PARTITIONING: string type, default STRICT; how partitions are created. There are two modes, DYNAMIC and STRICT: DYNAMIC creates partitions on the fly from the fields named in PARTITIONBY, while STRICT requires all partitions to exist in advance
    • AUTOCREATE: boolean type; whether to create the table automatically
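    Putting several of these options together, a KCQL statement in the style used by this connector might look like the following (the table, topic, and values are illustrative, not taken from the example above):

    insert into cities_orc select city, state, population from hive_sink_orc AUTOCREATE PARTITIONBY state STOREAS ORC WITH_FLUSH_COUNT = 100 WITH_PARTITIONING = DYNAMIC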

    Kafka connect options

    The kafka connect configuration options are described below; an example of checking a connector's status by the name set here follows the list:

    • name: string type; the name of the connector, unique across the kafka-connect cluster
    • topics: string type; the name of the topic holding the data; it must match the topic name in the KCQL statement
    • tasks.max: int type, default 1; the number of tasks for the connector
    • connector.class: string type; the connector class name; the value must be com.landoop.streamreactor.connect.hive.sink.HiveSinkConnector
    • connect.hive.kcql: string type; the kafka-connect query statement
    • connect.hive.database.name: string type; the name of the hive database
    • connect.hive.hive.metastore: string type; the network protocol used to connect to the hive metastore
    • connect.hive.hive.metastore.uris: string type; the connection address of the hive metastore
    • connect.hive.fs.defaultFS: string type; the HDFS address
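    For example, once the connector above is registered, its state and that of its task can be checked through the kafka-connect REST interface (a sketch assuming the connector name and default REST port used earlier):

    # show the connector's state and the state of its tasks
    curl localhost:8083/connectors/hive-sink-example/status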
     