zoukankan      html  css  js  c++  java
  • ES 使用debezium同步到ES

    logstash和kafka_connector都仅支持基于自增id或者时间戳更新的方式增量同步数据。

    回到问题本身:如果库表里没有相关字段,该如何处理呢?

    本文给出相关探讨和解决方案。

    1、 binlog认知

    1.1 啥是 binlog?

    binlog是Mysql sever层维护的一种二进制日志,与innodb引擎中的redo/undo log是完全不同的日志;其主要是用来记录对mysql数据更新或潜在发生更新的SQL语句,并以"事务"的形式保存在磁盘中。

    作用主要有:

    • 1)复制:达到master-slave数据一致的目的。

    • 2)数据恢复:通过mysqlbinlog工具恢复数据。

    • 3)增量备份

    1.2 阿里的Canal实现了增量Mysql同步

    一图胜千言,canal是用java开发的基于数据库增量日志解析、提供增量数据订阅&消费的中间件。

    目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client 用来处理获得的相关数据。目的:增量数据订阅&消费

    综上,使用binlog可以突破logstash或者kafka-connector没有自增id或者没有时间戳字段的限制,实现增量同步。

    2、基于binlog的同步方式

    1)基于kafka Connect的Debezium 开源工程,地址:. https://debezium.io/

    2)不依赖第三方的独立应用: Maxwell开源项目,地址:http://maxwells-daemon.io/

    由于已经部署过conluent(kafka的企业版本,自带zookeeper、kafka、ksql、kafka-connector等),本文仅针对Debezium展开。

    3、Debezium介绍

    Debezium是捕获数据实时动态变化的开源的分布式同步平台。能实时捕获到数据源(Mysql、Mongo、PostgreSql)的:新增(inserts)、更新(updates)、删除(deletes)操作,实时同步到Kafka,稳定性强且速度非常快。

    特点:

    • 1)简单。无需修改应用程序。可对外提供服务。

    • 2)稳定。持续跟踪每一行的每一处变动。

    • 3)快速。构建于kafka之上,可扩展,经官方验证可处理大容量的数据。

    4、同步架构

    如图,Mysql到ES的同步策略,采取“曲线救国”机制。

    步骤1: 基Debezium的binlog机制,将Mysql数据同步到Kafka。

    步骤2: 基于Kafka_connector机制,将kafka数据同步到Elasticsearch。

    5、Debezium实现Mysql到ES增删改实时同步

    软件版本:

    confluent:5.1.2;
    Debezium:0.9.2_Final;
    Mysql:5.7.x.
    Elasticsearch:6.6.1

    5.1 Debezium安装

    confluent的安装部署参见:http://t.cn/Ef5poZk,不再赘述。

    Debezium的安装只需要把debezium-connector-mysql的压缩包解压放到Confluent的解压后的插件目录(share/java)中。

    MySQL Connector plugin 压缩包的下载地址:

    https://debezium.io/docs/install/

    注意重启一下confluent,以使得Debezium生效。

    5.2 Mysql binlog等相关配置。

    Debezium使用MySQL的binlog机制实现数据动态变化监测,所以需要Mysql提前配置binlog。

    核心配置如下,在Mysql机器的/etc/my.cnf的mysqld下添加如下配置。

    1[mysqld]
    2
    3server-id         = 223344
    4log_bin           = mysql-bin
    5binlog_format     = row
    6binlog_row_image  = full
    7expire_logs_days  = 10

    然后,重启一下Mysql以使得binlog生效。

    1systemctl start mysqld.service

    5.3 配置connector连接器。

    配置confluent路径目录 : /etc

    创建文件夹命令 :

    1mkdir kafka-connect-debezium

    在mysql2kafka_debezium.json存放connector的配置信息 :

    1[root@localhost kafka-connect-debezium]# cat mysql2kafka_debezium.json
    2{ 
    3        "name" : "debezium-mysql-source-0223",
    4        "config":
    5        {
    6             "connector.class" : "io.debezium.connector.mysql.MySqlConnector",
    7             "database.hostname" : "192.168.1.22",
    8             "database.port" : "3306",
    9             "database.user" : "root",
    10             "database.password" : "XXXXXX",
    11             "database.whitelist" : "kafka_base_db",
    12             "table.whitlelist" : "accounts",
    13             "database.server.id" : "223344",
    14             "database.server.name" : "full",
    15             "database.history.kafka.bootstrap.servers" : "192.168.1.22:9092",
    16             "database.history.kafka.topic" : "account_topic",
    17             "include.schema.changes" : "true" ,
    18             "incrementing.column.name" : "id",
    19             "database.history.skip.unparseable.ddl" : "true",
    20             "transforms": "unwrap,changetopic",
    21             "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    22             "transforms.changetopic.type":"org.apache.kafka.connect.transforms.RegexRouter",
    23             "transforms.changetopic.regex":"(.*)",
    24             "transforms.changetopic.replacement":"$1-smt"
    25        }
    26}

    注意如下配置:

    1. "database.server.id",对应Mysql中的server-id的配置。

    2. "database.whitelist" : 待同步的Mysql数据库名。

    3. "table.whitlelist" :待同步的Mysq表名。

    4. 重要:“database.history.kafka.topic”:存储数据库的Shcema的记录信息,而非写入数据的topic、

    5. "database.server.name":逻辑名称,每个connector确保唯一,作为写入数据的kafka topic的前缀名称。

    坑一:transforms相关5行配置作用是写入数据格式转换。

    如果没有,输入数据会包含:before、after记录修改前对比信息以及元数据信息(source,op,ts_ms等)。

    这些信息在后续数据写入Elasticsearch是不需要的。(注意结合自己业务场景)。

    格式转换相关原理:http://t.cn/EftoaIi

    5.4 启动connector

    1curl -X POST -H "Content-Type:application/json" 
    2--data @mysql2kafka_debezium.json.json 
    3http://192.168.1.22:18083/connectors | jq

    5.5 验证写入是否成功。

    5.5.1  查看kafka-topic

    1    kafka-topics --list --zookeeper localhost:2181

    此处会看到写入数据topic的信息。

    注意新写入数据topic的格式:database.schema.table-smt 三部分组成。

    本示例topic名称:

    full.kafka_base_db.account-smt

    5.5.2 消费数据验证写入是否正常

    1./kafka-avro-console-consumer --topic full.kafka_base_db.account-smt --bootstrap-server 192.168.1.22:9092 --from-beginning

    至此,Debezium实现mysql同步kafka完成。

    6、kafka-connector实现kafka同步Elasticsearch

    6.1、Kafka-connector介绍

    见官网:https://docs.confluent.io/current/connect.html

    Kafka Connect是一个用于连接Kafka与外部系统(如数据库,键值存储,检索系统索引和文件系统)的框架。 

    连接器实现公共数据源数据(如Mysql、Mongo、Pgsql等)写入Kafka,或者Kafka数据写入目标数据库,也可以自己开发连接器。

    6.2、kafka到ES connector同步配置

    配置路径:

    1/home/confluent-5.1.0/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

    配置内容:

    1"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    2"tasks.max": "1",
    3"topics": "full.kafka_base_db.account-smt",
    4"key.ignore": "true",
    5"connection.url": "http://192.168.1.22:9200",
    6"type.name": "_doc",
    7"name": "elasticsearch-sink-test"

    6.3 kafka到ES启动connector

    启动命令

    1confluent load  elasticsearch-sink-test 
    2-d /home/confluent-5.1.0/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

    6.4 Kafka-connctor RESTFul API查看

    Mysql2kafka,kafka2ES的connector详情信息可以借助postman或者浏览器或者命令行查看。

    1curl -X GET http://localhost:8083/connectors

    7、坑复盘。

    坑2: 同步的过程中可能出现错误,比如:kafka topic没法消费到数据。
    排解思路如下:

    • 1)确认消费的topic是否是写入数据的topic;

    • 2)确认同步的过程中没有出错。可以借助connector如下命令查看。

    1curl -X GET http://localhost:8083/connectors-xxx/status

    坑3: Mysql2ES出现日期格式不能识别。

    是Mysql jar包的问题,解决方案:在my.cnf中配置时区信息即可。

    坑4: kafka2ES,ES没有写入数据。

    排解思路:

    • 1)建议:先创建同topic名称一致的索引,注意:Mapping静态自定义,不要动态识别生成。

    • 2)通过connetor/status排查出错原因,一步步分析。

    8、小结

      1. binlog的实现突破了字段的限制,实际上业界的go-mysql-elasticsearch已经实现。

      2. 对比:logstash、kafka-connector,虽然Debezium“曲线救国”两步实现了实时同步,但稳定性+实时性能相对不错。

      3. 推荐大家使用。大家有好的同步方式也欢迎留言讨论

      4. 转载自https://mp.weixin.qq.com/s/XTvWpTq2YsFBzT2gojNoHA
    螃蟹在剥我的壳,笔记本在写我,漫天的我落在枫叶上雪花上,而你在想我。 --章怀柔
  • 相关阅读:
    (转)用Ajax技术让IE Web Control Tree View实现大数据量读取
    您试图从目录中执行CGI、ISAPI 或其他可执行程序,但该目录不允许执行程序
    oracle数据库中ORA28000: the account is locked问题
    C#动态生成html页面
    oracle 用户权限解释
    HCPC2013校赛训练赛 2
    ZOJ2770 Burn the Linked Camp 差分约束
    POJ2570 Fiber Network 状态压缩+floyd
    ZOJ3088 Easter Holidays 最短路
    POJ1364 King 差分约束
  • 原文地址:https://www.cnblogs.com/lovezhr/p/15054672.html
Copyright © 2011-2022 走看看