zoukankan      html  css  js  c++  java
  • 【Canal源码分析】数据传输协议

    Canal的数据传输有两块,一块是进行binlog订阅时,binlog转换为我们所定义的Message,第二块是client与server进行TCP交互时,传输的TCP协议。

    一、EntryProtocal

    这块是binlog的一个存储。主要的格式如下:

    Entry
        Header
            version         [协议的版本号,default = 1]
            logfileName     [binlog文件名]
            logfileOffset   [binlog position]
            serverId        [服务端serverId]
            serverenCode    [变更数据的编码]
            executeTime     [变更数据的执行时间]
            sourceType      [变更数据的来源,default = MYSQL]
            schemaName      [变更数据的schemaname]
            tableName       [变更数据的tablename]
            eventLength     [每个event的长度]
            eventType       [insert/update/delete类型,default = UPDATE]
            props           [预留扩展]
            gtid            [当前事务的gitd]
        entryType           [事务头BEGIN/事务尾END/数据ROWDATA/HEARTBEAT/GTIDLOG]
        storeValue          [byte数据,可展开,对应的类型为RowChange]    
    RowChange
        tableId             [tableId,由数据库产生]
        eventType           [数据变更类型,default = UPDATE]
        isDdl               [标识是否是ddl语句,比如create table/drop table]
        sql                 [ddl/query的sql语句]
        rowDatas            [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]
            beforeColumns   [字段信息,增量数据(修改前,删除前),Column类型的数组]
            afterColumns    [字段信息,增量数据(修改后,新增后),Column类型的数组] 
            props           [预留扩展]
        props               [预留扩展]
        ddlSchemaName       [ddl/query的schemaName,会存在跨库ddl,需要保留执行ddl的当前schemaName]
    Column 
        index               [字段下标]      
        sqlType             [jdbc type]
        name                [字段名称(忽略大小写),在mysql中是没有的]
        isKey               [是否为主键]
        updated             [是否发生过变更]
        isNull              [值是否为null]
        props               [预留扩展]
        value               [字段值,timestamp,Datetime是一个时间格式的文本]
        length              [对应数据对象原始长度]
        mysqlType           [字段mysql类型]
    

    二、CanalProtocal

    这块主要定义了client和server交互的协议。

    Packet
        magic_number    [default = 17]
        version         [default = 1]
        type            [PacketType,类型]
        compression     [压缩,default = NONE]
        body            [具体内容]
    

    主要的类型和对应的body,都可以在CanalProtocal.proto里面查看到。

    enum PacketType {
        HANDSHAKE = 1;
        CLIENTAUTHENTICATION = 2;
        ACK = 3;
        SUBSCRIPTION = 4;
        UNSUBSCRIPTION = 5;
        GET = 6;
        MESSAGES = 7;
        CLIENTACK = 8;
        // management part
        SHUTDOWN = 9;
        // integration
        DUMP = 10;
        HEARTBEAT = 11;
        CLIENTROLLBACK = 12;
    }
    
    //心跳
    message HeartBeat {
        optional int64 send_timestamp = 1;
        optional int64 start_timestamp = 2;
    }
    
    //握手
    message Handshake {
        optional string communication_encoding = 1 [default = "utf8"];
        optional bytes seeds = 2;
        repeated Compression supported_compressions = 3;
    }
    
    // client authentication
    message ClientAuth {
        optional string username = 1;
        optional bytes password = 2; // hashed password with seeds from Handshake message
        optional int32 net_read_timeout = 3 [default = 0]; // in seconds
        optional int32 net_write_timeout = 4 [default = 0]; // in seconds
        optional string destination = 5;
        optional string client_id = 6;
        optional string filter = 7;
        optional int64 start_timestamp = 8;
    }
    
    //服务端响应
    message Ack {
        optional int32 error_code = 1 [default = 0];
        optional string error_message = 2; // if something like compression is not supported, erorr_message will tell about it.
    }
    
    //客户端提交
    message ClientAck {
        optional string destination = 1;
        optional string client_id = 2;
        optional int64 batch_id = 3;
    }
    
    // subscription
    message Sub {
        optional string destination = 1;
        optional string client_id = 2;
        optional string filter = 7;
    }
    
    // Unsubscription
    message Unsub {
        optional string destination = 1;
        optional string client_id = 2;
        optional string filter = 7;
    }
    
    //  PullRequest
    message Get {
        optional string destination = 1;
        optional string client_id = 2;
        optional int32 fetch_size = 3;
        optional int64 timeout = 4 [default = -1]; // 默认-1时代表不控制
        optional int32 unit = 5 [default = 2];// 数字类型,0:纳秒,1:毫秒,2:微秒,3:秒,4:分钟,5:小时,6:天
        optional bool auto_ack = 6 [default = false]; // 是否自动ack
    }
    
    //消息
    message Messages {
    	optional int64 batch_id = 1;
        repeated bytes messages = 2;
    }
    
    // TBD when new packets are required
    message Dump{
        optional string journal = 1;
        optional int64  position = 2;
        optional int64 timestamp = 3 [default = 0];
    }
    
    // 客户端回滚
    message ClientRollback{
        optional string destination = 1;
        optional string client_id = 2;
        optional int64 batch_id = 3;
    }
    
  • 相关阅读:
    gitolite 丢失管理密钥/访问权限 解决办法
    4/20
    socket套接字模块
    网络编程part2
    网络编程part1
    异常处理
    类的属性查找
    多继承带来的菱形问题
    property装饰器
    类的继承派生
  • 原文地址:https://www.cnblogs.com/f-zhao/p/9110575.html
Copyright © 2011-2022 走看看