kafka
Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一个分布式的,可划分的,冗余备份的持久性的日志服务。它主要用于处理活跃的流式数据。
通过集群来提供实时的消费;
节点信息 kafka1 192.168.1.31 broker.id=1 Kafka2 192.168.1.32 broker.id=2 kafka3 192.168.1.33 broker.id=3 端口 9092
版本 kafka_2.11-0.10.1.0.tgz
下载安装: https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz # mkdir /opt/kafka_2.11-0.10.1.0/logs
配置信息:
修改配置文件示例 # cat config/server.properties broker.id=1 #每个服务的 标识ID (每个服务都是唯一的) listeners=PLAINTEXT://192.168.1.31:9092 #备注:listeners一定要配置成为IP地址;如果配置为localhost或服务器的hostname,在使用java发送数据时就会抛出异 ;也可以设置hostname message.max.bytes=10000000 #服务器可以接收的最大的消息大小 default.replication.factor=1 #默认副本 replica.lag.time.max.ms=10000 #在此窗口时间内没有收到follower的fetch请求,leader会将其从ISR(in-sync replicas)中移除 num.network.threads=3 num.io.threads=8 #服务器执行读写的IO线程数 (可以根据服务器磁盘数量) socket.send.buffer.bytes=102400 #发送字节缓冲最大值 socket.receive.buffer.bytes=102400 #接收缓冲区最大字节 socket.request.max.bytes=104857600 #服务器允许请求的最大值,防止内存溢出,小于内存实际大小; num.partitions=1 #默认partition数量,如果topic在创建时没有指定partition数量,默认使用此值,建议改为5 num.recovery.threads.per.data.dir=1 #在启动时用于日志恢复和关闭时刷新的每个数据目录的线程数。 log.retention.hours=168 #日志保留时长 log.segment.bytes=1073741824 #日志字段大小; log.retention.check.interval.ms=300000 #日志保留间隔时间; log.dirs= /opt/kafka_2.11-0.10.1.0/logs #zk 配置 zookeeper.connect=192.168.1.31:2181,192.168.1.32:2181,192.168.1.33:2181 zookeeper.connection.timeout.ms=6000
新增Kafka环境变量:(配置kafka内存)
添加环境变量 cat /etc/profile export KAFKA_HEAP_OPTS="-Xms30g -Xmx30g -XX:+UseG1GC -XX:MaxGCPauseMillis=50 -XX:InitiatingHeapOccupancyPercent=45" source /etc/profile
启动:
nohup 后台运行 nohup bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &