下载以下文件,解压,放置到kafka的libs目录
从这里选择适合的mysql connector
mysql-connector-java-8.0.16.jar
将里面的jar文件提取出来,也放到kafka的libs目录
在config目录下创建 connect-mysql-source.properties
创建 A数据库源表person
CREATE TABLE `person` ( `pid` int(11) NOT NULL AUTO_INCREMENT, `firstname` varchar(255) CHARACTER SET utf8 DEFAULT NULL, `age` int(11) DEFAULT NULL, PRIMARY KEY (`pid`) ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
创建 B数据库目标表kafkaperson
CREATE TABLE `kafkaperson` ( `pid` int(11) NOT NULL AUTO_INCREMENT, `firstname` varchar(255) CHARACTER SET utf8 DEFAULT NULL, `age` int(11) DEFAULT NULL, PRIMARY KEY (`pid`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
connect-mysql-source.properties 内容为
name=mysql-a-source-person connector.class=io.confluent.connect.jdbc.JdbcSourceConnector tasks.max=1 connection.url=jdbc:mysql://127.0.0.1:3306/a_db?user=root&password=root # incrementing 自增 mode=incrementing # 自增字段 pid incrementing.column.name=pid # 白名单表 person table.whitelist=person # topic前缀 mysql-kafka- topic.prefix=mysql-kafka-
connect-mysql-sink.properties 内容
name=mysql-a-sink-person connector.class=io.confluent.connect.jdbc.JdbcSinkConnector tasks.max=1 #kafka的topic名称 topics=mysql-kafka-person # 配置JDBC链接 connection.url=jdbc:mysql://127.0.0.1:3306/b_db?user=root&password=root # 不自动创建表,如果为true,会自动创建表,表名为topic名称 auto.create=false # upsert model更新和插入 insert.mode=upsert # 下面两个参数配置了以pid为主键更新 pk.mode = record_value pk.fields = pid #表名为kafkatable table.name.format=kafkaperson
启动kafka
参考 kafka安装
如果报 The server time zone value ” is unrecognized or represents more than one time 。。。
以命令行进入mysql
mysql> show variables like '%time_zone%';
+------------------+--------+
| Variable_name | Value |
+------------------+--------+
| system_time_zone | |
| time_zone | SYSTEM |
+------------------+--------+
2 rows in set
如果输出结果是system
设置time_zone
即可
mysql> set global time_zone='+8:00';
Query OK, 0 rows affected;
稍微有点延迟、并且只有添加才会同步,更新、删除都不行。