zoukankan      html  css  js  c++  java
  • golang kafka

    golang kafka – hello world

    https://github.com/Shopify/sarama

    https://shopify.github.io/sarama/

    consumer.go

    package main
     
    import (
        "fmt"
        "github.com/Shopify/sarama"
        "log"
        "os"
        "strings"
        "sync"
    )
     
    var (
        wg     sync.WaitGroup
        logger = log.New(os.Stderr, "[srama]"log.LstdFlags)
    )
     
    func main() {
     
        sarama.Logger = logger
     
        consumer, err := sarama.NewConsumer(strings.Split("localhost:9092"","), nil)
        if err != nil {
            logger.Println("Failed to start consumer: %s", err)
        }
     
        partitionList, err := consumer.Partitions("hello")
        if err != nil {
            logger.Println("Failed to get the list of partitions: ", err)
        }
     
        for partition := range partitionList {
            pc, err := consumer.ConsumePartition("hello", int32(partition), sarama.OffsetNewest)
            if err != nil {
                logger.Printf("Failed to start consumer for partition %d: %s ", partition, err)
            }
            defer pc.AsyncClose()
     
            wg.Add(1)
     
            go func(sarama.PartitionConsumer) {
                defer wg.Done()
                for msg := range pc.Messages() {
                    fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
                    fmt.Println()
                }
            }(pc)
        }
     
        wg.Wait()
     
        logger.Println("Done consuming topic hello")
        consumer.Close()
    }

    producer.go

    package main
     
    import (
        "github.com/Shopify/sarama"
        "log"
        "os"
        "strings"
    )
     
    var (
        logger = log.New(os.Stderr, "[srama]"log.LstdFlags)
    )
     
    func main() {
        sarama.Logger = logger
     
        config := sarama.NewConfig()
        config.Producer.RequiredAcks = sarama.WaitForAll
        config.Producer.Partitioner = sarama.NewRandomPartitioner
     
        msg := &sarama.ProducerMessage{}
        msg.Topic = "hello"
        msg.Partition = int32(-1)
        msg.Key = sarama.StringEncoder("key")
        msg.Value = sarama.ByteEncoder("你好, 世界!")
     
        producer, err := sarama.NewSyncProducer(strings.Split("localhost:9092"","), config)
        if err != nil {
            logger.Println("Failed to produce message: %s", err)
            os.Exit(500)
        }
        defer producer.Close()
     
        partition, offset, err := producer.SendMessage(msg)
        if err != nil {
            logger.Println("Failed to produce message: ", err)
        }
        logger.Printf("partition=%d, offset=%d ", partition, offset)
    }

     

    此条目发表在 Golang、Linux 分类目录。将固定链接加入收藏夹。
  • 相关阅读:
    ContentResolver.query()—>buildQueryString()
    maven基础依赖外部lib包(依赖钉钉sdk为例)
    在 Windows 中配置Maven
    windows系统下设置mtu值的方法
    dotfuscator 在混淆.Net Framework 4.0以上版本的时候报错的解决方法2
    dotfuscator 在混淆.Net Framework 4.0以上版本的时候报错的解决方法
    C# 反编译防范
    SpringBoot 集成Shiro
    windows系统下同时安装mysql5.5和8.0.11
    Eclipse安装STS(Spring Tool Suite (STS) for Eclipse)插件
  • 原文地址:https://www.cnblogs.com/simbadan/p/5487095.html
Copyright © 2011-2022 走看看