1、在Greenplum数据库中创建目标表
2、Kafka创建Topic并向Topic写一些消息,格式:{"org_id":"2B79D272-016A-11EB-88A7-000C29496EB0","org_name":"测试单位"}
3、配置yaml文件
DATABASE: gpkafka_test
USER: root
PASSWORD: 123456
HOST: 10.10.14.206
PORT: 5432
KAFKA:
INPUT:
SOURCE:
BROKERS: 10.10.14.238:9092
TOPIC: gp-kafka-test
COLUMNS:
- NAME: jdata
TYPE: json
FORMAT: json
ERROR_LIMIT: 10
OUTPUT:
TABLE: t_base_org
MAPPING:
- NAME: org_id
EXPRESSION: (jdata->>'org_id')::varchar
- NAME: org_name
EXPRESSION: (jdata->>'org_name')::varchar
COMMIT:
MAX_ROW: 5 #多少条一提交
MINIMAL_INTERVAL: 10000 #等待多少时间一提交(毫秒)
4、启动服务
gpkafka load gp_kafka_test.yaml --gpfdist-host 10.10.14.206
增加 --quit-at-eof 参数 gpkafka load 会在消费完topic中的消息后退出,不加该参数会无限等待消息来消费
注:--gpfdist-host 为master服务器的IP或机器名