zoukankan      html  css  js  c++  java
  • MySQL ==> Maxwell ==> Kafka ==> Spark

    mysql中如何开启binlog?开启二进制日志文件?binary log?
    参照:
    https://www.cnblogs.com/chuanzhang053/p/9335924.html

    MySQL
    –> 中间件 maxwell 【json】 /canal
    –> Kafka
    –> ?
    –>存储Hbase/kudu/Cassandra

    json --> DF
    
    初始化的全量数据  怎么刷?bootstrap引导
    
    这是Maxwell的守护进程,这是一个读取MySQL binlog并将行更新作为JSON
    写入Kafka,Kinesis或其他流媒体平台的应用程序。
    ================================
    前置:MySQL 相关配置
    
    1.修改binlog_format=row
    $ vi my.cnf
    
    [mysqld]
    server_id=1
    log-bin=master
    binlog_format=row
    ----------------
    service mysqld restart
    
    show variables like '%binlog%';
    
    2.创建Maxwell 的db 和用户
    mysql> create database maxwell;
    mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'P@ssw0rd';
    mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
    mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
    mysql> flush privileges;
    
    
    正式开始:
    下载maxwell http://maxwells-daemon.io
    解压
    
    
    Maxwell : MySQL + Kakfa 
    
    source : Source 就是数据库数据的变化
    sink : sink 到控制台 或者Kafka
    
    1 指定sink为Command line:==============	
    
    bin/maxwell 
    --user='maxwell' 
    --password='P@ssw0rd' 
    --host='127.0.0.1' 
    --producer=stdout
    
    insert into traffic_m5(m5,domain,traffic) values('201909091111','www.csylh.cn',778978);
    update traffic_m5 set traffic=9999 where m5='201909091111' and domain='www.csylh.cn';
    delete from traffic_m5 where m5='201909091111' and domain='www.csylh.cn';
    
    
    >update traffic_m5 set traffic=9999 where m5='201909091111' and domain='www.csylh.cn';
    问题:ROW模式 , binlog 更新几个字段?
    /var/lib/mysql/master.000001
    /dev/vda1        40G  6.3G   31G  17% 
    
    
    2 指定sink为KAFKA==============
    
    Kafka:
    bin/maxwell 
    --user='maxwell' 
    --password='P@ssw0rd' 
    --host='127.0.0.1' 
    --producer=kafka 
    --kafka.bootstrap.servers=172.16.156.253:9092 
    --kafka_topic=maxwell 
    --kafka_version=1.1.0 
    --filter 'exclude: *.*, include: test.*'
    
    
    
    binlog 位置: /var/lib/mysql/master.000001
    
    3 指定sink为Redis==============
    
    bin/maxwell --user='maxwell' --password='XXXXXX' --host='127.0.0.1' 
        --producer=redis --redis_host=redis.hostname
    
    
    问题1:Kafka 版本问题:
    Using kafka version: 1.0.0
    
    kafka_version  Not available in config.properties.  0.11.0.1 【默认已经是1.0.0版本】
    kafka_version 在配置文件里面配置是无效的,其实是命令行才行
    
    需要将自己生产上 kafka-client-你的版本号.jar 的架包拷贝到这里
    /root/apps/maxwell-1.22.3/lib/kafka-clients
    
    
    
    问题2:filtering 生产上有多个库,每个库有多个表
    同步到大数据平台,并不是需要把所有库,表都同步的
    参考:http://maxwells-daemon.io/config/
    
    解决:Maxwell可以配置为过滤掉特定表的更新。
    这由--filter命令行标志控制
    --filter = 'exclude: foodb.*, include: foodb.tbl, include: foodb./table_d+/'
    
    过滤器选项按指定的顺序进行评估,因此在此示例中,我们将禁止除db1数据库中的更新之外的所有内容。
    
    --filter = 'exclude: *.*, include: db1.*'
    
    问题3: mysql binlog pos 10000, 指定从pos 9999开始
    怎么办?
    
    
    其他问题:
    Maxwell : 
            时区问题 
    		Column 与 Value 不相等  --> 串位
    
  • 相关阅读:
    第三发
    第二发
    第一发
    要看的算法
    haxe坑
    TCP/IP协议三次握手与四次握手流程解析(转)
    Android动态类生成预加载-dexmaker使用
    Java中ArrayList 、LinkList区别
    Java解析YAML和Android解析YAML
    Java sax、dom、pull解析xml
  • 原文地址:https://www.cnblogs.com/liuge36/p/12614723.html
Copyright © 2011-2022 走看看