zoukankan      html  css  js  c++  java
  • canal数据同步 客户端代码实现

    1.引入相关依赖

    <!-- Dependencies of the canal client module.
         NOTE(review): no <version> elements are declared here; presumably they are
         managed by a parent POM / dependencyManagement section - verify. -->
    <dependencies>
        <!-- Spring Boot web starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
     
        <!-- MySQL JDBC driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
     
        <!-- Apache Commons DbUtils (provides the QueryRunner / DbUtils classes used by the client) -->
        <dependency>
            <groupId>commons-dbutils</groupId>
            <artifactId>commons-dbutils</artifactId>
        </dependency>
     
        <!-- Spring Boot JDBC starter (supplies the DataSource injected into the client) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
     
        <!-- Alibaba canal client API (CanalConnector, CanalEntry, Message) -->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
        </dependency>
    </dependencies>
    

    2.创建配置文件

    # HTTP port the service listens on
    server.port=10000
    # Service (application) name
    spring.application.name=canal-client
     
    # Active environment profile: dev, test or prod
    spring.profiles.active=dev
     
    # MySQL connection of the local database that receives the synchronized data
    # (time zone fixed to GMT+8, UTF-8 character encoding)
    spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
    spring.datasource.url=jdbc:mysql://localhost:3306/guli?serverTimezone=GMT%2B8&characterEncoding=utf-8
    spring.datasource.username=root
    spring.datasource.password=root
    

    3.编写客户端类

    开启canal的远程监听,当size大于0时,说明数据有变化,此时执行数据处理。数据处理分为删除、修改、新增三种类型,每种类型都对应着各自的方法;三种方法的思路相同:都是将变化的数据拼接成对应的sql语句,然后放入队列中。当队列的大小大于0时,就从队列中依次取出sql语句并执行,最终把变化的数据同步到本地库中。

    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.CanalEntry.*;
    import com.alibaba.otter.canal.protocol.Message;
    import com.google.protobuf.InvalidProtocolBufferException;
    import org.apache.commons.dbutils.DbUtils;
    import org.apache.commons.dbutils.QueryRunner;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    import javax.sql.DataSource;
    import java.net.InetSocketAddress;
    import java.sql.Connection;
    import java.sql.SQLException;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    
    /**
     * Canal client that pulls row-level binlog changes for the subscribed table and
     * replays every INSERT / UPDATE / DELETE against the local {@link DataSource}.
     *
     * <p>Each changed row is turned into one parameterized statement: column values are
     * bound as {@code ?} parameters instead of being concatenated into the SQL text, so
     * values containing quotes cannot break (or inject into) the statement, and SQL NULL
     * is replayed as NULL rather than as an empty string.
     */
    @Component
    public class CanalClient {

        private static final Logger LOG = Logger.getLogger(CanalClient.class.getName());

        // Connection settings of the canal server and the table filter to subscribe to.
        private static final String CANAL_HOST = "192.168.235.130";
        private static final int CANAL_PORT = 11111;
        private static final String CANAL_DESTINATION = "example";
        private static final String SUBSCRIBE_FILTER = "guli.members";

        /** Maximum number of entries requested from canal per pull. */
        private static final int BATCH_SIZE = 1000;

        /** Pause between pulls while canal has nothing to deliver. */
        private static final long IDLE_SLEEP_MILLIS = 1000L;

        /** Statements built from the pulled entries, waiting to be executed in arrival order. */
        private final Queue<PendingSql> SQL_QUEUE = new ConcurrentLinkedQueue<>();

        private final QueryRunner queryRunner = new QueryRunner();

        @Resource
        private DataSource dataSource;

        /**
         * Connects to the canal server and keeps pulling and replaying changes.
         *
         * <p>The loop ends only when the thread is interrupted or an entry cannot be
         * parsed; the connector is disconnected in every case. A batch is acknowledged
         * only after its statements have been handed to the database, so a crash between
         * the two steps makes canal redeliver the batch instead of losing it.
         */
        public void run() {
            CanalConnector connector = CanalConnectors.newSingleConnector(
                    new InetSocketAddress(CANAL_HOST, CANAL_PORT), CANAL_DESTINATION, "", "");
            try {
                connector.connect();
                connector.subscribe(SUBSCRIBE_FILTER);
                // Reset to the last acknowledged position so un-acked batches are delivered again.
                connector.rollback();
                while (true) {
                    // Ask for up to BATCH_SIZE entries; canal returns whatever is available.
                    Message message = connector.getWithoutAck(BATCH_SIZE);
                    long batchId = message.getId();
                    List<Entry> entries = message.getEntries();
                    if (batchId == -1 || entries.isEmpty()) {
                        Thread.sleep(IDLE_SLEEP_MILLIS);
                    } else {
                        dataHandle(entries);
                    }
                    // Apply first, acknowledge second.
                    executeQueueSql();
                    connector.ack(batchId);
                }
            } catch (InterruptedException e) {
                // Restore the flag so callers further up can see the interruption.
                Thread.currentThread().interrupt();
                LOG.log(Level.WARNING, "Canal client interrupted, stopping", e);
            } catch (InvalidProtocolBufferException e) {
                // The batch was not acknowledged and will be redelivered on the next
                // connect, so statements already queued from it must not linger.
                SQL_QUEUE.clear();
                LOG.log(Level.SEVERE, "Could not parse canal entry, stopping", e);
            } finally {
                connector.disconnect();
            }
        }

        /**
         * Executes every queued statement in FIFO order until the queue is empty.
         */
        public void executeQueueSql() {
            PendingSql pending;
            while ((pending = SQL_QUEUE.poll()) != null) {
                System.out.println("[sql]----> " + pending.sql + " " + pending.params);
                executePending(pending);
            }
        }

        /**
         * Converts the row-data entries of one batch into queued statements.
         * Entries that are not row data, and event types other than
         * INSERT / UPDATE / DELETE, are ignored.
         *
         * @param entrys entries of one canal message
         * @throws InvalidProtocolBufferException if an entry's payload cannot be parsed
         */
        private void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException {
            for (Entry entry : entrys) {
                if (EntryType.ROWDATA != entry.getEntryType()) {
                    continue;
                }
                // Parse once and share the result with the statement builders.
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                String table = quoteIdentifier(entry.getHeader().getTableName());
                List<RowData> rows = rowChange.getRowDatasList();
                EventType eventType = rowChange.getEventType();
                if (eventType == EventType.DELETE) {
                    saveDeleteSql(table, rows);
                } else if (eventType == EventType.UPDATE) {
                    saveUpdateSql(table, rows);
                } else if (eventType == EventType.INSERT) {
                    saveInsertSql(table, rows);
                }
            }
        }

        /**
         * Queues one UPDATE per changed row. All "after" columns are written; the row is
         * located through the key columns of its "before" image.
         *
         * @param table quoted table name
         * @param rows  changed rows
         */
        private void saveUpdateSql(String table, List<RowData> rows) {
            for (RowData rowData : rows) {
                List<Column> newColumnList = rowData.getAfterColumnsList();
                if (newColumnList.isEmpty()) {
                    LOG.warning("Skipping update on " + table + ": no column values");
                    continue;
                }
                StringBuilder sql = new StringBuilder("update ").append(table).append(" set ");
                List<Object> params = new ArrayList<>(newColumnList.size() + 1);
                for (int i = 0; i < newColumnList.size(); i++) {
                    if (i > 0) {
                        sql.append(", ");
                    }
                    Column column = newColumnList.get(i);
                    sql.append(quoteIdentifier(column.getName())).append(" = ?");
                    params.add(toParam(column));
                }
                if (!appendKeyCondition(sql, params, rowData.getBeforeColumnsList())) {
                    // Without a key the statement would touch every row (or be invalid).
                    LOG.warning("Skipping update on " + table + ": no primary key column");
                    continue;
                }
                SQL_QUEUE.add(new PendingSql(sql.toString(), params));
            }
        }

        /**
         * Queues one DELETE per removed row, located through the key columns of its
         * "before" image.
         *
         * @param table quoted table name
         * @param rows  removed rows
         */
        private void saveDeleteSql(String table, List<RowData> rows) {
            for (RowData rowData : rows) {
                StringBuilder sql = new StringBuilder("delete from ").append(table);
                List<Object> params = new ArrayList<>();
                if (!appendKeyCondition(sql, params, rowData.getBeforeColumnsList())) {
                    LOG.warning("Skipping delete on " + table + ": no primary key column");
                    continue;
                }
                SQL_QUEUE.add(new PendingSql(sql.toString(), params));
            }
        }

        /**
         * Queues one INSERT per new row, using all "after" columns.
         *
         * @param table quoted table name
         * @param rows  inserted rows
         */
        private void saveInsertSql(String table, List<RowData> rows) {
            for (RowData rowData : rows) {
                List<Column> columnList = rowData.getAfterColumnsList();
                if (columnList.isEmpty()) {
                    LOG.warning("Skipping insert on " + table + ": no column values");
                    continue;
                }
                StringBuilder names = new StringBuilder();
                StringBuilder placeholders = new StringBuilder();
                List<Object> params = new ArrayList<>(columnList.size());
                for (Column column : columnList) {
                    if (names.length() > 0) {
                        names.append(",");
                        placeholders.append(",");
                    }
                    names.append(quoteIdentifier(column.getName()));
                    placeholders.append("?");
                    params.add(toParam(column));
                }
                String sql = "insert into " + table + " (" + names + ") VALUES (" + placeholders + ")";
                SQL_QUEUE.add(new PendingSql(sql, params));
            }
        }

        /**
         * Appends {@code where k1 = ? and k2 = ? ...} for every key column and adds the
         * key values to {@code params}. All key columns are used, so tables with a
         * composite primary key are matched on the full key.
         *
         * @return {@code false} if {@code columns} contains no key column (nothing appended)
         */
        private static boolean appendKeyCondition(StringBuilder sql, List<Object> params,
                                                  List<Column> columns) {
            boolean found = false;
            for (Column column : columns) {
                if (!column.getIsKey()) {
                    continue;
                }
                sql.append(found ? " and " : " where ");
                sql.append(quoteIdentifier(column.getName())).append(" = ?");
                params.add(column.getValue());
                found = true;
            }
            return found;
        }

        /** Returns the bind value of a column: {@code null} for SQL NULL, else its text value. */
        private static Object toParam(Column column) {
            return column.getIsNull() ? null : column.getValue();
        }

        /** Backtick-quotes a MySQL identifier so reserved words and odd characters are safe. */
        private static String quoteIdentifier(String identifier) {
            return "`" + identifier.replace("`", "``") + "`";
        }

        /** Runs one queued statement on its own connection; failures are logged, not rethrown. */
        private void executePending(PendingSql pending) {
            try (Connection con = dataSource.getConnection()) {
                Object[] args = pending.params.toArray();
                int row = queryRunner.update(con, pending.sql, args);
                System.out.println("update: " + row);
            } catch (SQLException e) {
                LOG.log(Level.SEVERE, "Failed to execute: " + pending.sql, e);
            }
        }

        /**
         * Executes a single SQL statement against the data source. A {@code null}
         * statement is ignored; SQL errors are logged, not rethrown.
         *
         * @param sql statement to execute, may be {@code null}
         */
        public void execute(String sql) {
            if (null == sql) {
                return;
            }
            try (Connection con = dataSource.getConnection()) {
                int row = queryRunner.execute(con, sql);
                System.out.println("update: " + row);
            } catch (SQLException e) {
                LOG.log(Level.SEVERE, "Failed to execute: " + sql, e);
            }
        }

        /** A parameterized statement together with its bind values. */
        private static final class PendingSql {
            private final String sql;
            private final List<Object> params;

            PendingSql(String sql, List<Object> params) {
                this.sql = sql;
                this.params = params;
            }
        }
    }
    

    4.修改启动类

    让启动类实现 CommandLineRunner 接口,使得项目启动时调用canal客户端进行监听。

    import com.renzhe.canal.client.CanalClient;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    /**
     * Spring Boot entry point of the canal client. It implements
     * {@link CommandLineRunner} so that the canal listener is started as part of
     * application startup.
     */
    @SpringBootApplication
    public class CanalApplication implements CommandLineRunner {

        /** Canal listener bean, injected by Spring. */
        @Autowired
        private CanalClient canalClient;

        /**
         * Startup callback: hands control to the canal client listener.
         *
         * @param args command-line arguments (unused)
         * @throws Exception declared by {@link CommandLineRunner#run(String...)}
         */
        @Override
        public void run(String... args) throws Exception {
            // On startup, begin listening for canal changes.
            canalClient.run();
        }

        /**
         * Boots the Spring application.
         *
         * @param args command-line arguments passed through to Spring Boot
         */
        public static void main(String[] args) {
            SpringApplication.run(CanalApplication.class, args);
        }
    }

    学习自尚硅谷

  • 相关阅读:
    leetcode 350. Intersection of Two Arrays II
    leetcode 278. First Bad Version
    leetcode 34. Find First and Last Position of Element in Sorted Array
    leetcode 54. Spiral Matrix
    leetcode 59. Spiral Matrix II
    leetcode 44. Wildcard Matching
    leetcode 10. Regular Expression Matching(正则表达式匹配)
    leetcode 174. Dungeon Game (地下城游戏)
    leetcode 36. Valid Sudoku
    Angular Elements
  • 原文地址:https://www.cnblogs.com/jamers-rz/p/14396978.html
Copyright © 2011-2022 走看看