zoukankan      html  css  js  c++  java
  • windows下golang实现Kfaka消息发送及kafka环境搭建

      kafka环境搭建:

        一、安装配置java-jdk

        (1)kafka需要java环境,安装java-jdk,下载地址:https://www.oracle.com/technetwork/java/javase/downloads/index.html

       (2)安装目录如下:

        (3)环境变量配置:

        

         

        二、下载kafka

        (1)下载kafka2.10-0.9.0.1版本,自带了zookeeper jar包,不用再次下载zookeeper。kafka代理无状态,zookeeper维持集群状态。下载地址:http://mirror.bit.edu.cn/apache/kafka/2.2.0/kafka-2.2.0-src.tgz

        (2)安装目录(不要带空格)如下:

        

        (3)修改zookeeper和kafka配置文件:

        (4)按照配置创建这两个目录

        (5)cmd启动zookeeper:  

    cd D:KAFKAkafka_2.10-0.9.0.1kafka_2.10-0.9.0.1kafka_2.10-0.9.0.1
    
    binwindowszookeeper-server-start.bat configzookeeper.properties

        (6)再开cmd启动kafka:

    cd D:KAFKAkafka_2.10-0.9.0.1kafka_2.10-0.9.0.1kafka_2.10-0.9.0.1
    
    binwindowskafka-server-start.bat configserver.properties

        (7)再开cmd创建topic发送消息:

    cd D:KAFKAkafka_2.10-0.9.0.1kafka_2.10-0.9.0.1kafka_2.10-0.9.0.1
    
    # 创建topic
    binwindowskafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kjTest
    
    # 列出topic
    binwindowskafka-topics.bat --list --zookeeper localhost:2181
    
    # 创建生产者
    binwindowskafka-console-producer.bat --broker-list localhost:9092 --topic kjTest
    
    # 发送消息
    this is a test
    hello

        (8)再开cmd接收消息:

    cd D:KAFKAkafka_2.10-0.9.0.1kafka_2.10-0.9.0.1kafka_2.10-0.9.0.1
    
    # 创建消费者
    binwindowskafka-console-consumer.bat --zookeeper localhost:2181 --topic kjTest --from-beginning
    
    # 消费
    this is a test
    hello


       golang实现Kfaka消息发送:

          创建main.go:

    package main
    
    import (
    	"fmt"
    
    	"github.com/Shopify/sarama"
    	"time"
    )
    
    //消息写入kafka
    func main() {
    	//初始化配置
    	config := sarama.NewConfig()
    	config.Producer.RequiredAcks = sarama.WaitForAll
    	config.Producer.Partitioner = sarama.NewRandomPartitioner
    	config.Producer.Return.Successes = true
    	//生产者
    	client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
    	if err != nil {
    		fmt.Println("producer close,err:", err)
    		return
    	}
    
    	defer client.Close()
    	var n int = 0
    
    	for n < 20 {
    		n++
    		//创建消息
    		msg := &sarama.ProducerMessage{}
    		msg.Topic = "kjTest"
    		msg.Value = sarama.StringEncoder("this is a good test,hello nola!")
    		//发送消息
    		pid, offset, err := client.SendMessage(msg)
    		if err != nil {
    			fmt.Println("send message failed,", err)
    			return
    		}
    		fmt.Printf("pid:%v offset:%v
    ,", pid, offset)
    		time.Sleep(10 * time.Millisecond)
    
    	}
    
    }
    

          消费消息效果:

      参考博友:

        kafka环境搭建:https://www.cnblogs.com/UniqueColor/p/8657319.html

        golang发送消息到kafka:https://www.cnblogs.com/pyyu/p/8371649.html

        kafka入门,概念功能理解:https://blog.csdn.net/tflasd1157/article/details/81985722

  • 相关阅读:
    elastic-job 新手指南
    最基本的区块链hello world(python3实现)
    python:函数的高级特性
    python高级特性:切片/迭代/列表生成式/生成器
    python:函数中五花八门的参数形式(茴香豆的『回』字有四种写法)
    python:爬虫入门
    python: 序列化/反序列化及对象的深拷贝/浅拷贝
    python中的zip、lambda、map操作
    python面向对象笔记
    RxJava2学习笔记(3)
  • 原文地址:https://www.cnblogs.com/NolaLi/p/10768712.html
Copyright © 2011-2022 走看看