zoukankan      html  css  js  c++  java
  • golang kafka

    golang kafka – hello world

    https://github.com/Shopify/sarama

    https://shopify.github.io/sarama/

    consumer.go

    package main
     
    import (
        "fmt"
        "github.com/Shopify/sarama"
        "log"
        "os"
        "strings"
        "sync"
    )
     
    // Package-level state shared by main and the per-partition goroutines.
    var (
        // wg tracks the per-partition consumer goroutines so main can wait for them.
        wg sync.WaitGroup
        // logger writes to stderr with the "[srama]" prefix and standard date/time flags.
        logger = log.New(os.Stderr, "[srama]", log.LstdFlags)
    )
     
    func main() {
     
        sarama.Logger = logger
     
        consumer, err := sarama.NewConsumer(strings.Split("localhost:9092"","), nil)
        if err != nil {
            logger.Println("Failed to start consumer: %s", err)
        }
     
        partitionList, err := consumer.Partitions("hello")
        if err != nil {
            logger.Println("Failed to get the list of partitions: ", err)
        }
     
        for partition := range partitionList {
            pc, err := consumer.ConsumePartition("hello", int32(partition), sarama.OffsetNewest)
            if err != nil {
                logger.Printf("Failed to start consumer for partition %d: %s ", partition, err)
            }
            defer pc.AsyncClose()
     
            wg.Add(1)
     
            go func(sarama.PartitionConsumer) {
                defer wg.Done()
                for msg := range pc.Messages() {
                    fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
                    fmt.Println()
                }
            }(pc)
        }
     
        wg.Wait()
     
        logger.Println("Done consuming topic hello")
        consumer.Close()
    }

    producer.go

    package main
     
    import (
        "github.com/Shopify/sarama"
        "log"
        "os"
        "strings"
    )
     
    var (
        // logger writes to stderr with the "[srama]" prefix and standard date/time flags.
        logger = log.New(os.Stderr, "[srama]", log.LstdFlags)
    )
     
    func main() {
        sarama.Logger = logger
     
        config := sarama.NewConfig()
        config.Producer.RequiredAcks = sarama.WaitForAll
        config.Producer.Partitioner = sarama.NewRandomPartitioner
     
        msg := &sarama.ProducerMessage{}
        msg.Topic = "hello"
        msg.Partition = int32(-1)
        msg.Key = sarama.StringEncoder("key")
        msg.Value = sarama.ByteEncoder("你好, 世界!")
     
        producer, err := sarama.NewSyncProducer(strings.Split("localhost:9092"","), config)
        if err != nil {
            logger.Println("Failed to produce message: %s", err)
            os.Exit(500)
        }
        defer producer.Close()
     
        partition, offset, err := producer.SendMessage(msg)
        if err != nil {
            logger.Println("Failed to produce message: ", err)
        }
        logger.Printf("partition=%d, offset=%d ", partition, offset)
    }

     

    此条目发表在 Golang、Linux 分类目录。将固定链接加入收藏夹。
  • 相关阅读:
    notepad++一次去掉所有空行,然后加上2个空行
    mysql设置timpstamp的默认值为 '0000-00-00 00:00:00' 时报错
    Xshell 5的快捷键
    word2010文档如何隐藏右侧灰色空白不可编辑区域
    记一次惊险的系统和分区修复
    nohup command 2>&1 & 的含义
    alexkn android第一行代码-7.广播
    android第一行代码-6.自定义控件的实现
    android第一行代码-5.监听器的两种用法和context
    android第一行代码-3.activity之间的调用跟数据传递
  • 原文地址:https://www.cnblogs.com/simbadan/p/5487095.html
Copyright © 2011-2022 走看看