kafka
kafka是什么?
kafka 是一个分布式消息发布订阅系统,可以处理大量数据,常用在线消息消费,数据可以保存到磁盘里,当消费者去取数据中途发生断开,kafka会做断点,连接成功继续读取,减少数据丢失和冗余
kafka架构
Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker
Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
Partition
Parition是物理上的概念,每个Topic包含一个或多个Partition.
Producer
负责发布消息到Kafka broker
Consumer
消息消费者,向Kafka broker读取消息的客户端。
Consumer Group
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)
配置文件
Vi server.properies
Port 9091默认端口号
Broker.id =0 # ID自己定义,唯一
#host.name=。。。。 #接收的ip,注释掉接收所有ip
Num.network.threads=3 #kafka 处理网络请求线程数 一般情况下就是cpu的颗粒数
num.io.threads=8 处理io请求线程数
socket.send/receive.buffer.bytes=102400 发送接收缓存的大小
num.partitions=1 默认一个分区 不用改
Num.partitions=1 #分区
Zookeeper.connect=xxxxx:xxx,yyyy:yyy,zzzz:zzz #zookeeper连接ip
zookeeper.connection.timeout.ms=1000000 连接最大超时时间
启动:
kafka自带一个zk,但是我们还有用自己的,不用它的
先起zookeeper(配置zookeeper请见zookeeper的基本配置)
bin/kafka-server-start.sh config/server .properties #不能后台
nohup bin/kafka-server-start.sh config/server.properties &
内存不足可能起不来ps -ef |grep kafka 查看进程 或者看日志
kafka-console-consumer.sh
这个命令只是简单的将消息输出到标准输出中,通常使用该命令来获取kafka的topic中的数据。
使用方法:kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
kafka-console-producer.sh
这个命令可以将文件或标准输入的内容发送到Kafka集群。
使用方法:bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
kafka-topics.sh
描述topic的配置
bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type topics --entity-name test_topic
设置保留时间
如果您需要删除主题中的所有消息,则可以利用保留时间。先将保留时间设置为非常低(1000 ms),等待几秒钟,然后将保留时间恢复为上一个值。默认保留时间为24小时(86400000毫秒)。
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name test_topic --add-config retention.ms=1000
删除topic
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test_topic
kafka-topics.sh
topic信息
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_topic
添加topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test_topic
列出topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
连接测试:生产者消费者演示:3个zookeeper分别对应下面端口个位的1,2,3
生产者:
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
消费者:
bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test --from-beginning
总结:作为9092端口代表的leader,可以向3个生产者发送信息,生产者一样可以向leader发送信息,但是不能发送给其他两个follower,当leader宕机或者挂掉,默认有新的leader启动,但是节点没有挂到,而是让其他两个follower的新leader去跑,我kill掉zookeeper2,发现消费者2182报错,其他两个消费者可继续发消息而发送9093成为leader,生产者依然是9092可以使用,但是我后台已经把他kill掉了,这就是zookeeper,此时杀掉,所有服务器个数少于一半zookeeper失效。在其他kafka扮演着消息列的离线收集,多集群,高稳定,低延迟的分布式消息系统
地区性:一般传json格式,生产者发送上海,接收者判断json值是上海还是北京,如果是上海接收,如果是北京不接受
版权声明:本文原创发表于 博客园,作者为 RainBol 本文欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则视为侵权。