zoukankan      html  css  js  c++  java
  • Go操作kafka


    更新、更全的《Go从入门到放弃》的更新网站,更有python、go、人工智能教学等着你:https://www.cnblogs.com/nickchen121/p/11517502.html

    Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据,具有高性能、持久化、多副本备份、横向扩展等特点。本文介绍了如何使用Go语言发送和接收kafka消息。

    一、sarama

    Go语言中连接kafka使用第三方库:github.com/Shopify/sarama

    1.1 下载及安装

    go get github.com/Shopify/sarama
    

    1.2 注意事项

    sarama v1.20之后的版本加入了zstd压缩算法,需要用到cgo,在Windows平台编译时会提示类似如下错误:

    # github.com/DataDog/zstd
    exec: "gcc":executable file not found in %PATH%
    

    所以在Windows平台请使用v1.19版本的sarama。

    二、连接kafka发送消息

    package main
    
    import (
    	"fmt"
    
    	"github.com/Shopify/sarama"
    )
    
    // 基于sarama第三方库开发的kafka client
    
    func main() {
    	config := sarama.NewConfig()
    	config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
    	config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
    	config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回
    
    	// 构造一个消息
    	msg := &sarama.ProducerMessage{}
    	msg.Topic = "web_log"
    	msg.Value = sarama.StringEncoder("this is a test log")
    	// 连接kafka
    	client, err := sarama.NewSyncProducer([]string{"192.168.1.7:9092"}, config)
    	if err != nil {
    		fmt.Println("producer closed, err:", err)
    		return
    	}
    	defer client.Close()
    	// 发送消息
    	pid, offset, err := client.SendMessage(msg)
    	if err != nil {
    		fmt.Println("send msg failed, err:", err)
    		return
    	}
    	fmt.Printf("pid:%v offset:%v
    ", pid, offset)
    }
    

    三、连接kafka消费消息

    package main
    
    import (
    	"fmt"
    
    	"github.com/Shopify/sarama"
    )
    
    // kafka consumer
    
    func main() {
    	consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
    	if err != nil {
    		fmt.Printf("fail to start consumer, err:%v
    ", err)
    		return
    	}
    	partitionList, err := consumer.Partitions("web_log") // 根据topic取到所有的分区
    	if err != nil {
    		fmt.Printf("fail to get list of partition:err%v
    ", err)
    		return
    	}
    	fmt.Println(partitionList)
    	for partition := range partitionList { // 遍历所有的分区
    		// 针对每个分区创建一个对应的分区消费者
    		pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest)
    		if err != nil {
    			fmt.Printf("failed to start consumer for partition %d,err:%v
    ", partition, err)
    			return
    		}
    		defer pc.AsyncClose()
    		// 异步从每个分区消费信息
    		go func(sarama.PartitionConsumer) {
    			for msg := range pc.Messages() {
    				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
    			}
    		}(pc)
    	}
    }
    
  • 相关阅读:
    题解 CF171G 【Mysterious numbers
    题解 P1157 【组合的输出】
    题解 P3955 【图书管理员】
    题解 P2036 【Perket】
    题解 CF837A 【Text Volume】
    题解 CF791A 【Bear and Big Brother】
    题解 CF747A 【Display Size】
    题解 P1332 【血色先锋队】
    题解 P2660 【zzc 种田】
    题解 P4470 【[BJWC2018]售票】
  • 原文地址:https://www.cnblogs.com/nickchen121/p/11517444.html
Copyright © 2011-2022 走看看