zoukankan      html  css  js  c++  java
  • 在kafka connect 同步 mysql 主从数据库

    下载以下文件,解压,放置到kafka的libs目录

    kafka-connect-jdbc-4.1.1

    从这里选择适合的mysql connector

    mysql-connector-java-8.0.16.jar

    将里面的jar文件提取出来,也放到kafka的libs目录

    在config目录下创建 connect-mysql-source.properties

    创建 A数据库源表person

    CREATE TABLE `person` (
      `pid` int(11) NOT NULL AUTO_INCREMENT,
      `firstname` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
      `age` int(11) DEFAULT NULL,
      PRIMARY KEY (`pid`)
    ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
    

    创建 B数据库目标表kafkaperson

    CREATE TABLE `kafkaperson` (
      `pid` int(11) NOT NULL AUTO_INCREMENT,
      `firstname` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
      `age` int(11) DEFAULT NULL,
      PRIMARY KEY (`pid`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
    

    connect-mysql-source.properties 内容为

    name=mysql-a-source-person
    connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
    tasks.max=1
    connection.url=jdbc:mysql://127.0.0.1:3306/a_db?user=root&password=root
    # incrementing  自增
    mode=incrementing
    # 自增字段  pid
    incrementing.column.name=pid
    # 白名单表  person
    table.whitelist=person
    # topic前缀   mysql-kafka-
    topic.prefix=mysql-kafka-

    connect-mysql-sink.properties 内容

    name=mysql-a-sink-person
    connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
    tasks.max=1
    #kafka的topic名称
    topics=mysql-kafka-person
    # 配置JDBC链接
    connection.url=jdbc:mysql://127.0.0.1:3306/b_db?user=root&password=root
    # 不自动创建表,如果为true,会自动创建表,表名为topic名称
    auto.create=false
    # upsert model更新和插入
    insert.mode=upsert
    # 下面两个参数配置了以pid为主键更新
    pk.mode = record_value
    pk.fields = pid
    #表名为kafkatable
    table.name.format=kafkaperson
    

    启动kafka

    参考 kafka安装

    如果报 The server time zone value ” is  unrecognized or represents more than one time 。。。

    以命令行进入mysql

    mysql> show variables like '%time_zone%';
    +------------------+--------+
    | Variable_name    | Value  |
    +------------------+--------+
    | system_time_zone |        |
    | time_zone        | SYSTEM |
    +------------------+--------+
    2 rows in set

    如果输出结果是system

    设置time_zone即可

    mysql> set global time_zone='+8:00';
    Query OK, 0 rows affected;




    稍微有点延迟、并且只有添加才会同步,更新、删除都不行。
  • 相关阅读:
    小米路由研究之中的一个加入菜单
    【Struts2学习笔记(9)】单文件上传和多文件上传
    isPostback 的原理及作用(很easy)
    1-2Html与CSS的关系
    【HTML5】实现QQ聊天气泡效果
    杭电1166敌兵布阵 (用的树状数组)
    安卓市场---框架搭建4
    qcow2 raw vhd 虚拟磁盘转换
    softlayer virtual machine vhd磁盘镜像导入shell脚本
    Openstack no valid hot
  • 原文地址:https://www.cnblogs.com/zgzf/p/10883010.html
Copyright © 2011-2022 走看看