前提:开启 zookeeper 、 kafka
生产者代码:
步骤:1. 生成配置文件(生产者基础配置文件、指定生产者回复消息等级 0 1 all、指定生产者消息发送成功或者失败后的返回通道是什么、
指定发送到哪一个分区(本文为 随机分区 正常有三种: 通过partiton、通过key 去 Hash出一个分区、轮询))
2. 构建消息(msg := &sarama.Message{} 这里为指针 1.消息可更改 2. 下面的 发送消息SendMessage() 需要指针类型的参数)
3. 连接kafka
4. 发送消息
package main import ( "fmt" "github.com/Shopify/sarama" "log" ) func main() { // 构建 生产者 // 生成 生产者配置文件 config := sarama.NewConfig() // 设置生产者 消息 回复等级 0 1 all config.Producer.RequiredAcks = sarama.WaitForAll // 设置生产者 成功 发送消息 将在什么 通道返回 config.Producer.Return.Successes = true // 设置生产者 发送的分区 config.Producer.Partitioner = sarama.NewRandomPartitioner // 构建 消息 msg := &sarama.ProducerMessage{} msg.Topic = "aaa" msg.Value = sarama.StringEncoder("123") // 连接 kafka producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config) if err != nil { log.Print(err) return } defer producer.Close() // 发送消息 message, offset, err := producer.SendMessage(msg) if err != nil { log.Println(err) return } fmt.Println(message, " ", offset) }
消费者 代码:
步骤: 1. 生成消费者 对象 连接对应的 地址 config可以为nil
2. 拿到所有对应主题下的所有分区
3. 遍历每一个分区 调用 消费者对象 传入 对应的 主题 哪一个具体的分区 从什么位置开始读取文件 Return:消息对象
4. 通过 消息对象.Message() 可以取到对应的消息
package main import ( "fmt" "github.com/Shopify/sarama" "log" "sync" ) // 消费者练习 func main() { // 生成消费者 实例 consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil) if err != nil { log.Print(err) return } // 拿到 对应主题下所有分区 partitionList, err := consumer.Partitions("aaa") if err != nil { log.Println(err) return } var wg sync.WaitGroup wg.Add(1) // 遍历所有分区 for partition := range partitionList{ //消费者 消费 对应主题的 具体 分区 指定 主题 分区 offset return 对应分区的对象 pc, err := consumer.ConsumePartition("aaa", int32(partition), sarama.OffsetNewest) if err != nil { log.Println(err) return } // 运行完毕记得关闭 defer pc.AsyncClose() // 去出对应的 消息 // 通过异步 拿到 消息 go func(sarama.PartitionConsumer) { defer wg.Done() for msg := range pc.Messages(){ fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value) } }(pc) } wg.Wait() }
参考文档:https://www.liwenzhou.com/posts/Go/go_kafka/