zoukankan      html  css  js  c++  java
  • 单机配置kafka和zookeeper

    1:环境准备

    jdk 推荐oracle,不建议open sdk

    在/etc/profile加入下列环境变量

    在PATH中将jdk和jre的bin加入path里面

    $JAVA_HOME/bin:$JRE_HOME/bin

    2:安装zookeeper

    下载zookeeper tar

    url: https://archive.apache.org/dist/zookeeper/zookeeper-3.4.5/zookeeper-3.4.5.tar.gz

    将压缩包移动到/usr/local下面

    tar -zxvf ***

    更改配置文件

    (1)将conf/zoo_sample.cfg更改为zoo.cfg

    (2)更改配置如下

    注意:

      *其中默认port为2181。

      *datadir需手动创建 mkdir -p datadir

           *注释掉的参数在单机中无用

    (3)加入环境变量 /etc/profile

    测试:

    可以cd 到bin目录下通过执行 zkServer.sh shell脚本启动、停止或者查看状态

    eg:

    ./zkServer.sh start/stop/status

     

    3:安装kafka

    先下载tar包、解压、mv到/usr/local下面

    wget https://www.apache.org/dyn/closer.cgi?path=/kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz
    sudo mv *** /usr/local
    cd /usr/local
    sudo tar -zxvf ****
    sudo rm **.tar.gz

    修改config目录下配置文件

    vim server.properties

    修改如下参数

    #broker.id需改成正整数,单机为1就好
    broker.id=1
    #指定端口号
    port=9092
    #localhost这一项还有其他要修改,详细见下面说明
    host.name=localhost
    #指定kafka的日志目录
    log.dirs=/usr/local/kafka_2.11-0.11.0.0/kafka-logs
    #连接zookeeper配置项,这里指定的是单机,所以只需要配置localhost,若是实际生产环境,需要在这里添加其他ip地址和端口号
    zookeeper.connect=localhost:2181

    vim zookeeper.properties

    修改如下参数

    #数据目录
    dataDir=/usr/local/kafka_2.11-0.11.0.0/zookeeper/data
    #客户端端口
    clientPort=2181
    host.name=localhost

    producer.properties and consumer.properties

    zookeeper.connect=localhost:2181

    4:启动kafka

    bin/zookeeper-server-start.sh config/zookeeper.properties
    bin/kafka-server-start.sh config/server.properties

    终端都是info log

     

    注意:如果是非root用户,可能会使用到sudo去启动zookeeper和kafka,但是那样会失败必须将kafka整个目录下的文件的owner和group都改为自己的用户。

    可以通过jps命令查看两者是否启动成功

    2576 QuorumPeerMain表示zookeeper

    如果发现没启动成功,可以在zookeeper/config/zookeeper.out里debug。

     

    成功启动后,可以简单通过demo进行测试

    github有各种语言的kafka支持

    url:https://github.com/edenhill/librdkafka/tree/v0.9.5

    可以简单通过go的demo进行测试

    consumer

    import (
        "fmt"
        "github.com/confluentinc/confluent-kafka-go/kafka"
    )
    
    func main() {
    
        c, err := kafka.NewConsumer(&kafka.ConfigMap{
            "bootstrap.servers": "localhost",
            "group.id":          "myGroup",
            "auto.offset.reset": "earliest",
        })
    
        if err != nil {
            panic(err)
        }
    
        c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil)
    
        for {
            msg, err := c.ReadMessage(-1)
            if err == nil {
                fmt.Printf("Message on %s: %s
    ", msg.TopicPartition, string(msg.Value))
            } else {
                fmt.Printf("Consumer error: %v (%v)
    ", err, msg)
                break
            }
        }
    
        c.Close()
    }

    producer

    import (
        "fmt"
        "github.com/confluentinc/confluent-kafka-go/kafka"
    )
    
    func main() {
    
        p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
        if err != nil {
            panic(err)
        }
    
      
        go func() {
            for e := range p.Events() {
                switch ev := e.(type) {
                case *kafka.Message:
                    if ev.TopicPartition.Error != nil {
                        fmt.Printf("Delivery failed: %v
    ", ev.TopicPartition)
                    } else {
                        fmt.Printf("Delivered message to %v
    ", ev.TopicPartition)
                    }
                }
            }
        }()
    
     
        topic := "myTopic"
        for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
            p.Produce(&kafka.Message{
                TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
                Value:          []byte(word),
            }, nil)
        }
    
    
        p.Flush(15 * 1000)
    }
  • 相关阅读:
    C# SqlTransaction事务,先从后主
    去除HTML标记
    GIT拉取问题
    QQ音乐API
    解决UEditor编辑器禁用时点击文本编辑器会多加一个字符问题
    UEditor编辑器增加placeholder提示
    C# List去重及优化建议
    ref和out解析
    时间标准格式转换及数值的ToString的格式化
    没有被“怼”,顺利通过华为Android三面,看看面试官都问了我什么?
  • 原文地址:https://www.cnblogs.com/13224ACMer/p/9082457.html
Copyright © 2011-2022 走看看