zoukankan      html  css  js  c++  java
  • canal+kafka订阅Mysql binlog将数据异构到elasticsearch(或其他存储方式)

    canal本质就是"冒充"从库,通过订阅mysql bin-log来获取数据库的更改信息。

    mysql配置(my.cnf)

    mysql需要配置my.cnf开启bin-log日志并且将bin-log日志格式设置为row, 同时为了防止bin-log日志占用过多磁盘,可以设置一下过期时间,

    [mysqld]
    log-bin=mysql-bin # 打开binlog
    binlog-format=ROW # ROW格式
    server_id=1 # mysql Replication 需要设置 在mysql集群里唯一
    
    expire_logs_days=7 # binlog文件保存7天
    max_binlog_size = 500m # 每个binlog日志文件大小 
    

    canal配置

    除了kafka之外,canal还支持将数据库修改的消息投递到rocketMQ, 或者不经过消息队列直接投递到canal的客户端,然后再在客户端实现自己的代码(如写入其他存储/其他消息队列) ,但是只能选其一。而如果选择canal客户端的方式, 一个canal server也只能将消息投递到一个canal client。

    但是可以开启多个canal服务端和客户端(同一个实例,即对mysql来说只算是一个从库), 他们通过zookeeper保证只有一个服务端和客户端是有效的,其他只是作为HA的冗余。

    然后需要修改canal目录下(以下为近最小配置)

    conf/example/instance.properties

    ## mysql serverId
    canal.instance.mysql.slaveId = 1234
    
    # 数据库address
    canal.instance.master.address = 127.0.0.1:3306
    
    # 数据库账号密码
    canal.instance.dbUsername = canal  
    canal.instance.dbPassword = canal
    
    # 需要订阅的数据库.表名 默认全部
    canal.instance.filter.regex = .*\\..*  # 去掉转义符其实就是 .*..*
    
    # topic名 固定
    canal.mq.topic=canal
    
    # 动态topic
    # canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\..*,.*\..*
    
    # 库名.表名: 唯一主键,多个表之间用逗号分隔
    # canal.mq.partitionHash=mytest.person:id,mytest.role:id
    

    其中动态topic 和 主键hash看上去有点难理解,去看其他人的博客找到的解释和例子如下

    动态topic

    canal 1.1.3版本之后, 支持配置格式:schema 或 schema.table,多个配置之间使用逗号分隔

    例子1:test.test 指定匹配的单表,发送到以 test_test为名字的topic上
    例子2:.…* 匹配所有表,每个表都会发送到各自表名的topic上
    例子3:test 指定匹配对应的库,一个库的所有表都会发送到库名的topic上
    例子4:test.* 指定匹配的表达式,针对匹配的表会发送到各自表名的topic上
    例子5:test,test1.test1,指定多个表达式,会将test库的表都发送到test的topic上,test1.test1的表发送到对应的test1_test1 topic上,其余的表发送到默认的canal.mq.topic值
    支持指定topic名称匹配, 配置格式:topicName:schema 或 schema.table,多个配置之间使用逗号分隔, 多组之间使用 ; 分隔

    例子:test:test,test1.test1;test2:test2,test3.test1 针对匹配的表会发送到指定的topic上
    ————————————————
    版权声明:本文为CSDN博主「BillowX_」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/weixin_35852328/article/details/87600871

    主键

    canal 1.1.3版本之后, 支持配置格式:schema.table:pk1^pk2,多个配置之间使用逗号分隔

    例子1:test.test:pk1^pk2 指定匹配的单表,对应的hash字段为pk1 + pk2
    例子2:.…:id 正则匹配,指定所有正则匹配的表对应的hash字段为id
    例子3:.…
    :pkpkpk 正则匹配,指定所有正则匹配的表对应的hash字段为表主键(自动查找)
    例子4: 匹配规则啥都不写,则默认发到0这个partition上
    例子5:.…* ,不指定pk信息的正则匹配,将所有正则匹配的表,对应的hash字段为表名
    按表hash: 一张表的所有数据可以发到同一个分区,不同表之间会做散列 (会有热点表分区过大问题)
    例子6: test.test:id,.…* , 针对test的表按照id散列,其余的表按照table散列
    注意:大家可以结合自己的业务需求,设置匹配规则,多条匹配规则之间是按照顺序进行匹配(命中一条规则就返回)
    ————————————————
    版权声明:本文为CSDN博主「BillowX_」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/weixin_35852328/article/details/87600871

    最后实现消费kafka上canal topic上消息的代码

    这里以go为例,可以写入到elasticsearch/redis/其他

    package main
    
    import (
    	"bytes"
    	"context"
    	"encoding/json"
    	"fmt"
    	"github.com/Shopify/sarama"
    	"github.com/elastic/go-elasticsearch/esapi"
    	"github.com/elastic/go-elasticsearch/v6"
    	"os"
    )
    
    var esClient *elasticsearch.Client
    
    func init() {
    	var err error
    	config := elasticsearch.Config{}
    	config.Addresses = []string{"http://127.0.0.1:9200"}
    	esClient, err = elasticsearch.NewClient(config)
    	checkErr(err)
    }
    
    type Msg struct {
    	Data []struct {
    		Id string `json:"id"`
    		A  string `json:"a"`
    	} `json:"data"`
    	Type     string `json:"type"`
    	DataBase string `json:"database"`
    	Table    string `json:"table"`
    }
    
    func checkErr(err error) {
    	if err != nil {
    		fmt.Println(err)
    		os.Exit(-1)
    	}
    }
    
    type Consumer struct{}
    
    func (c *Consumer) Setup(sarama.ConsumerGroupSession) error {
    	return nil
    }
    
    func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
    	return nil
    }
    
    func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    	for message := range claim.Messages() {
        // fmt.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
    		msg := &Msg{}
    		err := json.Unmarshal(message.Value, msg)
    		checkErr(err)
    		if msg.DataBase == "test" && msg.Table == "tbltest" {
    			if msg.Type == "INSERT" {
    				for k, _ := range msg.Data {
    					// 写elasticsearch 逻辑
    					body := map[string]interface{}{
    						"id": msg.Data[k].Id,
    						"a":  msg.Data[k].A,
    					}
    					jsonBody, _ := json.Marshal(body)
    					req := esapi.IndexRequest{
    						Index:      msg.DataBase,
    						DocumentID: msg.Table + "_" + msg.Data[k].Id,
    						Body:       bytes.NewReader(jsonBody),
    					}
    					res, err := req.Do(context.Background(), esClient)
    					checkErr(err)
    					fmt.Println(res.String())
    					res.Body.Close()
    					session.MarkMessage(message, "")
    				}
    			}
    		}
    	}
    	return nil
    }
    
    func main() {
    	consumer := &Consumer{}
    
    	config := sarama.NewConfig()
    	config.Version = sarama.MaxVersion
    	client, err := sarama.NewConsumerGroup([]string{"127.0.0.1:9092"}, "tg", config)
    	checkErr(err)
    	ctx := context.Background()
    
    	client.Consume(ctx, []string{"canal"}, consumer)
    }
    
  • 相关阅读:
    公安备案接入服务商如何填写?(网站接入信息)
    VSCode 开发Vue必备插件
    阿里云ecs从零配置centos 安装宝塔bt环境 (安装失败提示setuptools installation failed)
    hover时下划线从中间向两端渐变
    phpcms v9后台增加阅读量字段,可任意修改阅读量
    织梦登录后台变空白解决方法大全
    html鼠标滚动后导航栏吸顶效果
    关于height:100%和height:100vh的区别
    mycat
    Hash碰撞
  • 原文地址:https://www.cnblogs.com/Me1onRind/p/11565501.html
Copyright © 2011-2022 走看看