  • The kafka-connect-hive sink plugin

    kafka-connect-hive is a plugin for reading data from and writing data to Hive, built on the kafka-connect platform. It consists of two parts, source and sink. The source part reads data from Hive tables, and kafka-connect then writes that data into other storage layers, for example streaming Hive data into ES. The sink part writes data into Hive tables: kafka-connect reads data from a third-party source (such as MySQL) and writes it into a Hive table.

    Here I use the kafka-connect-hive plugin developed by Landoop; the project documentation is available under Hive Sink. The rest of this post looks at how to use the sink part of the plugin.

    Prerequisites

    • Apache Kafka 2.11-2.1.0
    • Confluent-5.1.0
    • Apache Hadoop 2.6.3
    • Apache Hive 1.2.1
    • Java 1.8

    Features

    • Supports KCQL routing queries, so all fields or only selected fields of a Kafka topic can be written into a Hive table (see the sketch after this list)
    • Supports dynamic partitioning on a chosen field
    • Supports full and incremental synchronization; partial updates are not supported
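
    To give a feel for the routing, here is a minimal KCQL sketch based on the cities example used later in this post; the topic, table and field names are those of that example, and only a subset of the record fields is selected:

    insert into cities_orc select city, state, population from hive_sink_orc PARTITIONBY state STOREAS ORC WITH_PARTITIONING = DYNAMIC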

    Getting started

    Start the dependencies

    1. Start Kafka

    cd kafka_2.11-2.1.0
    bin/kafka-server-start.sh config/server.properties &
    

    2. Start schema-registry

    cd confluent-5.1.0
    bin/schema-registry-start etc/schema-registry/schema-registry.properties &
    

    The schema-registry component provides schema management for Kafka topics. It stores every evolved version of a schema and helps us deal with compatibility between old and new data schemas. Here we use the Apache Avro library to serialize the Kafka keys and values, so we depend on the schema-registry component; schema-registry runs with its default configuration.
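
    As a quick sanity check, you can query the Schema Registry REST API (port 8081 is its default); once the producer shown later has sent data, the subjects registered for the hive_sink_orc topic will appear in the output:

    # list the subjects registered in schema-registry
    curl http://localhost:8081/subjects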

    3. Start kafka-connect

    Edit the connect-avro-distributed.properties file in the confluent-5.1.0/etc/schema-registry directory; after the changes it looks like this:

    # Sample configuration for a distributed Kafka Connect worker that uses Avro serialization and
    # integrates with the Schema Registry. This sample configuration assumes a local installation of
    # Confluent Platform with all services running on their default ports.
    
    # Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
    bootstrap.servers=localhost:9092
    
    # The group ID is a unique identifier for the set of workers that form a single Kafka Connect
    # cluster
    group.id=connect-cluster
    
    # The converters specify the format of data in Kafka and how to translate it into Connect data.
    # Every Connect user will need to configure these based on the format they want their data in
    # when loaded from or stored into Kafka
    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://localhost:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    
    # Internal Storage Topics.
    #
    # Kafka Connect distributed workers store the connector and task configurations, connector offsets,
    # and connector statuses in three internal topics. These topics MUST be compacted.
    # When the Kafka Connect distributed worker starts, it will check for these topics and attempt to create them
    # as compacted topics if they don't yet exist, using the topic name, replication factor, and number of partitions
    # as specified in these properties, and other topic-specific settings inherited from your brokers'
    # auto-creation settings. If you need more control over these other topic-specific settings, you may want to
    # manually create these topics before starting Kafka Connect distributed workers.
    #
    # The following properties set the names of these three internal topics for storing configs, offsets, and status.
    config.storage.topic=connect-configs
    offset.storage.topic=connect-offsets
    status.storage.topic=connect-statuses
    
    # The following properties set the replication factor for the three internal topics, defaulting to 3 for each
    # and therefore requiring a minimum of 3 brokers in the cluster. Since we want the examples to run with
    # only a single broker, we set the replication factor here to just 1. That's okay for the examples, but
    # ALWAYS use a replication factor of AT LEAST 3 for production environments to reduce the risk of 
    # losing connector offsets, configurations, and status.
    config.storage.replication.factor=1
    offset.storage.replication.factor=1
    status.storage.replication.factor=1
    
    # The config storage topic must have a single partition, and this cannot be changed via properties. 
    # Offsets for all connectors and tasks are written quite frequently and therefore the offset topic
    # should be highly partitioned; by default it is created with 25 partitions, but adjust accordingly
    # with the number of connector tasks deployed to a distributed worker cluster. Kafka Connect records
    # the status less frequently, and so by default the topic is created with 5 partitions.
    #offset.storage.partitions=25
    #status.storage.partitions=5
    
    # The offsets, status, and configurations are written to the topics using converters specified through
    # the following required properties. Most users will always want to use the JSON converter without schemas. 
    # Offset and config data is never visible outside of Connect in this format.
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    
    # Confluent Control Center Integration -- uncomment these lines to enable Kafka client interceptors
    # that will report audit data that can be displayed and analyzed in Confluent Control Center
    # producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
    # consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
    
    # These are provided to inform the user about the presence of the REST host and port configs
    # Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
    #rest.host.name=0.0.0.0
    #rest.port=8083
    
    # The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
    #rest.advertised.host.name=0.0.0.0
    #rest.advertised.port=8083
    
    # Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
    # (connectors, converters, transformations). The list should consist of top level directories that include
    # any combination of:
    # a) directories immediately containing jars with plugins and their dependencies
    # b) uber-jars with plugins and their dependencies
    # c) directories immediately containing the package directory structure of classes of plugins and their dependencies
    # Examples:
    # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
    # Replace the relative path below with an absolute path if you are planning to start Kafka Connect from within a
    # directory other than the home directory of Confluent Platform.
    plugin.path=/kafka/confluent-5.1.0/plugins/lib
    

    The plugin.path parameter must be set here; it tells kafka-connect where the connector plugin jars are stored.

    Download kafka-connect-hive-1.2.1-2.1.0-all.tar.gz and extract it, then copy kafka-connect-hive-1.2.1-2.1.0-all.jar into the directory specified by plugin.path.
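
    A minimal sketch of those steps, assuming the archive has already been downloaded to the current directory (the exact path of the jar inside the extracted archive may differ):

    tar -xzf kafka-connect-hive-1.2.1-2.1.0-all.tar.gz
    cp kafka-connect-hive-1.2.1-2.1.0-all.jar /kafka/confluent-5.1.0/plugins/lib/

    Then start kafka-connect: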

    cd confluent-5.1.0
    bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties
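
    Once the worker is up, you can confirm that it picked up the jar by listing the installed connector plugins through the Connect REST API (8083 is the worker's default REST port); the hive sink connector class should appear in the output:

    curl http://localhost:8083/connector-plugins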
    

    Prepare test data

    1. On the Hive server, run the following commands with beeline:

    -- create the hive_connect database
    create database hive_connect;
    -- create the cities_orc table
    use hive_connect;
    create table cities_orc (city string, state string, population int, country string) stored as orc;
    

    2. Use Postman to submit the kafka-connect-hive sink configuration to kafka-connect:

    URL: localhost:8083/connectors/

    Request type: POST

    Request body:

    {
        "name": "hive-sink-example",
        "config": {
            "name": "hive-sink-example",
            "connector.class": "com.landoop.streamreactor.connect.hive.sink.hiveSinkConnector",
            "tasks.max": 1,
            "topics": "hive_sink_orc",
            "connect.hive.kcql": "insert into cities_orc select * from hive_sink_orc AUTOCREATE PARTITIONBY state STOREAS ORC WITH_FLUSH_INTERVAL = 10 WITH_PARTITIONING = DYNAMIC",
            "connect.hive.database.name": "hive_connect",
            "connect.hive.hive.metastore": "thrift",
            "connect.hive.hive.metastore.uris": "thrift://quickstart.cloudera:9083",
            "connect.hive.fs.defaultFS": "hdfs://quickstart.cloudera:9001",
            "connect.hive.error.policy": "NOOP",
            "connect.progress.enabled": true
        }
    }
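
    If you prefer the command line to Postman, the same request can be sent with curl; the file name hive-sink-example.json below is just a placeholder for a file containing the request body above:

    curl -X POST -H "Content-Type: application/json" --data @hive-sink-example.json http://localhost:8083/connectors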
    

    Run the test and check the results

    3. Start a Kafka producer and write some test data; the Scala test code is as follows:

    import java.util.Properties

    import org.apache.avro.Schema
    import org.apache.avro.generic.GenericData
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
    import org.junit.Test

    class AvroTest {

      /**
        * Tests producing data to Kafka with Avro serialization.
        * See https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html
        */
      @Test
      def testProducer: Unit = {
        // Configure the Kafka broker address, the key/value serializers and the schema-registry address
        val props = new Properties()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[io.confluent.kafka.serializers.KafkaAvroSerializer])
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[io.confluent.kafka.serializers.KafkaAvroSerializer])
        props.put("schema.registry.url", "http://localhost:8081")

        // Define the Avro schema of the records
        val citySchema = """{"type":"record","name":"myrecord","fields":[{"name":"city","type":"string"},{"name":"state","type":"string"},{"name":"population","type":"int"},{"name":"country","type":"string"}]}"""
        val parser = new Schema.Parser()
        val schema = parser.parse(citySchema)

        // Build the test records
        val avroRecord1 = new GenericData.Record(schema)
        avroRecord1.put("city", "Philadelphia")
        avroRecord1.put("state", "PA")
        avroRecord1.put("population", 1568000)
        avroRecord1.put("country", "USA")
    
        val avroRecord2 = new GenericData.Record(schema)
        avroRecord2.put("city", "Chicago")
        avroRecord2.put("state", "IL")
        avroRecord2.put("population", 2705000)
        avroRecord2.put("country", "USA")
    
        val avroRecord3 = new GenericData.Record(schema)
        avroRecord3.put("city", "New York")
        avroRecord3.put("state", "NY")
        avroRecord3.put("population", 8538000)
        avroRecord3.put("country", "USA")
    
        // Produce the records
        val producer = new KafkaProducer[String, GenericData.Record](props)
        try {
          val recordList = List(avroRecord1, avroRecord2, avroRecord3)
          val key = "key1"
    
          for (elem <- recordList) {
            val record = new ProducerRecord("hive_sink_orc", key, elem)
            for (i <- 0 to 100) {
              val ack = producer.send(record).get()
              println(s"${ack.toString} written to partition ${ack.partition.toString}")
            }
          }
        } catch {
          case e: Throwable => e.printStackTrace()
        } finally {
          // When you're finished producing records, flush the producer to ensure everything
          // has been written to Kafka, then close it to free its resources.
          producer.flush()
          producer.close()
        }
      }
     
    }
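
    Once the producer has run, you can check through the Connect REST API that the hive-sink-example connector and its task are in the RUNNING state before querying Hive:

    curl http://localhost:8083/connectors/hive-sink-example/status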
    

    4. Query the Hive table with beeline:

    use hive_connect;
    select * from cities_orc;
    

    Part of the output looks like this:

    +------------------+------------------------+---------------------+-------------------+--+
    | cities_orc.city  | cities_orc.population  | cities_orc.country  | cities_orc.state  |
    +------------------+------------------------+---------------------+-------------------+--+
    | Chicago          | 2705000                | USA                 | IL                |
    | Chicago          | 2705000                | USA                 | IL                |
    | Chicago          | 2705000                | USA                 | IL                |
    | Chicago          | 2705000                | USA                 | IL                |
    | Chicago          | 2705000                | USA                 | IL                |
    | Chicago          | 2705000                | USA                 | IL                |
    | Chicago          | 2705000                | USA                 | IL                |
    | Chicago          | 2705000                | USA                 | IL                |
    | Chicago          | 2705000                | USA                 | IL                |
    | Chicago          | 2705000                | USA                 | IL                |
    | Chicago          | 2705000                | USA                 | IL                |
    | Chicago          | 2705000                | USA                 | IL                |
    | Chicago          | 2705000                | USA                 | IL                |
    | Chicago          | 2705000                | USA                 | IL                |
    | Chicago          | 2705000                | USA                 | IL                |
    | Chicago          | 2705000                | USA                 | IL                |
    | Philadelphia     | 1568000                | USA                 | PA                |
    | Philadelphia     | 1568000                | USA                 | PA                |
    | Philadelphia     | 1568000                | USA                 | PA                |
    | Philadelphia     | 1568000                | USA                 | PA                |
    | Philadelphia     | 1568000                | USA                 | PA                |
    | Philadelphia     | 1568000                | USA                 | PA                |
    | Philadelphia     | 1568000                | USA                 | PA                |
    | Philadelphia     | 1568000                | USA                 | PA                |
    | Philadelphia     | 1568000                | USA                 | PA                |
    | Philadelphia     | 1568000                | USA                 | PA                |          
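
    Because the KCQL statement partitions the table by state with WITH_PARTITIONING = DYNAMIC, you can also list the partitions the sink created; assuming dynamic partitioning worked as configured, one partition per distinct state value should show up:

    show partitions cities_orc;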
    

    Configuration reference

    KCQL configuration

    The options that can appear in connect.hive.kcql are described below; a combined example follows the list:

    • WITH_FLUSH_INTERVAL: long, the interval in milliseconds between file commits
    • WITH_FLUSH_SIZE: long, the size the file written to HDFS must reach before a commit is performed
    • WITH_FLUSH_COUNT: long, the number of records not yet committed to HDFS at which a commit is performed
    • WITH_SCHEMA_EVOLUTION: string, default MATCH, the compatibility policy between the Hive schema and the schema of the Kafka topic records; the hive connector uses this policy to add or remove fields
    • WITH_TABLE_LOCATION: string, the storage location of the Hive table in HDFS; if not specified, the Hive default is used
    • WITH_OVERWRITE: boolean, whether to overwrite records that already exist in the Hive table; with this policy the existing table is dropped and recreated
    • PARTITIONBY: List<String>, the partition fields; when specified, the partition values are taken from the listed columns
    • WITH_PARTITIONING: string, default STRICT, how partitions are created. The two modes are DYNAMIC and STRICT: DYNAMIC creates partitions as needed from the fields listed in PARTITIONBY, while STRICT requires all partitions to exist already
    • AUTOCREATE: boolean, whether to create the table automatically
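
    As a sketch of how these options combine, the statement below extends the KCQL used earlier in this post with a record-count flush threshold in addition to the time-based one; the value 1000 is illustrative only:

    insert into cities_orc select * from hive_sink_orc AUTOCREATE PARTITIONBY state STOREAS ORC WITH_FLUSH_INTERVAL = 10 WITH_FLUSH_COUNT = 1000 WITH_PARTITIONING = DYNAMIC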

    Kafka Connect configuration

    The Kafka Connect configuration options are described below:

    • name: string, the name of the connector, unique within the whole kafka-connect cluster
    • topics: string, the name of the topic holding the data; it must match the topic name used in the KCQL statement
    • tasks.max: int, default 1, the number of tasks for the connector
    • connector.class: string, the connector class name; the value must be com.landoop.streamreactor.connect.hive.sink.HiveSinkConnector
    • connect.hive.kcql: string, the kafka-connect query statement
    • connect.hive.database.name: string, the name of the Hive database
    • connect.hive.hive.metastore: string, the protocol used to connect to the Hive metastore
    • connect.hive.hive.metastore.uris: string, the connection address of the Hive metastore
    • connect.hive.fs.defaultFS: string, the HDFS address
     
  • Original article: https://www.cnblogs.com/dengbangpang/p/12987599.html