zoukankan      html  css  js  c++  java
  • 【kafka学习笔记】Go接入kafka

    需要借助的库

    github.com/Shopify/sarama // kafka主要的库
    github.com/bsm/sarama-cluster // kafka消费组
    

    生产者

    package producer
    
    import (
    	"fmt"
    	"github.com/HappyTeemo7569/teemoKit/tlog"
    	"github.com/Shopify/sarama"
    	"kafkaDemo/define"
    )
    
    var (
    	// ProducerId is the ID assigned to the next Producer set up by
    	// InitProducer, which increments it after each successful initialisation.
    	// It is a plain package-level int; no synchronisation is visible here.
    	ProducerId = 1
    )
    
    // Producer bundles a sarama synchronous producer with the topic it publishes
    // to and the counters used to label the demo messages.
    type Producer struct {
    	Producer   sarama.SyncProducer // underlying sarama sync producer; assigned by InitProducer on success
    	Topic      string // topic that messages are sent to
    	ProducerID int    // producer ID
    	MessageId  int    // sequence number embedded in the next message text
    }
    
    // InitProducer creates a sarama SyncProducer connected to define.SERVER_LIST
    // and fills in the receiver's fields.
    //
    // On success p.Producer, p.Topic, p.ProducerID and p.MessageId are set and the
    // package-level ProducerId counter is incremented. On failure the error is
    // logged and the receiver is left untouched, so p.Producer stays nil; callers
    // must check for that before sending.
    func (p *Producer) InitProducer() {
    	config := sarama.NewConfig()
    	// Require acknowledgement from the leader and the followers after a send.
    	config.Producer.RequiredAcks = sarama.WaitForAll
    	// Use sarama's random partitioner to pick the target partition.
    	config.Producer.Partitioner = sarama.NewRandomPartitioner
    	// Successfully delivered messages are reported back on the success channel.
    	config.Producer.Return.Successes = true

    	// Connect to Kafka.
    	client, err := sarama.NewSyncProducer([]string{define.SERVER_LIST}, config)
    	if err != nil {
    		// The producer was never created, so report it as a creation failure
    		// rather than as a closed producer.
    		tlog.Error("create producer failed, err:", err)
    		return
    	}

    	p.Producer = client
    	p.Topic = define.TOPIC
    	p.ProducerID = ProducerId
    	p.MessageId = 1

    	ProducerId++
    }
    
    // SendMessage builds one test message for p.Topic, sends it synchronously and
    // logs the partition and offset it was stored at.
    //
    // p.MessageId is advanced only after a successful send. If the producer was
    // never initialised (p.Producer is nil, e.g. because InitProducer failed) the
    // call logs an error and returns instead of panicking.
    func (p *Producer) SendMessage() {
    	if p == nil || p.Producer == nil {
    		tlog.Error("send msg failed: producer is not initialized")
    		return
    	}

    	// Build the message.
    	txt := fmt.Sprintf("ProducerID:%d  this is a test log %d",
    		p.ProducerID, p.MessageId)
    	msg := &sarama.ProducerMessage{
    		Topic: p.Topic,
    		Value: sarama.StringEncoder(txt),
    	}

    	// Send it and wait for the broker's answer.
    	pid, offset, err := p.Producer.SendMessage(msg)
    	if err != nil {
    		// Log through tlog like the rest of this package instead of stdout.
    		tlog.Error("send msg failed, err:", err)
    		return
    	}
    	tlog.Info(fmt.Sprintf("ProducerID:%d pid:%v offset:%v msg:%s",
    		p.ProducerID, pid, offset, txt))

    	p.MessageId++
    }
    
    // Close shuts down the underlying sarama producer. It is a no-op when the
    // producer was never initialised, and it logs (rather than drops) any error
    // returned by the underlying Close call.
    func (p *Producer) Close() {
    	if p == nil || p.Producer == nil {
    		return
    	}
    	if err := p.Producer.Close(); err != nil {
    		tlog.Error("close producer failed, err:", err)
    	}
    }
    
    

    消费者

    package consumer
    
    import (
    	"github.com/HappyTeemo7569/teemoKit/tlog"
    	"github.com/Shopify/sarama"
    	"kafkaDemo/define"
    )
    
    // Consumer bundles a sarama consumer with the topic it reads from and an ID
    // used to tag log lines.
    type Consumer struct {
    	Consumer   sarama.Consumer // underlying sarama consumer; assigned by InitConsumer
    	Topic      string // topic that messages are read from
    	ConsumerId int //consumer ID
    }
    
    // InitConsumer creates a sarama consumer for define.SERVER_LIST using the
    // default configuration (nil config) and stores it on the receiver together
    // with the topic and the next consumer ID.
    //
    // It returns the error from sarama.NewConsumer unchanged; in that case the
    // receiver and the ConsumerId counter are not modified.
    func (c *Consumer) InitConsumer() error {
    	brokers := []string{define.SERVER_LIST}
    	client, err := sarama.NewConsumer(brokers, nil)
    	if err != nil {
    		return err
    	}

    	c.Consumer, c.Topic = client, define.TOPIC
    	// Take the current ID, then advance the counter for the next consumer.
    	c.ConsumerId = ConsumerId
    	ConsumerId++
    	return nil
    }
    
    // GetMessage starts consuming a single partition of c.Topic and logs every
    // message received from it in a background goroutine.
    //
    // partitionId selects the partition. offset is the position to start from;
    // passing -1 requests the newest offset (sarama.OffsetNewest).
    //
    // If the partition consumer cannot be started the error is logged and the
    // call returns without starting a goroutine. Nothing in this function stops
    // the goroutine or closes the partition consumer; it runs until the message
    // channel is closed.
    func (c *Consumer) GetMessage(partitionId int32, offset int64) {
    	if offset == -1 {
    		offset = sarama.OffsetNewest
    	}
    	pc, err := c.Consumer.ConsumePartition(c.Topic, partitionId, offset)
    	if err != nil {
    		// One known cause: that topic/partition is already being consumed.
    		tlog.Error("failed to start consumer for partition %d,err:%v", partitionId, err)
    		return
    	}

    	// Consume the partition asynchronously. The partition consumer is passed
    	// in explicitly so the goroutine uses its own parameter instead of
    	// silently capturing the outer variable.
    	go func(pc sarama.PartitionConsumer) {
    		for msg := range pc.Messages() {
    			// Key and Value are raw bytes; convert both so the log is readable.
    			tlog.Info("ConsumerId:%d Partition:%d Offset:%d Key:%v Value:%v", c.ConsumerId, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
    		}
    	}(pc)
    }
    
    // GetMessageToAll looks up every partition of c.Topic and starts a consumer
    // on each of them via GetMessage, all beginning at the given offset.
    //
    // If the partition list cannot be fetched the error is logged and nothing is
    // started.
    func (c *Consumer) GetMessageToAll(offset int64) {
    	partitionList, err := c.Consumer.Partitions(c.Topic) // all partition IDs of the topic
    	if err != nil {
    		tlog.Error("fail to get list of partition:err%v", err)
    		return
    	}
    	tlog.Info("所有partition:", partitionList)

    	// Range over the values: the slice holds the partition IDs, and the slice
    	// index is not guaranteed to equal the ID.
    	for _, partition := range partitionList {
    		c.GetMessage(partition, offset)
    	}
    }
    
    
    

    主函数

    // main logs a start-up line, launches the demo producer and consumer in their
    // own goroutines, and then keeps the process alive indefinitely.
    func main() {
    	tlog.Info("开始")

    	go producer.Put()
    	go consumer.Get()

    	// Park the main goroutine forever; the work happens in the goroutines
    	// started above.
    	const nap = time.Hour * 60
    	for {
    		time.Sleep(nap)
    	}
    }
    
    
    // Put initialises a Producer and starts a background goroutine that sends one
    // test message per second. The goroutine has no stop condition.
    //
    // If initialisation fails, nothing is started. InitProducer already logs the
    // failure and leaves Producer nil, and sending on a nil producer would panic.
    func Put() {
    	p := new(Producer)
    	p.InitProducer()
    	if p.Producer == nil {
    		return
    	}

    	go func() {
    		for {
    			p.SendMessage()
    			time.Sleep(1 * time.Second)
    		}
    	}()
    }
    
    // Get initialises a Consumer and starts consuming every partition of its
    // topic from offset 0. An initialisation error is logged and ends the call.
    func Get() {
    	// NOTE(review): 0 is an absolute offset; it may no longer exist if the
    	// broker has trimmed the log. sarama.OffsetOldest might be more robust;
    	// confirm before changing.
    	const startOffset int64 = 0

    	c := new(Consumer)
    	if err := c.InitConsumer(); err != nil {
    		tlog.Error("fail to init consumer, err:%v", err)
    		return
    	}
    	c.GetMessageToAll(startOffset)
    }
    
    
    

    具体源码可以查看:
    https://github.com/HappyTeemo7569/kafkaDemo

    当你停下来休息的时候,不要忘记别人还在奔跑!
  • 相关阅读:
    JS判断鼠标从什么方向进入一个容器
    [JS进阶] 编写可维护性代码 (1)
    CSS3 animation小动画
    如何使用js捕获css3动画
    webpack入门(译)
    js拖拽3D立方体旋转
    简单3D翻转
    html 基础
    python 并发编程
    python 网络编程
  • 原文地址:https://www.cnblogs.com/HappyTeemo/p/15686715.html
Copyright © 2011-2022 走看看