  • Kafka Series 1: Basic Installation

    I  The configuration file (download and extraction steps skipped)

    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.

    # see kafka.server.KafkaConfig for additional details and defaults

    ############################# Server Basics #############################

    # The id of the broker. This must be set to a unique integer for each broker.
    # Globally unique ID of this broker; must not be repeated across the cluster
    broker.id=0

    ############################# Socket Server Settings #############################

    # The address the socket server listens on. It will get the value returned from
    # java.net.InetAddress.getCanonicalHostName() if not configured.
    #   FORMAT:
    #     listeners = listener_name://host_name:port
    #   EXAMPLE:
    #     listeners = PLAINTEXT://your.host.name:9092
    #listeners=PLAINTEXT://:9092

    # Hostname and port the broker will advertise to producers and consumers. If not set,
    # it uses the value for "listeners" if configured.  Otherwise, it will use the value
    # returned from java.net.InetAddress.getCanonicalHostName().
    #advertised.listeners=PLAINTEXT://your.host.name:9092

    # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
    #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

    # The number of threads that the server uses for receiving requests from the network and sending responses to the network
    # Number of threads handling network requests
    num.network.threads=3

    # The number of threads that the server uses for processing requests, which may include disk I/O
    # Number of threads handling disk I/O
    num.io.threads=8

    # The send buffer (SO_SNDBUF) used by the socket server
    # Send buffer size of the socket server
    socket.send.buffer.bytes=102400

    # The receive buffer (SO_RCVBUF) used by the socket server
    # Receive buffer size of the socket server
    socket.receive.buffer.bytes=102400

    # The maximum size of a request that the socket server will accept (protection against OOM)
    # Maximum request size the socket server will accept
    socket.request.max.bytes=104857600


    ############################# Log Basics #############################

    # A comma seperated list of directories under which to store log files
    # Directory where Kafka stores its log (data) segments
    log.dirs=/home/hadoop/logs/kafka

    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    # Default number of partitions per topic
    num.partitions=3

    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    # Number of threads used to recover and clean up data under each data directory
    num.recovery.threads.per.data.dir=1

    ############################# Internal Topic Settings  #############################
    # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
    # For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1

    ############################# Log Flush Policy #############################

    # Messages are immediately written to the filesystem but by default we only fsync() to sync
    # the OS cache lazily. The following configurations control the flush of data to disk.
    # There are a few important trade-offs here:
    #    1. Durability: Unflushed data may be lost if you are not using replication.
    #    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
    #    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
    # The settings below allow one to configure the flush policy to flush data after a period of time or
    # every N messages (or both). This can be done globally and overridden on a per-topic basis.

    # The number of messages to accept before forcing a flush of data to disk
    #log.flush.interval.messages=10000

    # The maximum amount of time a message can sit in a log before we force a flush
    #log.flush.interval.ms=1000

    ############################# Log Retention Policy #############################

    # The following configurations control the disposal of log segments. The policy can
    # be set to delete segments after a period of time, or after a given size has accumulated.
    # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
    # from the end of the log.

    # The minimum age of a log file to be eligible for deletion due to age
    # Maximum time a segment file is retained; it is deleted once this expires
    log.retention.hours=168

    # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
    # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
    #log.retention.bytes=1073741824

    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824

    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=300000

    ############################# Zookeeper #############################

    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    # ZooKeeper connection string
    zookeeper.connect=hadoop2:2181,hadoop3:2181,hadoop4:2181

    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=6000


    ############################# Group Coordinator Settings #############################

    # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
    # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
    # The default value for this is 3 seconds.
    # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
    # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
    group.initial.rebalance.delay.ms=0
    server.properties

    II  Starting the cluster

      Distribute the installation package to every node and edit broker.id in each node's config file; the IDs must not repeat (see the sketch below).
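      A minimal sketch of doing this from the first node, assuming the installation lives under /home/hadoop/kafka and the other brokers are hadoop3 and hadoop4 (paths and hostnames here are assumptions, not from the original post):

    # Copy the installation to the other brokers (sketch; adjust paths/hosts to your environment)
    scp -r /home/hadoop/kafka hadoop3:/home/hadoop/
    scp -r /home/hadoop/kafka hadoop4:/home/hadoop/

    # Give every broker a distinct id, e.g. hadoop2 -> 0, hadoop3 -> 1, hadoop4 -> 2
    ssh hadoop3 "sed -i 's/^broker.id=.*/broker.id=1/' /home/hadoop/kafka/config/server.properties"
    ssh hadoop4 "sed -i 's/^broker.id=.*/broker.id=2/' /home/hadoop/kafka/config/server.properties"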

      Start the ZooKeeper cluster (hadoop2, hadoop3, hadoop4).
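      For example, assuming a standard ZooKeeper installation with zkServer.sh on each node's PATH (the script location is an assumption):

    # Run on each of hadoop2, hadoop3 and hadoop4
    zkServer.sh start

    # Optionally verify each node's role (leader or follower)
    zkServer.sh status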

      Then start Kafka on each node in turn:

    bin/kafka-server-start.sh config/server.properties
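      Started this way the broker stays attached to the terminal. The same script also accepts a -daemon flag to run it in the background, for example:

    bin/kafka-server-start.sh -daemon config/server.properties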

    III  Common commands

      1 List all topics on the cluster

    bin/kafka-topics.sh --list --zookeeper hadoop2:2181

      2 Create a topic

    bin/kafka-topics.sh --create --zookeeper hadoop2:2181 --replication-factor 1 --partitions 3 --topic first

       --replication-factor 1    number of replicas

       --partitions 3            number of partitions

       --topic first             name of the topic

      3 Delete a topic

    bin/kafka-topics.sh --delete --zookeeper hadoop2:2181 --topic first

       This requires delete.topic.enable=true in server.properties; otherwise the topic is only marked for deletion (or the broker has to be restarted for the deletion to take effect).
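       For example, a one-line addition to config/server.properties on every broker (note: older Kafka releases default this to false, while newer releases default it to true):

    # Allow topics to be physically deleted instead of only being marked for deletion
    delete.topic.enable=true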

      4 Produce messages from the shell

    bin/kafka-console-producer.sh --broker-list hadoop2:9092 --topic first

      5 Consume messages from the shell

    bin/kafka-console-consumer.sh --zookeeper hadoop2:2181 --from-beginning --topic first
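       Run the producer and consumer in two terminals; lines typed into the producer show up in the consumer. The --zookeeper option belongs to the old consumer; on newer Kafka releases the console consumer reads directly from the brokers instead (version-dependent, so treat the exact flags as an assumption for your release):

    bin/kafka-console-consumer.sh --bootstrap-server hadoop2:9092 --from-beginning --topic first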

      6 Check a consumer group's offsets

    bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper hadoop2:2181 --group testGroup
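       ConsumerOffsetChecker was deprecated and later removed from Kafka; on newer releases the equivalent information (partitions, current offsets, lag) comes from kafka-consumer-groups.sh, roughly as follows (assuming the group commits its offsets to Kafka rather than ZooKeeper):

    bin/kafka-consumer-groups.sh --bootstrap-server hadoop2:9092 --describe --group testGroup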

      7 Describe a topic

    bin/kafka-topics.sh --topic first --describe --zookeeper hadoop2:2181