zoukankan      html  css  js  c++  java
  • Canal 作为 消息中间件 实时消费MySQL中新增数据

    Canal的数据结构

    网上对Canal的介绍已经够多了,这里不再赘述,但是有一点必须要强调,就是Canal 是怎么对数据进行的封装,只有明白了这点,才可以去消费其中的数据

     Canal的安装及配置

    配置Mysql主服务器的my.cnf文件(位于/etc目录下,没有就新建)

    #主服务器的id
    server-id=1
    #启用二进制日志
    log-bin=mysql-bin
    #设置不复制的数据库(选配)
    binlog-ignore-db=mysql
    #设置要复制的数据库(选配)
    binlog-do-db=需要复制的主数据库名字(设置一个之前没有的数据库)
    #设置logbin的格式
    binlog_format=row

    logbin格式有三种

      statement   存储的是涉及到数据变化的sql语句,文件比较小,但是如果sql语句中有些特殊语句(比如随即值),就会导致数据不一致

      row   记录的是数据改变后的数据,能保证数据严格一致,但是会使文件比较大

      mixed  会动态调整使用statement和row 

    这里应用场景是对数据进行监控,所以使用row

    重启Mysql服务

    service mysql restart

    进入Mysql,检查binlog是否生效

    mysql> show variables like 'log_%';

     配置conf/canal.properties

    基本所有配置都可以保持默认,但需要注意默认端口是11111,后续连接会用上

     配置实例配置conf/example/instance.properties

    #//这个id不能跟mysql中配置的id相同!!!
    canal.instance.mysql.slaveId=100
    //mysql地址
    canal.instance.master.address=hadoop102:3306
    #连接数据库所需要的用户名和密码
    canal.instance.dbUsername=root
    canal.instance.dbPassword=123
    canal.instance.connectionCharset = UTF-8
    canal.instance.defaultDatabaseName =
    # table regex
    //要监听的数据库,用正则表达式,这里表示gmall数据库中的所有表
    canal.instance.filter.regex=gmall\..*
    # table black regex
    //黑名单
    canal.instance.filter.black.regex=

    启动服务

    bin/startup.sh

    bin/stop.sh

    从Canal中消费数据到kafka

    添加依赖

      <dependencies>
            <!--canal 客户端, 从 canal 服务器读取数据-->
            <dependency>
                <groupId>com.alibaba.otter</groupId>
                <artifactId>canal.client</artifactId>
                <version>1.1.2</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
            <!-- kafka 客户端 -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.11.0.0</version>
            </dependency>
        </dependencies>

    代码实现

    import java.net.InetSocketAddress
    import java.util
    
    import com.alibaba.fastjson.JSONObject
    import com.alibaba.otter.canal.client.{CanalConnector, CanalConnectors}
    import com.alibaba.otter.canal.protocol.CanalEntry.{EntryType, EventType, RowChange}
    import com.alibaba.otter.canal.protocol.{CanalEntry, Message}
    import com.atguigu.gmall.common.Constant
    import com.google.protobuf.ByteString
    
    /**
     * Author atguigu
     * Date 2020/5/30 15:29
     */
    object CanalClient {
        // 真正的处理数据
        def parseData(rowDataList: util.List[CanalEntry.RowData],
                      tableName: String,
                      eventType: CanalEntry.EventType): Unit = {
            // 计算订单总额 ,每在order_info表中插入一条数据就发送给kafka
            if(tableName == "order_info" && eventType == EventType.INSERT && rowDataList != null && rowDataList.size() > 0){
                    import scala.collection.JavaConversions._
                    for(rowData <- rowDataList){
                        val result: JSONObject = new JSONObject()
                        // 一个rowData表示一行数据, 所有列组成一个json对象, 写入到Kafka中
                        val columnList: util.List[CanalEntry.Column] = rowData.getAfterColumnsList
                        for(column <- columnList){ // column 列
                            val key: String = column.getName  // 列名
                            val value: String = column.getValue   // 列值
                            result.put(key, value)
                        }
                        // 把数据写入到kafka中. 用一个生产者
                        MykafkaUtil.send(Constant.ORDER_INFO_TOPIC, result.toJSONString)
                    }
            }
            
        }
        
        def main(args: Array[String]): Unit = {
            // 1. 连接到canal服务器
            // 1.1 canal服务器的地址  canal服务器的端口号
            val address: InetSocketAddress = new InetSocketAddress("hadoop102", 11111)
            val connector: CanalConnector = {
              CanalConnectors.newSingleConnector(address,  destination="example", username="", password="")
            }
            // 1.2 连接到canal
            connector.connect()
            // 2. 订阅你要处理的具体表 gmall1128下所有的表
            connector.subscribe("gmall.*")
            
            // 3. 读取数据, 解析
            while (true) {
                // 一致监听mysql数据变化, 所以这个地方不挺
                // 100表示最多一次拉取由于100条sql导致的数据的变化
                val msg: Message = connector.get(100)
                val entries: util.List[CanalEntry.Entry] = msg.getEntries
                if (entries != null && entries.size() > 0) {
                    // 遍历拿到每个entry
                    import scala.collection.JavaConversions._
                    for (entry <- entries) {
                        // 处理的EntryType应该时刻RowData
                        if (entry != null && entry.hasEntryType && entry.getEntryType == EntryType.ROWDATA) {
                            // 获取storeValue. 每个entry一个
                            val storeValue: ByteString = entry.getStoreValue
                            // 每个storeVales一个RowChange
                            val rowChange: RowChange = RowChange.parseFrom(storeValue)
                            // 每个rowChange中多个RowData. 一个RowData就表示一行数据
                            val rowDataList: util.List[CanalEntry.RowData] = rowChange.getRowDatasList
                           //调用处理数据的方法,在这里对每行的数据进行真正的处理
                            parseData(rowDataList, entry.getHeader.getTableName, rowChange.getEventType)
                        }
                    }
                } else {
                    println("没有拉倒数据, 2s之后继续拉....")
                    Thread.sleep(2000)
                }
            }
        }
    }
  • 相关阅读:
    数据库字段太多,批量快速建立实体类方法(适合大量字段建立实体类)
    SQL service 中的 ”输入SQL命令窗口“ 打开了 “属性界面” 回到 ”输入SQL命令窗口“
    计算机软件编程英语词汇集锦
    编程常用英语词汇
    svn上传和下载项目
    当启动tomcat时出现tomcat setting should be set in tomcat preference page
    Implicit super constructor Object() is undefined for default constructor. Must define an explicit constructor
    eclipse中选中一个单词 其他相同的也被选中 怎么设置
    Spring Boot的@SpringBootApplication无法引入的问题
    最全的SpringCloud视频教程
  • 原文地址:https://www.cnblogs.com/yangxusun9/p/13019484.html
Copyright © 2011-2022 走看看