zoukankan      html  css  js  c++  java
  • Vertica的这些事(十四)——Vertica实时消费kafka实现

    一、 安装环境
    Vertica官方提供了消费kafka的方法,需要注意版本对应
    这里写图片描述
    消费kafka原理,是Vertica提供的Udx

    这里写图片描述

    首先需要安装相应的环境

    /${vertica}/packages/kafka/ddl/install.sql
    

    判断是否安装成功

    /${vertica}/packages/kafka/ddl/isinstalled.sql
    

    二、 单次消费kafka
    参考官方文档 Using COPY with Kafka

    COPY schema.target_table SOURCE KafkaSource (stream='topic1|1|1,topic2|2|2', brokers='host1:9092,
                                                    host2:9092',duration= INTERVAL'timeslice',stop_on_eof=TRUE,
                                                    eof_timeout= INTERVAL'timeslice')
            PARSER KafkaJSONParser(flatten_arrays=False, flatten_maps=False)
               REJECTED DATA AS TABLE schema.rejection_table TRICKLE;
    

    三、 实时消费kafka
    参考官方文档Using Kafka with Vertica

    1. 首先创建一个Scheduler
    /opt/vertica/packages/kafka/bin/vkconfig scheduler --add --config-schema myScheduler --operator user1
    

    使用conf封装Vertica数据库登录信息

    kafka_config=”—cinfig-schema kafka01 –dbhoust 172.17.12.1 –username dbadmin –password pass1”
    
    1. 创建Scheduler脚本
    /opt/vertica/packages/kafka/bin/vkconfig scheduler –add ${ kafka_config } –config-schema kafka_config  --operator dbadmin
    
    1. 创建kafka集群信息
    BROKERS=”172.17.12.2:9099, 172.17.12.3:9099, 172.17.12.4:9099”
    /opt/vertica/packages/kafka/bin/vkconfig kafka-cluster –add  ${ kafka_config } --onfig-schema kafka_config  --cluster KafkaCluster –brokers $ BROKERS
    
    1. 读取topic
    /opt/vertica/packages/kafka/bin/vkconfig topic –add ${ kafka_config } –target public.kafka_tgt –rejection-table public.kafka_rej –cluster KafkaCluster –topic web_pagelogs –number-partitions 1
    
    1. 发布Scheduler
    /opt/vertica/packages/kafka/bin/vkconfig launch ${ kafka_config } -- onfig-schema kafka_config –instance-name webpagelogs 
    
    1. 删除scheduler
    /opt/vertica/packages/kafka/bin/vkconfig scheduler ${kafka_config} –remove –config-schema kafka_config 
    
    1. 删除topic接收
    /opt/vertica/packages/kafka/bin/vkconfig topic ${kafka_config} –remove –target public.kafka_tgt
    

    PS:
    通过最新对Vertica消费kafka的使用,发现这个功能比较鸡肋。多个topic也只能放到一个scheduler里面执行消费,而且每次修改增加都需要停下所有topic的消费进程。另外在使用过程中也发现了丢失数据的现象。

  • 相关阅读:
    Python的内置模块itertools
    列表的sort()和sorted()方法
    Python面试
    数据分析相关概念
    数据分析中Numpy,Pandas,Matplotlib,scripy和Scikit-Learn等数据处理库...
    mysql数据库的语法及简介
    Cannot add foreign key constraint
    MySQL数据库重装
    MySQL数据库在Python中的操作
    Python中的取整函数
  • 原文地址:https://www.cnblogs.com/qinchaofeng/p/12667440.html
Copyright © 2011-2022 走看看