前言
消息队列的主要有3大作用
进程通信(IPC):Interprocess Communication
程序解耦:程序由异步变为了异步,提升程序并发(规避IO等待时间)能力。
数据流量削峰:把消息暂时缓冲在消息队列里面。
NSQ传递的消息通常是无序的,当然你也可以保留下信息去check时间戳,因此NSQ更适合处理数据量大但是彼此间没有顺序关系的消息。
消息队列的2种消息传递的模式:
1.点对点模式(queue)
消息生产者生成消息发送到queue中,然后消费者从queue中主动获取数据进行消费。
当1条消息被消费之后,这条消息就会从queue中消失,不存在重复消费。
golang中的channel就是这种模型。
2.发布和订阅者(topic)
生产者生成消息时把消息分类成不同Topic,消费者通过订阅(subcrible)这些Topic,把不同的消息获出来消费。
发布和订阅模式和点对点模式最大的区别:
引入 Publish---> Topic<--->Subcrible概念,Topic把消息分类了意味着生产端可以有多个不同的生成者生成不同的消息,消费端也可有多个不同的消费者订阅不同的Topic。
这种设计模式为生产和消费2端都提供了扩展/收缩的弹性空间 。
在发布和订阅这模式中,消费者端从消息队列获取数据的方式也有2种:
2.1.队列主动推送消息到消费端(供大于求)
2.2.消费者主动去队列拉取消息 (求大于供)
kafka概念
kafka是由Linkedln公司开发的用于处理公司内部海量日志传输问题的,由Scala和Java编写。2011年该公司把它贡献给Apche软件基金会,现已发展成Apche中的顶级开源项目,kafaka是1个分布式(一提到分布式就会想到集群)的基于发布/订阅模式的消息队列(message queue),主要应用于大数据实时处理场景。
kafka具有高吞吐、低延迟、容错率高的特点。
kafka架构
从宏观角度来看kafka就是1个非常粗大管子。在这个大管子的两端有2中角色生产者(producer)和消费者(consumer)。
broker:kafka的分布式就体现在kafka集群中可以灵活扩展broker(kafka集群中的1个节点,服务器)方便我们对kafka集群进行弹性扩展。broker-id不在集群里不能重复。
topic:对消息的分类。
partion:每个Topic可以有N个partion,同1个Topic的数据分布在不同Partion且数据是不重复的,所以partion实现了topic数据的负载均衡,partion的表现形式就是1个个的文件夹。
每个partion类似于Python里面的list,来1条消息apend进去,保证了消息的顺序。
这个list中的每条消息都会分配1个的index(offset偏移量),offset保证消息快速读取,而不是遍历随机读,这也是kafka读取消息快的原因。
follower:每个leader partion(主分区)都有多个 follower pation(副本分区)也就是是备胎(follower),follower实现partion的备份。
当leader partion故障时kafak会选择1个follower partion成为leader partion,kafaka中主分区可以设置的最大的副本数量为10,leader和folloer partion不能在同一个服务器上,followers的数量也不能大于brokers(1个kafka集群中服务器)的数量。
consumer group: 多个消费者组成1个组对同1个topc的消息进行消费(增加了kafka的消费能力)
在同1个consumer group中1个consumer可消费多个分区,但是1个分区只能被consumer group里其中1个consumer消费。
也就是说只要N个consumer在同1 consumer group中,它们消费的数据永远是不一致的。这也是kafka的partion和NSQ的channel之间的本质区别!
producer写数据到kafka的工作流程
1.producer先从kafka集群中获取分区的leader信息
2.producer将消息发送给leader
3.leader将消息写入本地磁盘
4.follower从leader分区拉取消息数据
5.follower将消息写入本地磁盘,向leader partion发送ACK确定.
6.leader收到所有follower的ACK之后,向Producer发生ACK确定。
ps:ack应答机制保证了Producer数据写入的可靠性。
关于以上步骤细节:
producer选择Leader partion的原则?
在kafka中1个topic对应多个leader分区,producer在获取leader信息的时候是如何选择其中1个leader的呢?
1.根据producer指定leader分区来帮它存储数据。(Producer指定了Partion)
2.如果producer没有指定特定的partion那么会根据producer设置的key,hash出1个值自动判断选择哪个leader partion.(Producer指定了Key)
3.如果既没有指定partion也没有指定key就采取轮询的方式写入到不同的partion(轮着来雨露均沾)
producer往kafka发送数据成功之后,ACK应答机制都也哪些?
producer在向kafka发布消息时,可以设置消息数据写入kafka成功之后是否需要ACK应答机制?有以下3个参数!
0:producer往集群中发布数据不需要等待kafka的ACK。(可靠性低,效率高!)
1:代表producer要求只需要leader进行ACK,follower从leader拉取数据的时候就别ACK了。(折中方案)
all:producer--->leader----->leader下面所有的follower数据接收成功后都需要 ACK应答。(可靠性高,效率低)
ps:如果producer往不存在的Topic中发送数据时,kafka会自动创建该Topic,partion和replication的数量默认配置都为1.
安装kafka
1.下载二进制包
二进制包下载之后,根据操作系统 执行kafka和zookeeper的启动脚本即可
[root@zhanggen bin]# ls connect-distributed.sh kafka-producer-perf-test.sh connect-mirror-maker.sh kafka-reassign-partitions.sh connect-standalone.sh kafka-replica-verification.sh kafka-acls.sh kafka-run-class.sh kafka-broker-api-versions.sh kafka-server-start.sh kafka-configs.sh kafka-server-stop.sh kafka-console-consumer.sh kafka-streams-application-reset.sh kafka-console-producer.sh kafka-topics.sh kafka-consumer-groups.sh kafka-verifiable-consumer.sh kafka-consumer-perf-test.sh kafka-verifiable-producer.sh kafka-delegation-tokens.sh trogdor.sh kafka-delete-records.sh windows kafka-dump-log.sh zookeeper-security-migration.sh kafka-leader-election.sh zookeeper-server-start.sh kafka-log-dirs.sh zookeeper-server-stop.sh kafka-mirror-maker.sh zookeeper-shell.sh kafka-preferred-replica-election.sh [root@zhanggen bin]#
2.启动zookeeper
zookeeprt是kafka集群中注册、自动发现服务,类似于NSQ中的Lookupd。kafka的二进制包包含了zookeeper无需单独下载
zookeeper配置kafka集群信息
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
tickTime=2000 dataDir=/home/myname/zookeeper clientPort=2181 initLimit=5 syncLimit=2 server.1=192.168.229.160:2888:3888 server.2=192.168.229.161:2888:3888 server.3=192.168.229.162:2888:3888
启动zookeeper
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
3.启动1个kafka broker
broker配置文件
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
listeners=PLAINTEXT://192.168.11.103:9092
启动
./bin/kafka-server-start.sh ./config/server.properties
4.生产topic
package main import ( "fmt" "github.com/Shopify/sarama" ) func main() { config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll //赋值为-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成。 config.Producer.Partitioner = sarama.NewRandomPartitioner //写到随机分区中,默认设置8个分区 config.Producer.Return.Successes = true msg := &sarama.ProducerMessage{} msg.Topic = `CMDB_log` msg.Value = sarama.StringEncoder("this is a good test") client, err := sarama.NewSyncProducer([]string{"192.168.56.133:9092"}, config) if err != nil { fmt.Println("producer close err, ", err) return } fmt.Println("Kafka连接成功!") defer client.Close() pid, offset, err := client.SendMessage(msg) if err != nil { fmt.Println("send message failed, ", err) return } fmt.Printf("分区ID:%v, offset:%v ", pid, offset) }
5.验证topic对应partion
生产者生产的每1个Topic都会有对应的partion进行数据存储。
[root@zhanggen kafka-logs]# pwd
/tmp/kafka-logs
[root@zhanggen kafka-logs]# ls
cleaner-offset-checkpoint nginx_log-0 zhanggen_log-0
log-start-offset-checkpoint recovery-point-offset-checkpoint
meta.properties replication-offset-checkpoint
[root@zhanggen kafka-logs]# cd zhanggen_log-0/
[root@zhanggen zhanggen_log-0]# ls
00000000000000000000.index 00000000000000000000.timeindex
00000000000000000000.log leader-epoch-checkpoint
[root@zhanggen zhanggen_log-0]#
6.消费数据
[root@zhanggen kafka_2.12-2.5.0]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.133:9092 --topic=zhanggen_log --from-beginning this is a good test this is a good test this is a good test How's it going?
7.使用go消费数据
根据topic查询到所有分区,然后使用gorutine对每个分区进行消费。
package main import ( "fmt" "github.com/Shopify/sarama" ) func main() { config := sarama.NewConfig() consumer,err:=sarama.NewConsumer([]string{"192.168.56.133:9092"},config) if err!=nil{ fmt.Println("开启consumer 失败") return } //获取某个topic对应的存储分区列表 partionList,err:=consumer.Partitions("ngix-log") if err!=nil{ fmt.Println("获取topic:tailconfig下对应分区失败!") return } fmt.Println(partionList) for _,partion := range partionList{ //从分区列表中获取每个分区,针对每个分区开启单独的go程进行消费, pc,err:=consumer.ConsumePartition("ngix-log",int32(partion),sarama.OffsetNewest) if err!=nil{ fmt.Println("从分区%s获取数据失败") return } defer pc.AsyncClose() //开启go程:异步从每个分区获取数据 go func(partitionConsumer sarama.PartitionConsumer){ for msg := range pc.Messages(){ fmt.Printf("分区:%d offset:%d key:%v value:%v ",msg.Partition,msg.Offset,msg.Key,string(msg.Value)) } }(pc) } select { } }
验证kafka是实时的数据流处理平台,这个很关键。
我的日志内容增加了kafaka的topic需要处理,如果减少了也需要处理。所以kafka很适合传输日志内容。
D:goprojectsrcgo相关模块kafkaconsumer>go run main.go [0] 分区:0 offset:93 key:[] value:nice to meet you Tom.
------------------------------------------------ 分区:0 offset:94 key:[] value:nice to meet you Jack 分区:0 offset:95 key:[] value:how are you? 分区:0 offset:96 key:[] value:it's ok,I lost my girl firend. 分区:0 offset:97 key:[] value:.............
-----------------------------------在文件里删除以上内容之后 分区:0 offset:98 key:[] value: nice to meet you Marry.
参考