  Installing Kafka on Linux

     

    1. Preparation

    1.1    Kafka

    Kafka 0.10.2.1 is used; download the 0.10.2.1 release from:

    http://kafka.apache.org/downloads

    1.2    JDK

    Java 1.8 is used for the JDK; download from:

    http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

    1.3    Zookeeper

    Zookeeper 3.4.9 is used; download from:

    https://archive.apache.org/dist/zookeeper/

    2. Installing the JDK

    CentOS often ships with Java 8 preinstalled. Open a terminal and run:

    java -version

    to check which Java version is installed. If Java is not installed, follow these steps:

    1. Open /etc/profile:

    a)       vi /etc/profile

    2. Add the following configuration:

    export JAVA_HOME=/home/teld/jdk1.8.0_65
    export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
    export PATH=$JAVA_HOME/bin:$PATH

    3. Make the JDK folder executable:

    a)       chmod -R +x jdk1.8.0_65

    4. Reload the profile:

    a)       source /etc/profile
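
    To confirm the setup took effect, you can check that the java binary resolves from the configured JAVA_HOME (the jdk1.8.0_65 path above is this guide's example):

    which java        # should print /home/teld/jdk1.8.0_65/bin/java
    java -version     # should report version 1.8.0_65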

    3. Installing Zookeeper

    This guide installs a Zookeeper cluster of three instances on a single machine:

    Instance    Client port             Quorum/election ports
    1           192.169.1.147:2181      192.169.1.147:2881:3881
    2           192.169.1.147:2182      192.169.1.147:2882:3882
    3           192.169.1.147:2183      192.169.1.147:2883:3883

    1. Extract Zookeeper into the target directory: tar -zxvf /home/zookeeper-3.4.9.tar.gz -C /usr/local/kafka
    2. Rename the zookeeper-3.4.9 folder to zookeeper-1, then copy it to zookeeper-2 and zookeeper-3:

    mv zookeeper-3.4.9 zookeeper-1
    cp -r zookeeper-1 zookeeper-2
    cp -r zookeeper-1 zookeeper-3

    3.1 Zookeeper-1 Configuration

    1. Enter the zookeeper-1 config directory: cd zookeeper-1/conf

    Copy the sample config: cp zoo_sample.cfg zoo.cfg

    2. Edit the zoo.cfg file:

    a) vi zoo.cfg

    b) Locate dataDir and point it at this instance's data directory, e.g. dataDir=/opt/kafka_0.10/zookeeper/data

    c) Append the following at the end of the file:

    server.1=192.169.1.147:2881:3881

    server.2=192.169.1.147:2882:3882

    server.3=192.169.1.147:2883:3883

    d) Save and exit (Esc, then :wq, then Enter).

    3. Create the data directory:

    a) cd ..

    b) mkdir data

    4. Create the myid file:

    vi data/myid, set the file content to 1, then save with :wq.

    5. Run bin/zkServer.sh start to start the Zookeeper server.
    6. Run bin/zkServer.sh status to check that the instance is running.
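
    Putting these steps together, zookeeper-1/conf/zoo.cfg should end up resembling the sketch below (tickTime, initLimit, and syncLimit are the zoo_sample.cfg defaults; the dataDir path is this guide's example):

    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/opt/kafka_0.10/zookeeper/data
    clientPort=2181
    server.1=192.169.1.147:2881:3881
    server.2=192.169.1.147:2882:3882
    server.3=192.169.1.147:2883:3883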

    3.2 Zookeeper-2 and Zookeeper-3 Configuration

    The configuration mirrors instance 1, with the following differences:

    1. clientPort in zoo.cfg: 2182 and 2183 respectively

    2. dataDir in zoo.cfg: each instance needs its own data directory (e.g. /usr/local/teldapp/zookeeper-2/data and /usr/local/teldapp/zookeeper-3/data)

    3. The myid value in each instance's data directory differs: 1, 2, and 3 respectively
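
    For example, zookeeper-2's zoo.cfg would differ from instance 1 only in these two lines (the dataDir path is this guide's example):

    clientPort=2182
    dataDir=/usr/local/teldapp/zookeeper-2/data

    and its myid file would contain 2:

    echo 2 > /usr/local/teldapp/zookeeper-2/data/myid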

    4. Installing Kafka

    Kafka likewise runs three broker nodes on the same machine:

    Node        IP               Port    listeners
    Kafka-01    192.169.1.147    9093    listeners=PLAINTEXT://192.169.1.147:9093
    Kafka-02    192.169.1.147    9094    listeners=PLAINTEXT://192.169.1.147:9094
    Kafka-03    192.169.1.147    9095    listeners=PLAINTEXT://192.169.1.147:9095

    1. Extract the Kafka package into the target directory.

    2. Rename kafka_2.11-0.10.2.1 to kafka-01:

    a) mv kafka_2.11-0.10.2.1 kafka-01

    3. Make two copies of kafka-01 (alternatively, skip the copies and start all three instances from one installation, each with its own config file):

    a) cp -r kafka-01 kafka-02

    b) cp -r kafka-01 kafka-03

    4.1 Check Hostname Resolution

    To avoid duplicate name resolution against AD entries, verify that the hostname is configured correctly and that the hosts can reach each other.
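
    A quick sanity check (the IP address is this guide's example):

    hostname                        # show the configured hostname
    grep $(hostname) /etc/hosts    # confirm the hostname maps to 192.169.1.147
    ping -c 1 192.169.1.147        # confirm connectivity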

    4.2 Kafka-01 Setup

    1. Copy config/server.properties to server-9093.properties:

    cp config/server.properties config/server-9093.properties

    2. Open the file (vi config/server-9093.properties) and modify the following values:

    File: server-9093.properties

    broker.id=10
    log.dir=/tmp/mq/kafka/log
    listeners=PLAINTEXT://192.169.1.147:9093
    zookeeper.connect=192.169.1.147:2181,192.169.1.147:2182,192.169.1.147:2183

    3. Start the Kafka broker (JMX ports follow the per-broker assignment in section 5.4):

    JMX_PORT=9997 nohup bin/kafka-server-start.sh config/server-9093.properties &

    4.3 Kafka-02 and Kafka-03 Setup

    Edit the server-9094.properties and server-9095.properties files:

    File: server-9094.properties

    broker.id=20
    log.dir=/usr/local/teldapp/mq/tmp/kafka-logs-9094
    listeners=PLAINTEXT://192.169.1.147:9094
    zookeeper.connect=192.169.1.147:2181,192.169.1.147:2182,192.169.1.147:2183

    File: server-9095.properties

    broker.id=30
    log.dir=/usr/local/teldapp/mq/tmp/kafka-logs-9095
    listeners=PLAINTEXT://192.169.1.147:9095
    zookeeper.connect=192.169.1.147:2181,192.169.1.147:2182,192.169.1.147:2183

    Start the brokers with the following commands (each broker on the same host needs its own JMX port; see section 5.4):

    JMX_PORT=9998 nohup bin/kafka-server-start.sh config/server-9094.properties &

    JMX_PORT=9999 nohup bin/kafka-server-start.sh config/server-9095.properties &
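
    Once all three brokers are up, their registration can be verified in Zookeeper (a quick check assuming the ensemble above; the broker IDs 10, 20, 30 come from the broker.id values):

    zookeeper-1/bin/zkCli.sh -server 192.169.1.147:2181
    ls /brokers/ids
    # expected: [10, 20, 30]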

    4.4 Stopping the Kafka Brokers

    The brokers can be stopped with:

    kafka-01/bin/kafka-server-stop.sh

    Note that in this Kafka version the stop script signals every Kafka broker process on the machine; it does not accept a properties file to target a single broker.

    4.5 Kafka-Manager Installation

    1. Copy kafkamanager.tar.gz (version 1.3.3.15) to the Linux server.

    2. Extract it: tar -zxvf kafkamanager.tar.gz -C /usr/local/teldapp/mq

    3. In the conf directory, set kafka-manager.zkhosts in application.conf to the Zookeeper connection string.

    4. Run the startup script under the bin directory (invoked later in section 5.4 as nohup kafka-manager/bin/kafka-manager &), as sketched below.
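
    A minimal sketch of the relevant configuration and launch, assuming the Zookeeper ensemble above and Kafka-Manager's default HTTP port 9000 (the port opened in section 6.3):

    # conf/application.conf
    kafka-manager.zkhosts="192.169.1.147:2181,192.169.1.147:2182,192.169.1.147:2183"

    # launch in the background
    nohup bin/kafka-manager &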

    5. Kafka Configuration

    5.1 Common Commands

    1. Start Kafka:

    bin/kafka-server-start.sh config/server.properties

    2. Stop Kafka:

    bin/kafka-server-stop.sh

    3. Create a topic

    Use the kafka-topics.sh command to create a topic with a replication factor of 3 and 3 partitions:

    bin/kafka-topics.sh --create --zookeeper 192.169.1.147:2181,192.169.1.147:2182,192.169.1.147:2183 --replication-factor 3 --partitions 3 --topic test

    4. List topics

    Use kafka-topics.sh --list to see which topics exist:

    bin/kafka-topics.sh --list --zookeeper 192.169.1.147:2181,192.169.1.147:2182,192.169.1.147:2183
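
    To inspect partition leaders and replica placement for a specific topic, --describe can be used (same Zookeeper connection string; the test topic is the one created above):

    bin/kafka-topics.sh --describe --zookeeper 192.169.1.147:2181,192.169.1.147:2182,192.169.1.147:2183 --topic test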

    5. Produce messages

    bin/kafka-console-producer.sh --broker-list 192.169.1.147:9093,192.169.1.147:9094,192.169.1.147:9095 --topic test

    this is a message

    this is another message

    6. Consume messages

    bin/kafka-console-consumer.sh --bootstrap-server 192.169.1.147:9093,192.169.1.147:9094,192.169.1.147:9095 --topic test --from-beginning

     

    5.2 Broker Configuration

    broker.id=10

    log.dirs=/usr/local/teldapp/mq/tmp/kafka-logs-9093

    listeners=PLAINTEXT://192.169.1.147:9093

    num.network.threads=3

    num.partitions=3

    num.replica.fetchers=3

    default.replication.factor=3

    5.3 Log File Configuration

    Property               Value
    log.cleanup.policy     delete
    log.cleaner.enable     true
    log.retention.bytes    1073741824 (1 GB): maximum bytes of log data retained per partition; once exceeded, data is removed according to the cleanup policy
    log.retention.hours    168 (7 days)

    5.4 Java JMX Configuration

    Set the JMX port before starting each Kafka broker:

    JMX_PORT=9997 nohup kafka-01/bin/kafka-server-start.sh kafka-01/config/server-9093.properties &

    JMX_PORT=9998 nohup kafka-01/bin/kafka-server-start.sh kafka-01/config/server-9094.properties &

    JMX_PORT=9999 nohup kafka-01/bin/kafka-server-start.sh kafka-01/config/server-9095.properties &

    nohup kafka-manager/bin/kafka-manager &

    6. Appendix

    6.1 The __consumer_offsets Topic Grows Large
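
    __consumer_offsets is Kafka's internal, compacted topic that stores consumer group offsets. Its on-disk size is governed mainly by log.cleaner.enable (compaction must stay enabled, as in the configuration above), offsets.retention.minutes (1440 here), and offsets.topic.segment.bytes. A quick way to inspect it (the address and log path are this guide's examples):

    bin/kafka-topics.sh --describe --zookeeper 192.169.1.147:2181 --topic __consumer_offsets
    du -sh /usr/local/teldapp/mq/tmp/kafka-logs-9093/__consumer_offsets-*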

    6.2 Raising the Linux Open-File Limit (ulimit)

    echo "* soft nofile 65535" >> /etc/security/limits.conf
    echo "* hard nofile 65535" >> /etc/security/limits.conf
    echo "* soft nproc 65535" >> /etc/security/limits.conf
    echo "* hard nproc 65535" >> /etc/security/limits.conf

    ulimit -n 65535

    sed -i 's/4096/65535/g' /etc/security/limits.d/20-nproc.conf

    6.3 Opening Ports on Linux

    Use iptables to open the required ports, e.g. port 9000 for Kafka-Manager:
    /sbin/iptables -I INPUT -p tcp --dport 9000 -j ACCEPT
    Save the rules:
    /etc/rc.d/init.d/iptables save
    Restart the service:
    service iptables restart
    Check whether the opened port has taken effect:
    /etc/init.d/iptables status
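
    The same pattern covers the other ports used in this guide (Zookeeper client and quorum ports, Kafka listeners, JMX); a sketch:

    for p in 2181 2182 2183 2881 2882 2883 3881 3882 3883 9093 9094 9095 9997 9998 9999; do
        /sbin/iptables -I INPUT -p tcp --dport $p -j ACCEPT
    done
    /etc/rc.d/init.d/iptables save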

    6.4 Complete Broker Configuration

    advertised.host.name = null

           advertised.listeners = null

           advertised.port = null

           authorizer.class.name =

           auto.create.topics.enable = true

           auto.leader.rebalance.enable = true

           background.threads = 10

           broker.id = 10

           broker.id.generation.enable = true

           broker.rack = null

           compression.type = producer

           connections.max.idle.ms = 600000

           controlled.shutdown.enable = true

           controlled.shutdown.max.retries = 3

           controlled.shutdown.retry.backoff.ms = 5000

           controller.socket.timeout.ms = 30000

           create.topic.policy.class.name = null

           default.replication.factor = 1

           delete.topic.enable = false

           fetch.purgatory.purge.interval.requests = 1000

           group.max.session.timeout.ms = 300000

           group.min.session.timeout.ms = 6000

           host.name =

           inter.broker.listener.name = null

           inter.broker.protocol.version = 0.10.2-IV0

           leader.imbalance.check.interval.seconds = 300

           leader.imbalance.per.broker.percentage = 10

           listener.security.protocol.map = SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,TRACE:TRACE,SASL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT

           listeners = 192.169.1.147:9093

           log.cleaner.backoff.ms = 15000

           log.cleaner.dedupe.buffer.size = 134217728

           log.cleaner.delete.retention.ms = 86400000

           log.cleaner.enable = true

           log.cleaner.io.buffer.load.factor = 0.9

           log.cleaner.io.buffer.size = 524288

           log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308

           log.cleaner.min.cleanable.ratio = 0.5

           log.cleaner.min.compaction.lag.ms = 0

           log.cleaner.threads = 1

           log.cleanup.policy = [delete]

           log.dir = /tmp/kafka-logs

           log.dirs = /usr/local/teldapp/mq/tmp/kafka-logs-9093

           log.flush.interval.messages = 9223372036854775807

           log.flush.interval.ms = null

           log.flush.offset.checkpoint.interval.ms = 60000

           log.flush.scheduler.interval.ms = 9223372036854775807

           log.index.interval.bytes = 4096

           log.index.size.max.bytes = 10485760

           log.message.format.version = 0.10.2-IV0

           log.message.timestamp.difference.max.ms = 9223372036854775807

           log.message.timestamp.type = CreateTime

           log.preallocate = false

           log.retention.bytes = -1

           log.retention.check.interval.ms = 300000

           log.retention.hours = 168

           log.retention.minutes = null

           log.retention.ms = null

           log.roll.hours = 168

           log.roll.jitter.hours = 0

           log.roll.jitter.ms = null

           log.roll.ms = null

           log.segment.bytes = 1073741824

           log.segment.delete.delay.ms = 60000

           max.connections.per.ip = 2147483647

           max.connections.per.ip.overrides =

           message.max.bytes = 1000012

           metric.reporters = []

           metrics.num.samples = 2

           metrics.recording.level = INFO

           metrics.sample.window.ms = 30000

           min.insync.replicas = 1

           num.io.threads = 8

           num.network.threads = 3

           num.partitions = 3

           num.recovery.threads.per.data.dir = 1

           num.replica.fetchers = 1

           offset.metadata.max.bytes = 4096

           offsets.commit.required.acks = -1

           offsets.commit.timeout.ms = 5000

           offsets.load.buffer.size = 5242880

           offsets.retention.check.interval.ms = 600000

           offsets.retention.minutes = 1440

           offsets.topic.compression.codec = 0

           offsets.topic.num.partitions = 50

           offsets.topic.replication.factor = 3

           offsets.topic.segment.bytes = 104857600

           port = 9092

           principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder

           producer.purgatory.purge.interval.requests = 1000

           queued.max.requests = 500

           quota.consumer.default = 9223372036854775807

           quota.producer.default = 9223372036854775807

           quota.window.num = 11

           quota.window.size.seconds = 1

           replica.fetch.backoff.ms = 1000

           replica.fetch.max.bytes = 1048576

           replica.fetch.min.bytes = 1

           replica.fetch.response.max.bytes = 10485760

           replica.fetch.wait.max.ms = 500

           replica.high.watermark.checkpoint.interval.ms = 5000

           replica.lag.time.max.ms = 10000

           replica.socket.receive.buffer.bytes = 65536

           replica.socket.timeout.ms = 30000

           replication.quota.window.num = 11

           replication.quota.window.size.seconds = 1

           request.timeout.ms = 30000

           reserved.broker.max.id = 1000

           sasl.enabled.mechanisms = [GSSAPI]

           sasl.kerberos.kinit.cmd = /usr/bin/kinit

           sasl.kerberos.min.time.before.relogin = 60000

           sasl.kerberos.principal.to.local.rules = [DEFAULT]

           sasl.kerberos.service.name = null

           sasl.kerberos.ticket.renew.jitter = 0.05

           sasl.kerberos.ticket.renew.window.factor = 0.8

           sasl.mechanism.inter.broker.protocol = GSSAPI

           security.inter.broker.protocol = PLAINTEXT

           socket.receive.buffer.bytes = 102400

           socket.request.max.bytes = 104857600

           socket.send.buffer.bytes = 102400

           ssl.cipher.suites = null

           ssl.client.auth = none

           ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]

           ssl.endpoint.identification.algorithm = null

           ssl.key.password = null

           ssl.keymanager.algorithm = SunX509

           ssl.keystore.location = null

           ssl.keystore.password = null

           ssl.keystore.type = JKS

           ssl.protocol = TLS

           ssl.provider = null

           ssl.secure.random.implementation = null

           ssl.trustmanager.algorithm = PKIX

           ssl.truststore.location = null

           ssl.truststore.password = null

           ssl.truststore.type = JKS

           unclean.leader.election.enable = true

           zookeeper.connect = 192.169.1.147:2181,192.169.1.147:2182,192.169.1.147:2183

           zookeeper.connection.timeout.ms = 6000

           zookeeper.session.timeout.ms = 6000

           zookeeper.set.acl = false

           zookeeper.sync.time.ms = 2000
