  • Installing ZooKeeper + Kafka on a single CentOS machine

    Environment:

    CentOS-7-x86_64
    zookeeper-3.4.11
    kafka_2.12-1.1.0

    I. Download and install ZooKeeper

    1) Download ZooKeeper

    [root@localhost opt]# cd /opt/
    [root@localhost opt]# wget https://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gz

    2) Extract the archive

    [root@localhost opt]# tar zxvf zookeeper-3.4.11.tar.gz 
    [root@localhost opt]# ls
    zookeeper-3.4.11 zookeeper-3.4.11.tar.gz

    3) Configure

    [root@localhost opt]# cd zookeeper-3.4.11
    [root@localhost zookeeper-3.4.11]# ll
    total 1596
    drwxr-xr-x. 2 502 games 149 Nov 1 14:52 bin
    -rw-r--r--. 1 502 games 87943 Nov 1 14:47 build.xml
    drwxr-xr-x. 2 502 games 77 Nov 1 14:52 conf
    drwxr-xr-x. 10 502 games 130 Nov 1 14:47 contrib
    drwxr-xr-x. 2 502 games 4096 Nov 1 14:54 dist-maven
    drwxr-xr-x. 6 502 games 4096 Nov 1 14:52 docs
    -rw-r--r--. 1 502 games 1709 Nov 1 14:47 ivysettings.xml
    -rw-r--r--. 1 502 games 8197 Nov 1 14:47 ivy.xml
    drwxr-xr-x. 4 502 games 4096 Nov 1 14:52 lib
    -rw-r--r--. 1 502 games 11938 Nov 1 14:47 LICENSE.txt
    -rw-r--r--. 1 502 games 3132 Nov 1 14:47 NOTICE.txt
    -rw-r--r--. 1 502 games 1585 Nov 1 14:47 README.md
    -rw-r--r--. 1 502 games 1770 Nov 1 14:47 README_packaging.txt
    drwxr-xr-x. 5 502 games 47 Nov 1 14:47 recipes
    drwxr-xr-x. 8 502 games 211 Nov 1 14:52 src
    -rw-r--r--. 1 502 games 1478279 Nov 1 14:49 zookeeper-3.4.11.jar
    -rw-r--r--. 1 502 games 195 Nov 1 14:52 zookeeper-3.4.11.jar.asc
    -rw-r--r--. 1 502 games 33 Nov 1 14:49 zookeeper-3.4.11.jar.md5
    -rw-r--r--. 1 502 games 41 Nov 1 14:49 zookeeper-3.4.11.jar.sha1
    [root@localhost zookeeper-3.4.11]# cp -rf conf/zoo_sample.cfg conf/zoo.cfg
    [root@localhost zookeeper-3.4.11]# vi conf/zoo.cfg

    Modify or add the following two settings in zoo.cfg:

    # both directories must be created beforehand
    dataDir=/opt/zookeeper-3.4.11/zkdata
    dataLogDir=/opt/zookeeper-3.4.11/zkdatalog

    Create the ZooKeeper data directory and transaction log directory:

    [root@localhost zookeeper-3.4.11]# mkdir /opt/zookeeper-3.4.11/zkdata
    [root@localhost zookeeper-3.4.11]# mkdir /opt/zookeeper-3.4.11/zkdatalog
    [root@localhost zookeeper-3.4.11]# ll
    total 1596
    drwxr-xr-x. 2 502 games 149 Nov 1 14:52 bin
    -rw-r--r--. 1 502 games 87943 Nov 1 14:47 build.xml
    drwxr-xr-x. 2 502 games 92 Mar 31 11:12 conf
    drwxr-xr-x. 10 502 games 130 Nov 1 14:47 contrib
    drwxr-xr-x. 2 502 games 4096 Nov 1 14:54 dist-maven
    drwxr-xr-x. 6 502 games 4096 Nov 1 14:52 docs
    -rw-r--r--. 1 502 games 1709 Nov 1 14:47 ivysettings.xml
    -rw-r--r--. 1 502 games 8197 Nov 1 14:47 ivy.xml
    drwxr-xr-x. 4 502 games 4096 Nov 1 14:52 lib
    -rw-r--r--. 1 502 games 11938 Nov 1 14:47 LICENSE.txt
    -rw-r--r--. 1 502 games 3132 Nov 1 14:47 NOTICE.txt
    -rw-r--r--. 1 502 games 1585 Nov 1 14:47 README.md
    -rw-r--r--. 1 502 games 1770 Nov 1 14:47 README_packaging.txt
    drwxr-xr-x. 5 502 games 47 Nov 1 14:47 recipes
    drwxr-xr-x. 8 502 games 211 Nov 1 14:52 src
    drwxr-xr-x. 2 root root 6 Mar 31 11:13 zkdata
    drwxr-xr-x. 2 root root 6 Mar 31 11:13 zkdatalog
    -rw-r--r--. 1 502 games 1478279 Nov 1 14:49 zookeeper-3.4.11.jar
    -rw-r--r--. 1 502 games 195 Nov 1 14:52 zookeeper-3.4.11.jar.asc
    -rw-r--r--. 1 502 games 33 Nov 1 14:49 zookeeper-3.4.11.jar.md5
    -rw-r--r--. 1 502 games 41 Nov 1 14:49 zookeeper-3.4.11.jar.sha1
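
    The manual edit of zoo.cfg and the two mkdir calls above can also be scripted. The following is a minimal sketch, not part of the original steps: set_prop is a hypothetical helper, and ZK_HOME defaults to a scratch directory so the sketch can be tried without touching a real install. It assumes GNU sed (as shipped with CentOS) and values that contain no | or & characters.

```shell
# Set key=value in a properties-style file: replace the line if the key
# already exists, append it otherwise. (Hypothetical helper.)
set_prop() {
    file=$1 key=$2 value=$3
    if grep -q "^${key}=" "$file"; then
        # | is used as the sed delimiter because the values are paths containing /
        sed -i "s|^${key}=.*|${key}=${value}|" "$file"
    else
        printf '%s=%s\n' "$key" "$value" >> "$file"
    fi
}

# Scratch directory by default; set ZK_HOME=/opt/zookeeper-3.4.11 for the real install.
ZK_HOME=${ZK_HOME:-$(mktemp -d)}
mkdir -p "$ZK_HOME/conf" "$ZK_HOME/zkdata" "$ZK_HOME/zkdatalog"

# In the scratch case there is no zoo_sample.cfg, so start from a one-line stand-in.
[ -f "$ZK_HOME/conf/zoo.cfg" ] || printf 'dataDir=/tmp/zookeeper\n' > "$ZK_HOME/conf/zoo.cfg"

set_prop "$ZK_HOME/conf/zoo.cfg" dataDir    "$ZK_HOME/zkdata"
set_prop "$ZK_HOME/conf/zoo.cfg" dataLogDir "$ZK_HOME/zkdatalog"
cat "$ZK_HOME/conf/zoo.cfg"
```

    Running it twice leaves a single dataDir line and a single dataLogDir line, which is the point of replacing instead of always appending.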

    4) Configure environment variables

    [root@localhost zookeeper-3.4.11]# vi /etc/profile

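    Add the following settings:

    # config zookeeper install path (defined first, because the lines below expand it)
    export ZOOKEEPER_HOME=/opt/zookeeper-3.4.11

    # config java class path
    export JAVA_HOME=/usr/local/java/jdk1.8.0_161
    export JRE_HOME=${JAVA_HOME}/jre
    export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib:$ZOOKEEPER_HOME/lib:
    export PATH=${JAVA_HOME}/bin:$ZOOKEEPER_HOME/bin:$PATH

    Run source /etc/profile (or log in again) so that the current shell picks up the changes.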
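
    The shell expands $ZOOKEEPER_HOME at the moment each export line is read, so the variable needs a value before any line that references it. A minimal sketch of the same idea using a drop-in file follows; on CentOS, /etc/profile sources /etc/profile.d/*.sh, so the file could live there. PROFILE_FILE is a hypothetical name and defaults to a scratch file here.

```shell
# Scratch file by default; e.g. PROFILE_FILE=/etc/profile.d/zookeeper.sh on a real host
# (that file name is an assumption, any *.sh name in the directory works).
PROFILE_FILE=${PROFILE_FILE:-$(mktemp)}

cat > "$PROFILE_FILE" <<'EOF'
# ZOOKEEPER_HOME is assigned first because the PATH line below expands it immediately
export ZOOKEEPER_HOME=/opt/zookeeper-3.4.11
export PATH=$ZOOKEEPER_HOME/bin:$PATH
EOF

# Load the settings into the current shell, the same way `source /etc/profile` would
. "$PROFILE_FILE"

# Show the first PATH entry to confirm the expansion happened
echo "$PATH" | tr ':' '\n' | head -n 1
```

    If the two export lines were swapped, PATH would receive an entry of just /bin, because ZOOKEEPER_HOME would still be empty when PATH is built.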

    5) Start ZooKeeper

    [root@localhost bin]# cd /opt/zookeeper-3.4.11/bin
    [root@localhost bin]# ll
    total 36
    -rwxr-xr-x. 1 502 games 232 Nov 1 14:47 README.txt
    -rwxr-xr-x. 1 502 games 1937 Nov 1 14:47 zkCleanup.sh
    -rwxr-xr-x. 1 502 games 1056 Nov 1 14:47 zkCli.cmd
    -rwxr-xr-x. 1 502 games 1534 Nov 1 14:47 zkCli.sh
    -rwxr-xr-x. 1 502 games 1628 Nov 1 14:47 zkEnv.cmd
    -rwxr-xr-x. 1 502 games 2696 Nov 1 14:47 zkEnv.sh
    -rwxr-xr-x. 1 502 games 1089 Nov 1 14:47 zkServer.cmd
    -rwxr-xr-x. 1 502 games 6773 Nov 1 14:47 zkServer.sh
    [root@localhost bin]# ./zkServer.sh start
    ZooKeeper JMX enabled by default
    Using config: /opt/zookeeper-3.4.11/bin/../conf/zoo.cfg
    Starting zookeeper ... STARTED
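
    STARTED only tells us that the launcher script spawned the JVM. zkServer.sh also has a status subcommand, which prints a Mode: line when the server actually answers. The sketch below extracts that line; zk_mode is a hypothetical helper, and the sample text stands in for real command output so that the snippet runs anywhere.

```shell
# Extract the value of the "Mode:" line from `zkServer.sh status` output read on stdin.
zk_mode() {
    sed -n 's/^Mode: //p'
}

# Stand-in for the output of a healthy single-node server. On the real host, use:
#   /opt/zookeeper-3.4.11/bin/zkServer.sh status 2>&1 | zk_mode
sample='ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.11/bin/../conf/zoo.cfg
Mode: standalone'

mode=$(printf '%s\n' "$sample" | zk_mode)
if [ "$mode" = "standalone" ]; then
    echo "ZooKeeper is serving requests in standalone mode"
else
    echo "ZooKeeper does not report a mode; it may not be running" >&2
fi
```

    When the server is down, the status output contains no Mode: line, so zk_mode prints nothing and the else branch is taken.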

    II. Download and install Kafka

    1) Download Kafka:

    [root@localhost bin]# cd /opt/
    [root@localhost opt]# wget http://apache.fayea.com/kafka/1.1.0/kafka_2.12-1.1.0.tgz
    --2018-03-31 11:21:52-- http://apache.fayea.com/kafka/1.1.0/kafka_2.12-1.1.0.tgz
    Resolving apache.fayea.com (apache.fayea.com)... 202.115.175.188, 202.115.175.187
    Connecting to apache.fayea.com (apache.fayea.com)|202.115.175.188|:80... connected.
    HTTP request sent, awaiting response... 200 OK
    Length: 50326212 (48M) [application/x-gzip]
    Saving to: ‘kafka_2.12-1.1.0.tgz’
    
    100%[=============================================================================================================================>] 50,326,212 442KB/s in 1m 44s
    
    2018-03-31 11:23:36 (473 KB/s) - ‘kafka_2.12-1.1.0.tgz’ saved [50326212/50326212]
    
    [root@localhost opt]# ll
    total 84964
    -rw-r--r--. 1 root root 50326212 Mar 28 08:05 kafka_2.12-1.1.0.tgz
    drwxr-xr-x. 15 502 games 4096 Mar 31 11:20 zookeeper-3.4.11
    -rw-r--r--. 1 root root 36668066 Nov 8 13:24 zookeeper-3.4.11.tar.gz

    2) Extract the archive:

    tar -zxvf kafka_2.12-1.1.0.tgz

    3) Configure:

    Change to the Kafka installation directory and edit config/server.properties:

    [root@localhost opt]# cd /opt/kafka_2.12-1.1.0/config/
    [root@localhost config]# ll
    total 64
    -rw-r--r--. 1 root root 906 Mar 23 18:51 connect-console-sink.properties
    -rw-r--r--. 1 root root 909 Mar 23 18:51 connect-console-source.properties
    -rw-r--r--. 1 root root 5807 Mar 23 18:51 connect-distributed.properties
    -rw-r--r--. 1 root root 883 Mar 23 18:51 connect-file-sink.properties
    -rw-r--r--. 1 root root 881 Mar 23 18:51 connect-file-source.properties
    -rw-r--r--. 1 root root 1111 Mar 23 18:51 connect-log4j.properties
    -rw-r--r--. 1 root root 2730 Mar 23 18:51 connect-standalone.properties
    -rw-r--r--. 1 root root 1221 Mar 23 18:51 consumer.properties
    -rw-r--r--. 1 root root 4727 Mar 23 18:51 log4j.properties
    -rw-r--r--. 1 root root 1919 Mar 23 18:51 producer.properties
    -rw-r--r--. 1 root root 6851 Mar 23 18:51 server.properties
    -rw-r--r--. 1 root root 1032 Mar 23 18:51 tools-log4j.properties
    -rw-r--r--. 1 root root 1023 Mar 23 18:51 zookeeper.properties
    [root@localhost config]# mkdir /opt/kafka_2.12-1.1.0/kafka_log

    Add or modify the following two settings:

    # log.dirs must point to a directory created beforehand
    log.dirs=/opt/kafka_2.12-1.1.0/kafka_log
    listeners=PLAINTEXT://192.178.0.111:9092
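
    Since log.dirs accepts a comma separated list and every entry has to exist before the broker starts, a small check can read the setting back and create whatever is missing. This is a sketch under assumptions: log_dirs is a hypothetical helper, and the properties file is a scratch copy with only the relevant lines.

```shell
# Print each directory listed in log.dirs (comma separated) on its own line.
# Commented-out lines (#log.dirs=...) are ignored because they do not start with "log.dirs=".
log_dirs() {
    sed -n 's/^log\.dirs=//p' "$1" | tr ',' '\n'
}

# Scratch properties file; on the real host use
# conf=/opt/kafka_2.12-1.1.0/config/server.properties instead.
work=$(mktemp -d)
conf="$work/server.properties"
printf '#log.dirs=/tmp/kafka-logs\nlog.dirs=%s/kafka_log\n' "$work" > "$conf"

# Create each directory if needed and report whether it is writable
log_dirs "$conf" | while IFS= read -r dir; do
    mkdir -p "$dir"
    if [ -w "$dir" ]; then
        echo "ok: $dir"
    else
        echo "not writable: $dir"
    fi
done
```

    The value is taken literally up to the end of the line, so a trailing comment on the same line as log.dirs would end up in the directory name.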

    config/server.properties after the changes:

    [root@localhost config]# more server.properties 
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements. See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License. You may obtain a copy of the License at
    #
    # http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.

    # see kafka.server.KafkaConfig for additional details and defaults

    ############################# Server Basics #############################

    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=0

    ############################# Socket Server Settings #############################

    # The address the socket server listens on. It will get the value returned from 
    # java.net.InetAddress.getCanonicalHostName() if not configured.
    # FORMAT:
    # listeners = listener_name://host_name:port
    # EXAMPLE:
    # listeners = PLAINTEXT://your.host.name:9092
    #listeners=PLAINTEXT://:9092
    listeners=PLAINTEXT://192.178.0.111:9092

    # Hostname and port the broker will advertise to producers and consumers. If not set, 
    # it uses the value for "listeners" if configured. Otherwise, it will use the value
    # returned from java.net.InetAddress.getCanonicalHostName().
    #advertised.listeners=PLAINTEXT://your.host.name:9092

    # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
    #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

    # The number of threads that the server uses for receiving requests from the network and sending responses to the network
    num.network.threads=3

    # The number of threads that the server uses for processing requests, which may include disk I/O
    num.io.threads=8

    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400

    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400

    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600


    ############################# Log Basics #############################

    # A comma separated list of directories under which to store log files
    #log.dirs=/tmp/kafka-logs
    log.dirs=/opt/kafka_2.12-1.1.0/kafka_log

    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=1

    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    num.recovery.threads.per.data.dir=1

    ############################# Internal Topic Settings #############################
    # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
    # For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1

    ############################# Log Flush Policy #############################

    # Messages are immediately written to the filesystem but by default we only fsync() to sync
    # the OS cache lazily. The following configurations control the flush of data to disk.
    # There are a few important trade-offs here:
    # 1. Durability: Unflushed data may be lost if you are not using replication.
    # 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
    # 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
    # The settings below allow one to configure the flush policy to flush data after a period of time or
    # every N messages (or both). This can be done globally and overridden on a per-topic basis.

    # The number of messages to accept before forcing a flush of data to disk
    #log.flush.interval.messages=10000

    # The maximum amount of time a message can sit in a log before we force a flush
    #log.flush.interval.ms=1000

    ############################# Log Retention Policy #############################

    # The following configurations control the disposal of log segments. The policy can
    # be set to delete segments after a period of time, or after a given size has accumulated.
    # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
    # from the end of the log.

    # The minimum age of a log file to be eligible for deletion due to age
    log.retention.hours=168

    # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
    # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
    #log.retention.bytes=1073741824

    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824

    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=300000

    ############################# Zookeeper #############################

    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    zookeeper.connect=localhost:2181

    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=6000


    ############################# Group Coordinator Settings #############################

    # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
    # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
    # The default value for this is 3 seconds.
    # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
    # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
    group.initial.rebalance.delay.ms=0

    4) Start Kafka

    [root@localhost kafka_2.12-1.1.0]# cd /opt/kafka_2.12-1.1.0/
    [root@localhost kafka_2.12-1.1.0]# ll
    total 48
    drwxr-xr-x. 3 root root 4096 Mar 23 18:55 bin
    drwxr-xr-x. 2 root root 4096 Mar 31 11:30 config
    drwxr-xr-x. 2 root root 6 Mar 31 11:31 kafka_log
    drwxr-xr-x. 2 root root 4096 Mar 31 11:26 libs
    -rw-r--r--. 1 root root 28824 Mar 23 18:51 LICENSE
    drwxr-xr-x. 2 root root 182 Mar 31 11:33 logs
    -rw-r--r--. 1 root root 336 Mar 23 18:51 NOTICE
    drwxr-xr-x. 2 root root 44 Mar 23 18:55 site-docs
    [root@localhost kafka_2.12-1.1.0]# sh ./bin/kafka-server-start.sh ./config/server.properties &
    [2018-03-31 11:35:47,198] ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
    kafka.common.KafkaException: Socket server failed to bind to 192.178.0.111:9092: Cannot assign requested address.
    at kafka.network.Acceptor.openServerSocket(SocketServer.scala:404)
    at kafka.network.Acceptor.<init>(SocketServer.scala:308)
    at kafka.network.SocketServer.$anonfun$createAcceptorAndProcessors$1(SocketServer.scala:126)
    at kafka.network.SocketServer.$anonfun$createAcceptorAndProcessors$1$adapted(SocketServer.scala:122)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:52)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at kafka.network.SocketServer.createAcceptorAndProcessors(SocketServer.scala:122)
    at kafka.network.SocketServer.startup(SocketServer.scala:84)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:247)
    at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
    at kafka.Kafka$.main(Kafka.scala:92)
    at kafka.Kafka.main(Kafka.scala)
    Caused by: java.net.BindException: Cannot assign requested address
    at sun.nio.ch.Net.bind0(Native Method)
    at sun.nio.ch.Net.bind(Net.java:433)
    at sun.nio.ch.Net.bind(Net.java:425)
    at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
    at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
    at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:67)
    at kafka.network.Acceptor.openServerSocket(SocketServer.scala:400)
    ... 12 more

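    Note: the output above is the failed startup. At the time I attributed it to port 9092 not being open on my server. The exception itself, Cannot assign requested address, is a bind error: it means the address given in listeners is not currently assigned to a local network interface, so that is worth checking as well.

    At this point, check ports 2181 and 9092: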
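
    A bind to a specific IP can only succeed if that IP is configured on the machine, so it helps to compare the listeners host against the local addresses before starting the broker. The following is a minimal sketch; listener_host and is_local_addr are hypothetical helpers, the properties file is a scratch copy, and hostname -I is assumed to be available (it is on CentOS 7).

```shell
# Print the host part of the first uncommented listeners= line in a Kafka properties file.
listener_host() {
    sed -n 's|^listeners=[A-Z_]*://\([^:]*\):.*|\1|p' "$1" | head -n 1
}

# Succeed if the address in $1 equals one of the addresses read from stdin (one per line).
is_local_addr() {
    grep -qxF "$1"
}

# Scratch copy of the relevant lines; on the real host use
# conf=/opt/kafka_2.12-1.1.0/config/server.properties instead.
conf=$(mktemp)
printf '#listeners=PLAINTEXT://:9092\nlisteners=PLAINTEXT://192.178.0.111:9092\n' > "$conf"

host=$(listener_host "$conf")

# `hostname -I` prints the addresses configured on this machine, separated by spaces
if hostname -I 2>/dev/null | tr ' ' '\n' | is_local_addr "$host"; then
    echo "$host is assigned to a local interface"
else
    echo "$host is NOT assigned to a local interface; binding to it would fail"
fi
```

    If the second message appears, either bring up the interface that owns the address or change listeners to an address the host really has.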

    [root@localhost kafka_2.12-1.1.0]# netstat -tunlp|egrep "(2181|9092)"
    tcp6 0 0 :::2181 :::* LISTEN 8896/java

    CentOS 7.0 uses firewalld as its default firewall; to use iptables instead, it has to be set up again.

    1. Stop firewalld

    systemctl stop firewalld.service #stop firewalld
    systemctl disable firewalld.service #do not start firewalld at boot

    2. Set up the iptables service

    [root@localhost kafka_2.12-1.1.0]# yum -y install iptables-services
    Loaded plugins: fastestmirror
    Loading mirror speeds from cached hostfile
    * base: mirrors.163.com
    * extras: mirrors.163.com
    * updates: mirrors.cn99.com
    Resolving Dependencies
    --> Running transaction check
    ---> Package iptables-services.x86_64 0:1.4.21-18.3.el7_4 will be installed
    --> Processing Dependency: iptables = 1.4.21-18.3.el7_4 for package: iptables-services-1.4.21-18.3.el7_4.x86_64
    --> Running transaction check
    ---> Package iptables.x86_64 0:1.4.21-18.0.1.el7.centos will be updated
    ---> Package iptables.x86_64 0:1.4.21-18.3.el7_4 will be an update
    --> Finished Dependency Resolution
    
    Dependencies Resolved
    
    =======================================================================================================================================================================
    Package Arch Version Repository Size
    =======================================================================================================================================================================
    Installing:
    iptables-services x86_64 1.4.21-18.3.el7_4 updates 51 k
    Updating for dependencies:
    iptables x86_64 1.4.21-18.3.el7_4 updates 428 k
    
    Transaction Summary
    =======================================================================================================================================================================
    Install 1 Package
    Upgrade ( 1 Dependent package)
    
    Total download size: 479 k
    Downloading packages:
    Delta RPMs disabled because /usr/bin/applydeltarpm not installed.
    (1/2): iptables-services-1.4.21-18.3.el7_4.x86_64.rpm | 51 kB 00:00:00 
    (2/2): iptables-1.4.21-18.3.el7_4.x86_64.rpm | 428 kB 00:00:01 
    -----------------------------------------------------------------------------------------------------------------------------------------------------------------------
    Total 447 kB/s | 479 kB 00:00:01 
    Running transaction check
    Running transaction test
    Transaction test succeeded
    Running transaction
    Updating : iptables-1.4.21-18.3.el7_4.x86_64 1/3 
    Installing : iptables-services-1.4.21-18.3.el7_4.x86_64 2/3 
    Cleanup : iptables-1.4.21-18.0.1.el7.centos.x86_64 3/3 
    Verifying : iptables-1.4.21-18.3.el7_4.x86_64 1/3 
    Verifying : iptables-services-1.4.21-18.3.el7_4.x86_64 2/3 
    Verifying : iptables-1.4.21-18.0.1.el7.centos.x86_64 3/3
    
    Installed:
    iptables-services.x86_64 0:1.4.21-18.3.el7_4
    
    Dependency Updated:
    iptables.x86_64 0:1.4.21-18.3.el7_4
    
    Complete!
    [root@localhost kafka_2.12-1.1.0]#

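    Note: the iptables service is not installed by default, so it has to be installed first.

    To change the firewall configuration, for example to open port 9092:

    vi /etc/sysconfig/iptables 

    Add the rule -A INPUT -m state --state NEW -m tcp -p tcp --dport 9092 -j ACCEPT above the REJECT line, since rules are evaluated in order: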

    [root@localhost kafka_2.12-1.1.0]# vi /etc/sysconfig/iptables 
    [root@localhost kafka_2.12-1.1.0]# more /etc/sysconfig/iptables
    # sample configuration for iptables service
    # you can edit this manually or use system-config-firewall
    # please do not ask us to add additional ports/services to this default configuration
    *filter
    :INPUT ACCEPT [0:0]
    :FORWARD ACCEPT [0:0]
    :OUTPUT ACCEPT [0:0]
    -A INPUT -m state --state RELATED,ESTABLISHED -j ACCEPT
    -A INPUT -p icmp -j ACCEPT
    -A INPUT -i lo -j ACCEPT
    -A INPUT -p tcp -m state --state NEW -m tcp --dport 22 -j ACCEPT
    -A INPUT -m state --state NEW -m tcp -p tcp --dport 9092 -j ACCEPT
    -A INPUT -j REJECT --reject-with icmp-host-prohibited
    -A FORWARD -j REJECT --reject-with icmp-host-prohibited
    COMMIT

    After saving and exiting:

    systemctl restart iptables.service #restart the firewall so the configuration takes effect
    systemctl enable iptables.service #start the firewall at boot
    
    systemctl start firewalld.service
    systemctl enable firewalld.service

    Finally, reboot the system so that the settings take effect.

    Restart ZooKeeper and Kafka, then check that both ports are listening:
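
    As an alternative to replacing firewalld with the iptables service, the port can be opened in firewalld itself with firewall-cmd. The sketch below is not what the steps above did; it only prints the commands unless APPLY=1 is set, and run and open_tcp_port are hypothetical helper names.

```shell
# Print the command instead of running it unless APPLY=1 is set, so the sketch is safe to try.
run() {
    if [ "${APPLY:-0}" = "1" ]; then
        "$@"
    else
        echo "would run: $*"
    fi
}

# Open a TCP port in firewalld permanently, then reload so the rule becomes active.
open_tcp_port() {
    run firewall-cmd --permanent --add-port="$1/tcp"
    run firewall-cmd --reload
}

open_tcp_port 9092
```

    With APPLY=1 and root privileges the two firewall-cmd calls are executed for real; port 2181 would only need the same treatment if ZooKeeper clients connect from other machines.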

    [root@localhost opt]# netstat -tunlp|egrep "(2181|9092)"
    tcp6 0 0 192.178.0.111:9092 :::* LISTEN 10299/java 
    tcp6 0 0 :::2181 :::* LISTEN 8896/java

    5) Create a topic

    -- Create the topic

    /opt/kafka_2.12-1.1.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic kafkatopic 

    The terminal in which the Kafka server was started (the one that ran [root@localhost kafka_2.12-1.1.0]# sh ./bin/kafka-server-start.sh ./config/server.properties &) prints new log output at this point.

    -- List all topics

     /opt/kafka_2.12-1.1.0/bin/kafka-topics.sh --list --zookeeper localhost:2181 

    -- Describe a specific topic

    /opt/kafka_2.12-1.1.0/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic kafkatopic

    6) Start the Kafka producer:

    /opt/kafka_2.12-1.1.0/bin/kafka-console-producer.sh --broker-list 192.178.0.111:9092 --sync --topic kafkatopic

    7) Open another terminal and start the consumer:

    sh /opt/kafka_2.12-1.1.0/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatopic --from-beginning
    Alternatively:
    sh /opt/kafka_2.12-1.1.0/bin/kafka-console-consumer.sh --bootstrap-server 192.178.0.111:9092 --topic kafkatopic --from-beginning

    (--from-beginning consumes the topic from the start; without it, the consumer only receives messages sent to the topic after it starts)

    8) Usage
    Type a message such as aaa in the producer terminal and it is displayed in the consumer terminal.

    Producer side:
    [root@localhost ~]# /opt/kafka_2.12-1.1.0/bin/kafka-console-producer.sh --broker-list 192.178.0.111:9092 --topic kafkatopic
    >a
    >b
    >c
    >d
    >
    
    Consumer side:
    [root@localhost ~]# /opt/kafka_2.12-1.1.0/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatopic --from-beginning
    Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
    a
    b
    c
    d
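
    The same round trip can be run without typing into two terminals: the console producer reads messages from stdin, and the console consumer can stop by itself through --max-messages and --timeout-ms. The sketch below uses --bootstrap-server, as the deprecation warning above suggests. It only prints the commands unless APPLY=1 is set; run and round_trip are hypothetical helpers, and the broker address and topic are the ones used in this walkthrough.

```shell
KAFKA_BIN=${KAFKA_BIN:-/opt/kafka_2.12-1.1.0/bin}
BROKER=${BROKER:-192.178.0.111:9092}
TOPIC=${TOPIC:-kafkatopic}

# Run the command only when APPLY=1; otherwise discard stdin and print what would run.
run() {
    if [ "${APPLY:-0}" = "1" ]; then
        "$@"
    else
        cat > /dev/null
        echo "would run: $*"
    fi
}

# Publish four messages, one per line, then read four messages from the start of the topic.
round_trip() {
    printf '%s\n' a b c d |
        run "$KAFKA_BIN/kafka-console-producer.sh" --broker-list "$BROKER" --topic "$TOPIC"
    # The consumer exits after 4 messages, or after 10 s without any message
    run "$KAFKA_BIN/kafka-console-consumer.sh" --bootstrap-server "$BROKER" --topic "$TOPIC" \
        --from-beginning --max-messages 4 --timeout-ms 10000 < /dev/null
}

round_trip
```

    Because of --from-beginning, the consumer returns the first four messages stored in the topic, which are the ones just sent only if the topic was empty beforehand.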


  • Original article: https://www.cnblogs.com/yy3b2007com/p/8684974.html