golang kafka – hello world
https://github.com/Shopify/sarama
https://shopify.github.io/sarama/
consumer.go
"github.com/Shopify/sarama" |
logger = log.New(os.Stderr, "[srama]", log.LstdFlags) |
consumer, err := sarama.NewConsumer(strings.Split("localhost:9092", ","), nil) |
logger.Println("Failed to start consumer: %s", err) |
partitionList, err := consumer.Partitions("hello") |
logger.Println("Failed to get the list of partitions: ", err) |
for partition := range partitionList { |
pc, err := consumer.ConsumePartition("hello", int32(partition), sarama.OffsetNewest) |
logger.Printf("Failed to start consumer for partition %d: %s
", partition, err) |
go func(sarama.PartitionConsumer) { |
for msg := range pc.Messages() { |
fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) |
logger.Println("Done consuming topic hello") |
producer.go
"github.com/Shopify/sarama" |
logger = log.New(os.Stderr, "[srama]", log.LstdFlags) |
config := sarama.NewConfig() |
config.Producer.RequiredAcks = sarama.WaitForAll |
config.Producer.Partitioner = sarama.NewRandomPartitioner |
msg := &sarama.ProducerMessage{} |
msg.Partition = int32(-1) |
msg.Key = sarama.StringEncoder("key") |
msg.Value = sarama.ByteEncoder("你好, 世界!") |
producer, err := sarama.NewSyncProducer(strings.Split("localhost:9092", ","), config) |
logger.Println("Failed to produce message: %s", err) |
partition, offset, err := producer.SendMessage(msg) |
logger.Println("Failed to produce message: ", err) |
logger.Printf("partition=%d, offset=%d
", partition, offset) |
此条目发表在Golang, Linux分类目录。将固定链接加入收藏夹。