zoukankan      html  css  js  c++  java
  • Golang 连接Kafka

    Kafka介绍

    Kafka是Apache软件基金会开发的一个开源流处理平台,由Java和Scala编写;Kafka是一种高吞吐、分布式、基于订阅发布的消息系统。

    Kafka名称解释

    • Producer:生产者
    • Consumer:消费者
    • Topic:消息主题,每一类的消息称之为一个主题
    • Broker:Kafka以集群的方式运行,可以由一个或多个服务器组成,每个服务器叫做一个broker
    • Partition:物理概念上的分区,为了提供系统吞吐量,在物理上每个Topic会分为一个或多个Partition

    Kafka架构图

    一个典型的Kafka集群中包含若干Producer,若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。

    Kafka通过Zookeeper管理集群配置及服务协同,Producer使用push模式将消息发布到broker,Consumer通过监听使用pull模式从broker订阅并消费消息。

    图上有个细节需要注意,producer给broker的过程是push,也就是有数据就推送给broker,而consumer给broker的过程是pull,是通过consumer主动去拉数据的,而不是broker把数据主动发送给consumer端的。

    Kafka与RabbitMQ比较

    • Kafka比RabbitMQ性能要高
    • RabbitMQ比Kafka可靠性要高
    • 因此在金融支付领域使用RabbitMQ居多,而在日志处理、大数据等方面Kafka使用居多。

    Kafka安装

    第一步 下载Kafka:

      地址 http://kafka.apache.org/downloads

    第二步 解压Kafka:

      tar -zxvf kafka.tgz -C  /usr/local/kafka

    第三步 运行Zookeeper:

       以后台方式运行 /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &  zookeeper端口 2181

    第四步 运行Kafka:

         以后台方式运行 /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties  kafka端口 9092

    Kafka图形管理工具

    http://www.kafkatool.com/download.html 

    Go语言中使用Kafka

    Sarama is an MIT-licensed Go client library for Apache Kafka version 0.8 (and later).

    安装sarama

      go get github.com/Shopify/sarama

    Producer

    package main
    
    import (
        "fmt"
        "github.com/Shopify/sarama"
    )
    
    func main() {
        // 新建一个arama配置实例
        config := sarama.NewConfig()
    
        // WaitForAll waits for all in-sync replicas to commit before responding.
        config.Producer.RequiredAcks = sarama.WaitForAll
    
        // NewRandomPartitioner returns a Partitioner which chooses a random partition each time.
        config.Producer.Partitioner = sarama.NewRandomPartitioner
    
        config.Producer.Return.Successes = true
    
        // 新建一个同步生产者
        client, err := sarama.NewSyncProducer([]string{"172.16.65.210:9092"}, config)
        if err != nil {
            fmt.Println("producer close, err:", err)
            return
        }
        defer client.Close()
    
        // 定义一个生产消息,包括Topic、消息内容、
        msg := &sarama.ProducerMessage{}
        msg.Topic = "revolution"
        msg.Key = sarama.StringEncoder("miles")
        msg.Value = sarama.StringEncoder("hello world...")
    
        // 发送消息
        pid, offset, err := client.SendMessage(msg)
    
    
        msg2 := &sarama.ProducerMessage{}
        msg2.Topic = "revolution"
        msg2.Key = sarama.StringEncoder("monroe")
        msg2.Value = sarama.StringEncoder("hello world2...")
        pid2, offset2, err := client.SendMessage(msg2)
    
    
        if err != nil {
            fmt.Println("send message failed,", err)
            return
        }
        fmt.Printf("pid:%v offset:%v
    ", pid, offset)
        fmt.Printf("pid2:%v offset2:%v
    ", pid2, offset2)
    }

    Consumer

    package main
    
    import (
        "sync"
        "github.com/Shopify/sarama"
        "fmt"
    )
    
    var wg sync.WaitGroup
    
    func main() {
        consumer, err := sarama.NewConsumer([]string{"172.16.65.210:9092"}, nil)
        if err != nil {
            fmt.Println("consumer connect error:", err)
            return
        }
        fmt.Println("connnect success...")
        defer consumer.Close()
        partitions, err := consumer.Partitions("revolution")
        if err != nil {
            fmt.Println("geet partitions failed, err:", err)
            return
        }
    
        for _, p := range partitions {
            partitionConsumer, err := consumer.ConsumePartition("revolution", p, sarama.OffsetOldest)
            if err != nil {
                fmt.Println("partitionConsumer err:", err)
                continue
            }
            wg.Add(1)
            go func(){
                for m := range partitionConsumer.Messages() {
                    fmt.Printf("key: %s, text: %s, offset: %d
    ", string(m.Key), string(m.Value), m.Offset)
                }
                wg.Done()
            }()
        }
        wg.Wait()
    }

     

  • 相关阅读:
    python-Mitmproxy抓包
    pytest-html、cov、xdist
    python-unittest添加用例的几种方式
    python-*args、**kargs用法
    One,Two,Three,Ak模板
    栈和队列小练习
    区块链入门介绍笔记
    Research on Facebook and Social Graph
    线段树板子的小修改
    htaccess远古时期技术了解一下
  • 原文地址:https://www.cnblogs.com/vincenshen/p/9824486.html
Copyright © 2011-2022 走看看