zoukankan      html  css  js  c++  java
  • mysql+canal+kafka+elasticsearch构建数据查询平台

    1. 实验环境

    CPU:4
    内存:8G
    ip:192.168.0.187

    开启iptables防火墙
    关闭selinux
    java >=1.5
    使用yum方式安装的java,提前配置好JAVA_HOME环境变量

    vim /etc/profile.d/java.sh
    	#!/bin/bash
    
    	export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk # 路径根据实际情况而定
    	export PATH=$PATH:$JAVA_HOME/bin
    source /etc/profile.d/java.sh
    

    2. MySQL信息

    mysql账号
    root
    MySQL密码
    liykpntuu9?C

    操作

    vim /etc/my.cnf
    [mysqld]
    log-bin=mysql-bin # 开启 binlog
    binlog-format=ROW # 选择 ROW 模式
    server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
    
    service mysqld restart
    

    登陆数据库后操作

    CREATE USER canal IDENTIFIED BY 'canal!%123AD';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;
    

    3. canal操作

    # 下载
    wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
    mkdir -p /usr/local/canal
    tar -zxv -f canal.deployer-1.1.4.tar.gz -C /usr/local/canal
    
    # 修改连接数据库的配置文件
    cd /usr/local/canal
    vim conf/example/instance.properties
    	## mysql serverId
    	canal.instance.mysql.slaveId = 123
    	#position info,需要改成自己的数据库信息
    	canal.instance.master.address = 127.0.0.1:3306 
    	canal.instance.master.journal.name = 
    	canal.instance.master.position = 
    	canal.instance.master.timestamp = 
    	#canal.instance.standby.address = 
    	#canal.instance.standby.journal.name =
    	#canal.instance.standby.position = 
    	#canal.instance.standby.timestamp = 
    	#username/password,需要改成自己的数据库信息
    	canal.instance.dbUsername = canal  
    	canal.instance.dbPassword = canal!%123AD
    	canal.instance.defaultDatabaseName =
    	canal.instance.connectionCharset = UTF-8
    	#table regex
    	canal.instance.filter.regex = .*\\..*
    
    # 启动
    bash bin/startup.sh
    
    # 查看 server 日志
    tail -n 30 logs/canal/canal.log
    	2019-09-20 09:48:46.987 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
    	2019-09-20 09:48:47.019 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
    	2019-09-20 09:48:47.028 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
    	2019-09-20 09:48:47.059 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.0.187(192.168.0.187):11111]
    	2019-09-20 09:48:48.228 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
    
    # 查看 instance 的日志
    tail -n 30 logs/example/example.log
    	2019-09-20 09:48:47.395 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
    	2019-09-20 09:48:47.399 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
    	2019-09-20 09:48:47.580 [main] WARN  o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]
    	2019-09-20 09:48:47.626 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
    	2019-09-20 09:48:47.626 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
    	2019-09-20 09:48:48.140 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
    	2019-09-20 09:48:48.147 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*..*$
    	2019-09-20 09:48:48.147 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : 
    	2019-09-20 09:48:48.165 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
    	2019-09-20 09:48:48.288 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
    	2019-09-20 09:48:48.288 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
    	2019-09-20 09:48:49.288 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000004,position=4,serverId=1,gtid=<null>,timestamp=1568943354000] cost : 989ms , the next step is binlog dump
    
    # 关闭
    bash bin/stop.sh
    
    # 端口使用情况
    ss -tlnp
    State       Recv-Q Send-Q            Local Address:Port      Peer Address:Port              
    LISTEN      0      50                   *:11110                  *:*                   users:(("java",pid=2078,fd=109))
    LISTEN      0      50                   *:11111                  *:*                   users:(("java",pid=2078,fd=105))
    LISTEN      0      3                    *:11112                  *:*                   users:(("java",pid=2078,fd=87))
    
    # 端口号说明
    # admin端口:11110
    # tcp端口:11111
    # metric端口:11112
    
    
    
    # canal-admin 使用WEB UI界面查看管理canal
    
    # canal-admin的限定依赖:
    #    MySQL,用于存储配置和节点等相关数据
    #    canal版本,要求>=1.1.4 (需要依赖canal-server提供面向admin的动态运维管理接口)
    wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz
    mkdir -p /usr/local/canal_admin
    tar -zxv -f canal.admin-1.1.4.tar.gz -C /usr/local/canal_admin
    vim conf/application.yml
    	server:
    	  port: 8089 # 端口号,防火墙放行该端口号
    	spring:
    	  jackson:
    	    date-format: yyyy-MM-dd HH:mm:ss
    	    time-zone: GMT+8
    
    	spring.datasource:
    	  address: 127.0.0.1:3306 # 数据库地址和端口
    	  database: canal_manager # 数据库名
    	  username: canal_admin   # 数据库账号 ,注意跟一开始创建的canal账号区分开,需要修改一下
    	  password: ABC123,.abc@#11  # 数据库密码
    	  driver-class-name: com.mysql.jdbc.Driver
    	  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
    	  hikari:
    	    maximum-pool-size: 30
    	    minimum-idle: 1
    
    	canal:
    	  adminUser: admin   # 平台账号
    	  adminPasswd: admin # 平台密码
    
    # 注意,数据库名,账号和密码需要提前创建好
    # 若修改默认的数据库名,则示例sql文件中也需要修改
    # 这里只修改默认的数据库账号和密码,其余保持默认
    
    # 初始化元数据库
    # 初始化SQL脚本里会默认创建canal_manager的数据库,建议使用root等有超级权限的账号进行初始化 b. canal_manager.sql默认会在conf目录下
    mysql -hlocalhost -uroot -p
    mysql> source /usr/local/canal_admin/conf/canal_manager.sql;
    
    # 启动
    bash bin/startup.sh
    
    # 查看 admin 日志
    tail -n 30 logs/admin.log
    	2019-09-20 14:50:54.595 [main] INFO  org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8089"]
    	2019-09-20 14:50:54.624 [main] INFO  org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
    	2019-09-20 14:50:54.812 [main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8089 (http) with context path ''
    	2019-09-20 14:50:54.818 [main] INFO  com.alibaba.otter.canal.admin.CanalAdminApplication - Started CanalAdminApplication in 11.057 seconds (JVM running for 12.731)
    
    # 浏览器访问,防火墙放行8089端口号
    # 地址:http://192.168.0.187:8089/ 访问,默认账号/密码:admin/123456 
    
    # 使用
    # 创建一个集群,添加已有的canal
    # 因为端口的问题,暂时只能添加一个
    # 另外canal是否可以组建集群,还有待研究
    
    # 停止
    bash bin/stop.sh
    

    4. zookeeper

    # 设置zookeeper集群
    cd /usr/local/src
    wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.5/apache-zookeeper-3.5.5-bin.tar.gz
    tar -zxvf apache-zookeeper-3.5.5-bin.tar.gz -C /usr/local
    cd /usr/local/apache-zookeeper-3.5.5-bin
    
    mkdir -p /zkdata/{zookeeper-1,zookeeper-2,zookeeper-3}
    
    cp conf/zoo_sample.cfg conf/zoo-1.cfg
    # vim conf/zoo-1.cfg
    	dataDir=/zkdata/zookeeper-1
    	clientPort=2181
    
    	server.1=127.0.0.1:2888:3888
    	server.2=127.0.0.1:2889:3889
    	server.3=127.0.0.1:2890:3890
    
    cp conf/zoo-1.cfg conf/zoo-2.cfg
    cp conf/zoo-1.cfg conf/zoo-3.cfg
    
    vim conf/zoo-2.cfg
    	dataDir=/zkdata/zookeeper-2
    	clientPort=2182
    
    	server.1=127.0.0.1:2888:3888
    	server.2=127.0.0.1:2889:3889
    	server.3=127.0.0.1:2890:3890
    
    vim conf/zoo-3.cfg
    	dataDir=/zkdata/zookeeper-3
    	clientPort=2183
    
    	server.1=127.0.0.1:2888:3888
    	server.2=127.0.0.1:2889:3889
    	server.3=127.0.0.1:2890:3890
    
    echo '1' > /zkdata/zookeeper-1/myid
    echo '2' > /zkdata/zookeeper-2/myid 
    echo '3' > /zkdata/zookeeper-3/myid 
    
    # 修改启动文件,避免后续出现如下错误
    # stat is not executed because it is not in the whitelist.
    # envi is not executed because it is not in the whitelist.
    
    # nc命令需要安装其他软件
    yum install nmap-ncat
    
    # envi命令执行报错提示:envi is not executed because it is not in the whitelist.
    # 解决办法 修改启动指令 zkServer.sh ,往里面添加 :ZOOMAIN="-Dzookeeper.4lw.commands.whitelist=* ${ZOOMAIN}"
    
    	else
    	    echo "JMX disabled by user request" >&2
    	    ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain" # 注意找到这个信息
    	fi
    
    # 如果不想添加在这里,注意位置和赋值的顺序
    ZOOMAIN="-Dzookeeper.4lw.commands.whitelist=* ${ZOOMAIN}"
    
    # 然后重启zookeeper
    
    # 集群启动脚本
    vim start.sh
    	bash bin/zkServer.sh start conf/zoo-1.cfg
    	bash bin/zkServer.sh start conf/zoo-2.cfg
    	bash bin/zkServer.sh start conf/zoo-3.cfg
    
    # 集群关闭脚本
    vim stop.sh
    	bash bin/zkServer.sh stop conf/zoo-1.cfg
    	bash bin/zkServer.sh stop conf/zoo-2.cfg
    	bash bin/zkServer.sh stop conf/zoo-3.cfg
    
    # 检测集群状态
    [root@bogon apache-zookeeper-3.5.5-bin]# bash bin/zkServer.sh status conf/zoo-1.cfg
    	/usr/bin/java
    	ZooKeeper JMX enabled by default
    	Using config: conf/zoo-1.cfg
    	Client port found: 2181. Client address: localhost.
    	Mode: follower
    
    [root@bogon apache-zookeeper-3.5.5-bin]# bash bin/zkServer.sh status conf/zoo-2.cfg
    	/usr/bin/java
    	ZooKeeper JMX enabled by default
    	Using config: conf/zoo-2.cfg
    	Client port found: 2182. Client address: localhost.
    	Mode: leader
    
    [root@bogon apache-zookeeper-3.5.5-bin]# bash bin/zkServer.sh status conf/zoo-3.cfg
    	/usr/bin/java
    	ZooKeeper JMX enabled by default
    	Using config: conf/zoo-3.cfg
    	Client port found: 2183. Client address: localhost.
    	Mode: follower
    
    # 使用WEB UI查看监控集群-zk ui安装
    cd /usr/local
    
    git clone https://github.com/DeemOpen/zkui.git
    
    yum install -y maven
    
    # 更换使用阿里云maven源
    vim /etc/maven/settings.xml 
    	<mirrors>  
    
    	    <mirror>
    	        <id>nexus-aliyun</id>
    	        <mirrorOf>central</mirrorOf>
    	        <name>Nexus aliyun</name>
    	        <url>http://maven.aliyun.com/nexus/content/groups/public</url>
    	    </mirror>
    
    	</mirrors>
    
    cd zkui/
    
    mvn clean install
    
    # 修改配置文件默认值
    vim config.cfg
        serverPort=9090     #指定端口
        zkServer=localhost:2181,localhost:2182,localhost:2183 # 不使用127.0.0.1
        sessionTimeout=300
    
        # userSet中是登陆web界面的用户名和密码
    	#管理员
    	#admin:manager
    	#用户
    	#appconfig:appconfig
    
    # 启动程序至后台
    vim start.sh
    	#!/bin/bash
    
    	nohup java -jar target/zkui-2.0-SNAPSHOT-jar-with-dependencies.jar &
    
    # 浏览器访问
    # 防火墙放行9090端口,后期改用nginx代理
    http://192.168.0.187:9090/
    

    5. Kafka

    # kafka集群,伪集群
    cd /usr/local/src
    wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz
    tar -zxv -f kafka_2.12-2.3.0.tgz -C /usr/local/
    cd /usr/local/kafka_2.12-2.3.0/config
    mkdir -p /kafkadata/{kafka-1,kafka-2,kafka-3}
    cp server.properties server-1.properties
    vim server-1.properties
    	broker.id=1
    	delete.topic.enable=true
    	listeners=PLAINTEXT://:9092
    	advertised.listeners=PLAINTEXT://localhost:9092
    	log.dirs=/kafkadata/kafka-1
    	zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
    
    cp server-1.properties server-2.properties
    vim server-2.properties
    	broker.id=2
    	delete.topic.enable=true
    	listeners=PLAINTEXT://:9093
    	log.dirs=/kafkadata/kafka-2
    	zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
    
    cp server-1.properties server-3.properties
    vim server-3.properties
    	broker.id=3
    	delete.topic.enable=true
    	listeners=PLAINTEXT://:9094
    	log.dirs=/kafkadata/kafka-3
    	zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
    
    # 启动集群
    vim start.sh
    	#!/bin/bash
    
    	bash bin/kafka-server-start.sh -daemon config/server-1.properties
    	bash bin/kafka-server-start.sh -daemon config/server-2.properties
    	bash bin/kafka-server-start.sh -daemon config/server-3.properties
    
    # 停止集群
    vim stop.sh 
    	#!/bin/bash
    
    	bash bin/kafka-server-stop.sh -daemon config/server-1.properties
    	bash bin/kafka-server-stop.sh -daemon config/server-2.properties
    	bash bin/kafka-server-stop.sh -daemon config/server-3.properties
    
    # 监控kafka集群
    # 有一个问题,需要在kafka-server-start.sh文件中配置端口,有如下三种办法
    # 第一种:复制并修改kafka目录,比如kafka-1,kafka-2,kafka-3,然后再每个目录下修改kafka-server-start.sh文件
    # 第二种:在启动脚本start.sh中添加指定端口
    # 第三种:多复制几个kafka-server-start.sh文件,然后进行修改,最后在start.sh中修改一下
    
    # 以下三种方法任选其一即可
    
    # 第一种方式办法,相应行修改成如下形式,注意端口号不同
    # 使用的是不同目录下的不同kafka-server-start.sh文件
    # start.sh文件也需要做相应的修改
    # kafka-1/bin/kafka-server-start.sh
    if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
       # export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
        export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
        export JMX_PORT="9997"
    fi
    # kafka-2/bin/kafka-server-start.sh
    if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
       # export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
        export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
        export JMX_PORT="9998"
    fi
    # kafka-3/bin/kafka-server-start.sh
    if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
       # export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
        export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
        export JMX_PORT="9999"
    fi
    
    # start.sh
    	#!/bin/bash
    	bash kafka-1/bin/kafka-server-start.sh -daemon config/server-1.properties
    	bash kafka-2/bin/kafka-server-start.sh -daemon config/server-2.properties
    	bash kafka-3/bin/kafka-server-start.sh -daemon config/server-3.properties
    
    # 第二种方法
    # 使用的用一个目录下的同一个文件,只是在每个命令前指定端口号
    vim start.sh
    	#!/bin/bash
    
    	JMX_PORT=9997 bash bin/kafka-server-start.sh -daemon config/server-1.properties
    	JMX_PORT=9998 bash bin/kafka-server-start.sh -daemon config/server-2.properties
    	JMX_PORT=9999 bash bin/kafka-server-start.sh -daemon config/server-3.properties
    
    # 第三种方法
    # 使用的是同一个目录下的不同kafka-server-start文件
    # start.sh文件也需要做相应的修改
    cp kafka-server-start.sh kafka-server-start-1.sh
    cp kafka-server-start.sh kafka-server-start-2.sh
    cp kafka-server-start.sh kafka-server-start-3.sh
    
    vim kafka-server-start-1.sh
    	if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    	   # export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    	    export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
    	    export JMX_PORT="9997"
    	fi
    vim kafka-server-start-2.sh
    	if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    	   # export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    	    export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
    	    export JMX_PORT="9998"
    	fi
    vim kafka-server-start-3.sh
    	if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    	   # export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    	    export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
    	    export JMX_PORT="9999"
    	fi
    
    vim start.sh 
    	#!/bin/bash
    
    	bash bin/kafka-server-start-1.sh -daemon config/server-1.properties
    	bash bin/kafka-server-start-2.sh -daemon config/server-2.properties
    	bash bin/kafka-server-start-3.sh -daemon config/server-3.properties
    
    cd /usr/local/src
    wget https://github.com/smartloli/kafka-eagle-bin/archive/v1.3.9.tar.gz
    
    # 多次解压缩后得到kafka-eagle-web-1.3.9目录,然后把该目录复制到/usr/local目录下
    
    cd /usr/local/kafka-eagle-web-1.3.9/conf
    vim system-config.properties
    	kafka.eagle.zk.cluster.alias=cluster1
    	cluster1.zk.list=localhost:2181,localhost:2182,localhost:2183
    	kafka.eagle.metrics.charts=true
    	# 其余保持默认,数据库使用sqlite,注意路径需要事先创建好或修改成当前目录
    	# 数据库也可以更换成MySQL
    	kafka.eagle.url=jdbc:sqlite:/usr/local/kafka-eagle-web-1.3.9/db/ke.db
    
    # 注意
    # kafka.eagle.zk.cluster.alias的值需要跟下面的这个cluster1.zk.list小数点第一个保持一致,比如都是cluster1,否则获取不到数据
    
    # 添加环境变量
    vim /etc/profile.d/kafka_eagle.sh
    	#!/bin/bash
    
    	export KE_HOME=/usr/local/kafka-eagle-web-1.3.9
    	export PATH=$PATH:$KE_HOME/bin
    source /etc/profile.d/kafka_eagle.sh
    
    
    # 命令相关
    bash bin/ke.sh start|stop|status|stats|restart
    
    # 启动
    bash bin/ke.sh start
    	*******************************************************************
    	* Kafka Eagle system monitor port successful... 
    	*******************************************************************
    	[2019-09-20 12:10:32] INFO: Status Code[0]
    	[2019-09-20 12:10:32] INFO: [Job done!]
    	Welcome to
    	    __ __    ___     ____    __ __    ___            ______    ___    ______    __     ______
    	   / //_/   /   |   / __/   / //_/   /   |          / ____/   /   |  / ____/   / /    / ____/
    	  / ,<     / /| |  / /_    / ,<     / /| |         / __/     / /| | / / __    / /    / __/   
    	 / /| |   / ___ | / __/   / /| |   / ___ |        / /___    / ___ |/ /_/ /   / /___ / /___   
    	/_/ |_|  /_/  |_|/_/     /_/ |_|  /_/  |_|       /_____/   /_/  |_|\____/   /_____//_____/   
    	                                                                                             
    
    	Version 1.3.9
    	*******************************************************************
    	* Kafka Eagle Service has started success.
    	* Welcome, Now you can visit 'http://127.0.0.1:8048/ke'
    	* Account:admin ,Password:123456
    	*******************************************************************
    	* <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
    	* <Usage> https://www.kafka-eagle.org/ </Usage>
    	*******************************************************************
    
    # 浏览器访问,防火墙放行该端口,后期改用Nginx代理
    地址:192.168.0.187:8048/ke
    账号:admin,密码:123456
    

    6. 投递数据到Kafka

    # 先进行canal配置,改动配置文件canal.properties
    # serverMode改为kafka
    vim conf/canal.properties
    	canal.serverMode = kafka
    	canal.mq.servers = localhost:9092,localhost:9093,localhost:9094
    
    vim conf/example/instance.properties
    	# mq config
    	canal.mq.topic=canal_manager # 填写数据库库名,表示这个数据库的所有表的操作都在这个topic下
    	# dynamic topic route by schema or table regex
    	# canal.mq.dynamicTopic=.*\..*
    	canal.mq.partition=0
    	# hash partition config
    	# canal.mq.partitionsNum=10
    	# canal.mq.partitionHash=.*\..*
    
    # 以上具体规则详看官方文档
    
    # kafka开启消息队列的自动创建topic模式,相关配置在kafka的server.properties
    echo 'auto.create.topics.enable=true' >> config/server-1.properties 
    echo 'auto.create.topics.enable=true' >> config/server-2.properties 
    echo 'auto.create.topics.enable=true' >> config/server-3.properties 
    
    # 相关改动完成后重启canal和kafka
    
    # 使用canal_admin平台查看canal的状态
    # Server管理,操作,日志
    
    # 使用kafka eagle平台查看kafka的topic情况
    # 左侧导航Topic-List查看生成的topic,这里显示的是canal_manager,上面设置的那个数据库库名
    # 点开Topic Name可以查看具体的数据个数
    
    # 使用命令行kafka-console-consumer.sh --topic canal_manager --bootstrap-server localhost:9092 --from-beginning查看canal传递给kafka的数据
    # 插入一条数据
    	{"data":[{"id":"13","username":"13","password":"6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9","name":"Canal Manager","roles":"admin","introduction":null,"avatar":null,"creation_date":"2019-07-14 00:05:28"}],"database":"canal_manager","es":1568972329000,"id":10,"isDdl":false,"mysqlType":{"id":"bigint(20)","username":"varchar(31)","password":"varchar(128)","name":"varchar(31)","roles":"varchar(31)","introduction":"varchar(255)","avatar":"varchar(255)","creation_date":"timestamp"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"username":12,"password":12,"name":12,"roles":12,"introduction":12,"avatar":12,"creation_date":93},"table":"canal_user","ts":1568972329456,"type":"INSERT"}
    # 删除一条数据
    	{"data":[{"id":"13","username":"13","password":"6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9","name":"Canal Manager","roles":"admin","introduction":null,"avatar":null,"creation_date":"2019-07-14 00:05:28"}],"database":"canal_manager","es":1568972368000,"id":11,"isDdl":false,"mysqlType":{"id":"bigint(20)","username":"varchar(31)","password":"varchar(128)","name":"varchar(31)","roles":"varchar(31)","introduction":"varchar(255)","avatar":"varchar(255)","creation_date":"timestamp"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"username":12,"password":12,"name":12,"roles":12,"introduction":12,"avatar":12,"creation_date":93},"table":"canal_user","ts":1568972369005,"type":"DELETE"}
    

    后续增加使用logstash从Kafka中拉取数据传输到elastic中且指定索引

  • 相关阅读:
    团队第二阶段冲刺——第三天
    团队第二阶段冲刺——第二天
    团队第二阶段冲刺——第一天
    第一阶段意见汇总
    团队第一次绩效考核
    我们与同类产品的差距
    团队项目第一阶段成果评价
    第一阶段验收成果总结
    团队冲刺第十天
    团队冲刺第九天
  • 原文地址:https://www.cnblogs.com/sanduzxcvbnm/p/11558858.html
Copyright © 2011-2022 走看看