  • Installing ZooKeeper + Kafka on a single CentOS machine

    Environment:

    CentOS-7-x86_64
    zookeeper-3.4.11
    kafka_2.12-1.1.0
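    Both ZooKeeper and Kafka run on the JVM; this walkthrough assumes a JDK 1.8 is already installed under /usr/local/java/jdk1.8.0_161 (the path used when setting environment variables below). A quick sanity check, whose version line will of course reflect your own JDK:

    [root@localhost ~]# java -version
    java version "1.8.0_161"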

    I. Downloading and installing ZooKeeper

    1) Download ZooKeeper

    [root@localhost opt]# cd /opt/
    [root@localhost opt]# wget https://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gz

    2) Extract

    [root@localhost opt]# tar zxvf zookeeper-3.4.11.tar.gz 
    [root@localhost opt]# ls
    zookeeper-3.4.11 zookeeper-3.4.11.tar.gz

    3) Configure

    [root@localhost opt]# cd zookeeper-3.4.11
    [root@localhost zookeeper-3.4.11]# ll
    total 1596
    drwxr-xr-x. 2 502 games 149 Nov 1 14:52 bin
    -rw-r--r--. 1 502 games 87943 Nov 1 14:47 build.xml
    drwxr-xr-x. 2 502 games 77 Nov 1 14:52 conf
    drwxr-xr-x. 10 502 games 130 Nov 1 14:47 contrib
    drwxr-xr-x. 2 502 games 4096 Nov 1 14:54 dist-maven
    drwxr-xr-x. 6 502 games 4096 Nov 1 14:52 docs
    -rw-r--r--. 1 502 games 1709 Nov 1 14:47 ivysettings.xml
    -rw-r--r--. 1 502 games 8197 Nov 1 14:47 ivy.xml
    drwxr-xr-x. 4 502 games 4096 Nov 1 14:52 lib
    -rw-r--r--. 1 502 games 11938 Nov 1 14:47 LICENSE.txt
    -rw-r--r--. 1 502 games 3132 Nov 1 14:47 NOTICE.txt
    -rw-r--r--. 1 502 games 1585 Nov 1 14:47 README.md
    -rw-r--r--. 1 502 games 1770 Nov 1 14:47 README_packaging.txt
    drwxr-xr-x. 5 502 games 47 Nov 1 14:47 recipes
    drwxr-xr-x. 8 502 games 211 Nov 1 14:52 src
    -rw-r--r--. 1 502 games 1478279 Nov 1 14:49 zookeeper-3.4.11.jar
    -rw-r--r--. 1 502 games 195 Nov 1 14:52 zookeeper-3.4.11.jar.asc
    -rw-r--r--. 1 502 games 33 Nov 1 14:49 zookeeper-3.4.11.jar.md5
    -rw-r--r--. 1 502 games 41 Nov 1 14:49 zookeeper-3.4.11.jar.sha1
    [root@localhost zookeeper-3.4.11]# cp -rf conf/zoo_sample.cfg conf/zoo.cfg
    [root@localhost zookeeper-3.4.11]# vi conf/zoo.cfg

    Modify or add the following two entries in zoo.cfg (note that zoo.cfg does not support trailing comments on the same line as a value, so keep remarks on their own lines):

    # both directories must be created beforehand (see below)
    dataDir=/opt/zookeeper-3.4.11/zkdata
    dataLogDir=/opt/zookeeper-3.4.11/zkdatalog
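    For reference, a minimal standalone zoo.cfg then looks like this (the remaining values are the defaults shipped in zoo_sample.cfg):

    tickTime=2000
    initLimit=10
    syncLimit=5
    clientPort=2181
    dataDir=/opt/zookeeper-3.4.11/zkdata
    dataLogDir=/opt/zookeeper-3.4.11/zkdatalog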

    Create the ZooKeeper data and transaction-log directories:

    [root@localhost zookeeper-3.4.11]# mkdir /opt/zookeeper-3.4.11/zkdata
    [root@localhost zookeeper-3.4.11]# mkdir /opt/zookeeper-3.4.11/zkdatalog
    [root@localhost zookeeper-3.4.11]# ll
    total 1596
    drwxr-xr-x. 2 502 games 149 Nov 1 14:52 bin
    -rw-r--r--. 1 502 games 87943 Nov 1 14:47 build.xml
    drwxr-xr-x. 2 502 games 92 Mar 31 11:12 conf
    drwxr-xr-x. 10 502 games 130 Nov 1 14:47 contrib
    drwxr-xr-x. 2 502 games 4096 Nov 1 14:54 dist-maven
    drwxr-xr-x. 6 502 games 4096 Nov 1 14:52 docs
    -rw-r--r--. 1 502 games 1709 Nov 1 14:47 ivysettings.xml
    -rw-r--r--. 1 502 games 8197 Nov 1 14:47 ivy.xml
    drwxr-xr-x. 4 502 games 4096 Nov 1 14:52 lib
    -rw-r--r--. 1 502 games 11938 Nov 1 14:47 LICENSE.txt
    -rw-r--r--. 1 502 games 3132 Nov 1 14:47 NOTICE.txt
    -rw-r--r--. 1 502 games 1585 Nov 1 14:47 README.md
    -rw-r--r--. 1 502 games 1770 Nov 1 14:47 README_packaging.txt
    drwxr-xr-x. 5 502 games 47 Nov 1 14:47 recipes
    drwxr-xr-x. 8 502 games 211 Nov 1 14:52 src
    drwxr-xr-x. 2 root root 6 Mar 31 11:13 zkdata
    drwxr-xr-x. 2 root root 6 Mar 31 11:13 zkdatalog
    -rw-r--r--. 1 502 games 1478279 Nov 1 14:49 zookeeper-3.4.11.jar
    -rw-r--r--. 1 502 games 195 Nov 1 14:52 zookeeper-3.4.11.jar.asc
    -rw-r--r--. 1 502 games 33 Nov 1 14:49 zookeeper-3.4.11.jar.md5
    -rw-r--r--. 1 502 games 41 Nov 1 14:49 zookeeper-3.4.11.jar.sha1

    4) Configure environment variables

    [root@localhost zookeeper-3.4.11]# vi /etc/profile

    Add the following entries:

    # config java class path
    export JAVA_HOME=/usr/local/java/jdk1.8.0_161
    export JRE_HOME=${JAVA_HOME}/jre

    # config zookeeper install path (defined before it is referenced below)
    export ZOOKEEPER_HOME=/opt/zookeeper-3.4.11

    export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib:${ZOOKEEPER_HOME}/lib
    export PATH=${JAVA_HOME}/bin:${ZOOKEEPER_HOME}/bin:$PATH
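    Reload the profile so the variables take effect in the current shell, and verify (the echoed path assumes the install location above):

    [root@localhost zookeeper-3.4.11]# source /etc/profile
    [root@localhost zookeeper-3.4.11]# echo $ZOOKEEPER_HOME
    /opt/zookeeper-3.4.11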

    5) Start ZooKeeper

    [root@localhost bin]# cd /opt/zookeeper-3.4.11/bin
    [root@localhost bin]# ll
    total 36
    -rwxr-xr-x. 1 502 games 232 Nov 1 14:47 README.txt
    -rwxr-xr-x. 1 502 games 1937 Nov 1 14:47 zkCleanup.sh
    -rwxr-xr-x. 1 502 games 1056 Nov 1 14:47 zkCli.cmd
    -rwxr-xr-x. 1 502 games 1534 Nov 1 14:47 zkCli.sh
    -rwxr-xr-x. 1 502 games 1628 Nov 1 14:47 zkEnv.cmd
    -rwxr-xr-x. 1 502 games 2696 Nov 1 14:47 zkEnv.sh
    -rwxr-xr-x. 1 502 games 1089 Nov 1 14:47 zkServer.cmd
    -rwxr-xr-x. 1 502 games 6773 Nov 1 14:47 zkServer.sh
    [root@localhost bin]# ./zkServer.sh start
    ZooKeeper JMX enabled by default
    Using config: /opt/zookeeper-3.4.11/bin/../conf/zoo.cfg
    Starting zookeeper ... STARTED
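    To confirm ZooKeeper is actually serving requests, check its status and send the built-in "ruok" four-letter command (both are standard in the 3.4.x line; nc may need to be installed separately):

    [root@localhost bin]# ./zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /opt/zookeeper-3.4.11/bin/../conf/zoo.cfg
    Mode: standalone
    [root@localhost bin]# echo ruok | nc localhost 2181
    imok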

    II. Downloading and installing Kafka

    1) Download Kafka:

    [root@localhost bin]# cd /opt/
    [root@localhost opt]# wget http://apache.fayea.com/kafka/1.1.0/kafka_2.12-1.1.0.tgz
    --2018-03-31 11:21:52-- http://apache.fayea.com/kafka/1.1.0/kafka_2.12-1.1.0.tgz
    Resolving apache.fayea.com (apache.fayea.com)... 202.115.175.188, 202.115.175.187
    Connecting to apache.fayea.com (apache.fayea.com)|202.115.175.188|:80... connected.
    HTTP request sent, awaiting response... 200 OK
    Length: 50326212 (48M) [application/x-gzip]
    Saving to: ‘kafka_2.12-1.1.0.tgz’
    
    100%[=============================================================================================================================>] 50,326,212 442KB/s in 1m 44s
    
    2018-03-31 11:23:36 (473 KB/s) - ‘kafka_2.12-1.1.0.tgz’ saved [50326212/50326212]
    
    [root@localhost opt]# ll
    total 84964
    -rw-r--r--. 1 root root 50326212 Mar 28 08:05 kafka_2.12-1.1.0.tgz
    drwxr-xr-x. 15 502 games 4096 Mar 31 11:20 zookeeper-3.4.11
    -rw-r--r--. 1 root root 36668066 Nov 8 13:24 zookeeper-3.4.11.tar.gz

    2) Extract:

    tar -zxvf kafka_2.12-1.1.0.tgz

    3) Configure:

    Go into the Kafka installation root directory and edit config/server.properties:

    [root@localhost opt]# cd /opt/kafka_2.12-1.1.0/config/
    [root@localhost config]# ll
    total 64
    -rw-r--r--. 1 root root 906 Mar 23 18:51 connect-console-sink.properties
    -rw-r--r--. 1 root root 909 Mar 23 18:51 connect-console-source.properties
    -rw-r--r--. 1 root root 5807 Mar 23 18:51 connect-distributed.properties
    -rw-r--r--. 1 root root 883 Mar 23 18:51 connect-file-sink.properties
    -rw-r--r--. 1 root root 881 Mar 23 18:51 connect-file-source.properties
    -rw-r--r--. 1 root root 1111 Mar 23 18:51 connect-log4j.properties
    -rw-r--r--. 1 root root 2730 Mar 23 18:51 connect-standalone.properties
    -rw-r--r--. 1 root root 1221 Mar 23 18:51 consumer.properties
    -rw-r--r--. 1 root root 4727 Mar 23 18:51 log4j.properties
    -rw-r--r--. 1 root root 1919 Mar 23 18:51 producer.properties
    -rw-r--r--. 1 root root 6851 Mar 23 18:51 server.properties
    -rw-r--r--. 1 root root 1032 Mar 23 18:51 tools-log4j.properties
    -rw-r--r--. 1 root root 1023 Mar 23 18:51 zookeeper.properties
    [root@localhost config]# mkdir /opt/kafka_2.12-1.1.0/kafka_log

    Add or modify the following two entries (as with zoo.cfg, keep remarks on their own lines, since properties files do not support trailing comments):

    # kafka_log must be created in advance (done above); the listener IP is this server's address
    log.dirs=/opt/kafka_2.12-1.1.0/kafka_log
    listeners=PLAINTEXT://192.178.0.111:9092
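    If producers or consumers will connect from other machines, it can also be worth setting the advertised address explicitly (a hedged addition; by default Kafka falls back to the "listeners" value, as the file's own comments note):

    # optional: the address the broker advertises to clients
    advertised.listeners=PLAINTEXT://192.178.0.111:9092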

    config/server.properties after the edits:

    [root@localhost config]# more server.properties
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements. See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License. You may obtain a copy of the License at
    #
    # http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.

    # see kafka.server.KafkaConfig for additional details and defaults

    ############################# Server Basics #############################

    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=0

    ############################# Socket Server Settings #############################

    # The address the socket server listens on. It will get the value returned from
    # java.net.InetAddress.getCanonicalHostName() if not configured.
    # FORMAT:
    # listeners = listener_name://host_name:port
    # EXAMPLE:
    # listeners = PLAINTEXT://your.host.name:9092
    #listeners=PLAINTEXT://:9092
    listeners=PLAINTEXT://192.178.0.111:9092

    # Hostname and port the broker will advertise to producers and consumers. If not set,
    # it uses the value for "listeners" if configured. Otherwise, it will use the value
    # returned from java.net.InetAddress.getCanonicalHostName().
    #advertised.listeners=PLAINTEXT://your.host.name:9092

    # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
    #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

    # The number of threads that the server uses for receiving requests from the network and sending responses to the network
    num.network.threads=3

    # The number of threads that the server uses for processing requests, which may include disk I/O
    num.io.threads=8

    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400

    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400

    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600


    ############################# Log Basics #############################

    # A comma separated list of directories under which to store log files
    #log.dirs=/tmp/kafka-logs
    log.dirs=/opt/kafka_2.12-1.1.0/kafka_log

    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=1

    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    num.recovery.threads.per.data.dir=1

    ############################# Internal Topic Settings #############################
    # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
    # For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1

    ############################# Log Flush Policy #############################

    # Messages are immediately written to the filesystem but by default we only fsync() to sync
    # the OS cache lazily. The following configurations control the flush of data to disk.
    # There are a few important trade-offs here:
    # 1. Durability: Unflushed data may be lost if you are not using replication.
    # 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
    # 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
    # The settings below allow one to configure the flush policy to flush data after a period of time or
    # every N messages (or both). This can be done globally and overridden on a per-topic basis.

    # The number of messages to accept before forcing a flush of data to disk
    #log.flush.interval.messages=10000

    # The maximum amount of time a message can sit in a log before we force a flush
    #log.flush.interval.ms=1000

    ############################# Log Retention Policy #############################

    # The following configurations control the disposal of log segments. The policy can
    # be set to delete segments after a period of time, or after a given size has accumulated.
    # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
    # from the end of the log.

    # The minimum age of a log file to be eligible for deletion due to age
    log.retention.hours=168

    # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
    # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
    #log.retention.bytes=1073741824

    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824

    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=300000

    ############################# Zookeeper #############################

    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    zookeeper.connect=localhost:2181

    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=6000


    ############################# Group Coordinator Settings #############################

    # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
    # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
    # The default value for this is 3 seconds.
    # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
    # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
    group.initial.rebalance.delay.ms=0
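    A quick way to confirm the two edits took effect (the expected output assumes the values above):

    [root@localhost config]# grep -E "^(listeners|log.dirs)" server.properties
    listeners=PLAINTEXT://192.178.0.111:9092
    log.dirs=/opt/kafka_2.12-1.1.0/kafka_log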

    4) Start Kafka

    [root@localhost kafka_2.12-1.1.0]# cd /opt/kafka_2.12-1.1.0/
    [root@localhost kafka_2.12-1.1.0]# ll
    total 48
    drwxr-xr-x. 3 root root 4096 Mar 23 18:55 bin
    drwxr-xr-x. 2 root root 4096 Mar 31 11:30 config
    drwxr-xr-x. 2 root root 6 Mar 31 11:31 kafka_log
    drwxr-xr-x. 2 root root 4096 Mar 31 11:26 libs
    -rw-r--r--. 1 root root 28824 Mar 23 18:51 LICENSE
    drwxr-xr-x. 2 root root 182 Mar 31 11:33 logs
    -rw-r--r--. 1 root root 336 Mar 23 18:51 NOTICE
    drwxr-xr-x. 2 root root 44 Mar 23 18:55 site-docs
    [root@localhost kafka_2.12-1.1.0]# sh ./bin/kafka-server-start.sh ./config/server.properties &
    [2018-03-31 11:35:47,198] ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
    kafka.common.KafkaException: Socket server failed to bind to 192.178.0.111:9092: Cannot assign requested address.
    at kafka.network.Acceptor.openServerSocket(SocketServer.scala:404)
    at kafka.network.Acceptor.<init>(SocketServer.scala:308)
    at kafka.network.SocketServer.$anonfun$createAcceptorAndProcessors$1(SocketServer.scala:126)
    at kafka.network.SocketServer.$anonfun$createAcceptorAndProcessors$1$adapted(SocketServer.scala:122)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:52)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at kafka.network.SocketServer.createAcceptorAndProcessors(SocketServer.scala:122)
    at kafka.network.SocketServer.startup(SocketServer.scala:84)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:247)
    at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
    at kafka.Kafka$.main(Kafka.scala:92)
    at kafka.Kafka.main(Kafka.scala)
    Caused by: java.net.BindException: Cannot assign requested address
    at sun.nio.ch.Net.bind0(Native Method)
    at sun.nio.ch.Net.bind(Net.java:433)
    at sun.nio.ch.Net.bind(Net.java:425)
    at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
    at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
    at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:67)
    at kafka.network.Acceptor.openServerSocket(SocketServer.scala:400)
    ... 12 more

    Note: the output above is a failed startup. In my case the cause was that port 9092 was not open on this server. (Strictly speaking, a "Cannot assign requested address" BindException means the socket could not bind the configured listener address at all, so it is also worth checking that the IP in "listeners" really belongs to this machine.)
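    Before touching the firewall, it is worth confirming that the IP configured in "listeners" is actually assigned to a local interface, since that is what the bind call needs (substitute your own address):

    [root@localhost kafka_2.12-1.1.0]# ip -4 addr show | grep 192.178.0.111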

    At this point, check ports 2181 and 9092:

    [root@localhost kafka_2.12-1.1.0]# netstat -tunlp|egrep "(2181|9092)"
    tcp6 0 0 :::2181 :::* LISTEN 8896/java

    CentOS 7 uses firewalld as its firewall by default; to use iptables instead, it has to be set up separately.

    1. Simply turn off the firewall

    systemctl stop firewalld.service     # stop firewalld
    systemctl disable firewalld.service  # prevent firewalld from starting at boot
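    (If you would rather keep firewalld than switch to iptables, opening the broker port directly with firewalld's own CLI also works:)

    firewall-cmd --permanent --add-port=9092/tcp
    firewall-cmd --reload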

    2. Set up the iptables service

    [root@localhost kafka_2.12-1.1.0]# yum -y install iptables-services
    Loaded plugins: fastestmirror
    Loading mirror speeds from cached hostfile
    * base: mirrors.163.com
    * extras: mirrors.163.com
    * updates: mirrors.cn99.com
    Resolving Dependencies
    --> Running transaction check
    ---> Package iptables-services.x86_64 0:1.4.21-18.3.el7_4 will be installed
    --> Processing Dependency: iptables = 1.4.21-18.3.el7_4 for package: iptables-services-1.4.21-18.3.el7_4.x86_64
    --> Running transaction check
    ---> Package iptables.x86_64 0:1.4.21-18.0.1.el7.centos will be updated
    ---> Package iptables.x86_64 0:1.4.21-18.3.el7_4 will be an update
    --> Finished Dependency Resolution
    
    Dependencies Resolved
    
    =======================================================================================================================================================================
    Package Arch Version Repository Size
    =======================================================================================================================================================================
    Installing:
    iptables-services x86_64 1.4.21-18.3.el7_4 updates 51 k
    Updating for dependencies:
    iptables x86_64 1.4.21-18.3.el7_4 updates 428 k
    
    Transaction Summary
    =======================================================================================================================================================================
    Install 1 Package
    Upgrade ( 1 Dependent package)
    
    Total download size: 479 k
    Downloading packages:
    Delta RPMs disabled because /usr/bin/applydeltarpm not installed.
    (1/2): iptables-services-1.4.21-18.3.el7_4.x86_64.rpm | 51 kB 00:00:00 
    (2/2): iptables-1.4.21-18.3.el7_4.x86_64.rpm | 428 kB 00:00:01 
    -----------------------------------------------------------------------------------------------------------------------------------------------------------------------
    Total 447 kB/s | 479 kB 00:00:01 
    Running transaction check
    Running transaction test
    Transaction test succeeded
    Running transaction
    Updating : iptables-1.4.21-18.3.el7_4.x86_64 1/3 
    Installing : iptables-services-1.4.21-18.3.el7_4.x86_64 2/3 
    Cleanup : iptables-1.4.21-18.0.1.el7.centos.x86_64 3/3 
    Verifying : iptables-1.4.21-18.3.el7_4.x86_64 1/3 
    Verifying : iptables-services-1.4.21-18.3.el7_4.x86_64 2/3 
    Verifying : iptables-1.4.21-18.0.1.el7.centos.x86_64 3/3
    
    Installed:
    iptables-services.x86_64 0:1.4.21-18.3.el7_4
    
    Dependency Updated:
    iptables.x86_64 0:1.4.21-18.3.el7_4
    
    Complete!
    [root@localhost kafka_2.12-1.1.0]#

    Note: iptables-services is not installed by default, so it has to be installed first.

    To change the firewall configuration, for example to open port 9092:

    vi /etc/sysconfig/iptables 

    Add the rule: -A INPUT -m state --state NEW -m tcp -p tcp --dport 9092 -j ACCEPT

    [root@localhost kafka_2.12-1.1.0]# vi /etc/sysconfig/iptables 
    [root@localhost kafka_2.12-1.1.0]# more /etc/sysconfig/iptables
    # sample configuration for iptables service
    # you can edit this manually or use system-config-firewall
    # please do not ask us to add additional ports/services to this default configuration
    *filter
    :INPUT ACCEPT [0:0]
    :FORWARD ACCEPT [0:0]
    :OUTPUT ACCEPT [0:0]
    -A INPUT -m state --state RELATED,ESTABLISHED -j ACCEPT
    -A INPUT -p icmp -j ACCEPT
    -A INPUT -i lo -j ACCEPT
    -A INPUT -p tcp -m state --state NEW -m tcp --dport 22 -j ACCEPT
    -A INPUT -m state --state NEW -m tcp -p tcp --dport 9092 -j ACCEPT
    -A INPUT -j REJECT --reject-with icmp-host-prohibited
    -A FORWARD -j REJECT --reject-with icmp-host-prohibited
    COMMIT

    After saving and exiting:

    systemctl restart iptables.service   # restart iptables so the new rule takes effect
    systemctl enable iptables.service    # start iptables at boot

    (Or, if you chose to stay with firewalld instead of iptables:)
    systemctl start firewalld.service
    systemctl enable firewalld.service

    Finally, reboot the system so the settings take effect.

    Restart ZooKeeper, restart Kafka, and then check that both ports are listening:

    [root@localhost opt]# netstat -tunlp|egrep "(2181|9092)"
    tcp6 0 0 192.178.0.111:9092 :::* LISTEN 10299/java 
    tcp6 0 0 :::2181 :::* LISTEN 8896/java
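    As an extra connectivity check, confirm the broker port accepts TCP connections (nc is provided by the nmap-ncat package on CentOS 7; the exact success message varies between nc variants):

    [root@localhost opt]# nc -zv 192.178.0.111 9092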

    5) Create a topic

    -- Create a topic

    /opt/kafka_2.12-1.1.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic kafkatopic 

    At this point the Kafka server window (the one where sh ./bin/kafka-server-start.sh ./config/server.properties & was run) will print new log output.

    -- List all topics

     /opt/kafka_2.12-1.1.0/bin/kafka-topics.sh --list --zookeeper localhost:2181 

    -- Describe a specific topic

    /opt/kafka_2.12-1.1.0/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic kafkatopic
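    For the single-partition, single-replica topic created above, the describe output should look roughly like this (layout as printed by Kafka 1.1; exact spacing may differ):

    Topic:kafkatopic    PartitionCount:1    ReplicationFactor:1    Configs:
        Topic: kafkatopic    Partition: 0    Leader: 0    Replicas: 0    Isr: 0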

    6) Start the Kafka console producer:

    /opt/kafka_2.12-1.1.0/bin/kafka-console-producer.sh --broker-list 192.178.0.111:9092 --sync --topic kafkatopic

    7) Open another terminal and start a console consumer:

    sh /opt/kafka_2.12-1.1.0/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatopic --from-beginning
    Or, using the new consumer API:
    sh /opt/kafka_2.12-1.1.0/bin/kafka-console-consumer.sh --bootstrap-server 192.178.0.111:9092 --topic kafkatopic --from-beginning

    (--from-beginning replays the topic from the start; without it, the consumer only sees messages produced after it starts.)

    8) Usage

    Type messages in the producer terminal and they appear in the consumer terminal.

    The producer sends:
    [root@localhost ~]# /opt/kafka_2.12-1.1.0/bin/kafka-console-producer.sh --broker-list 192.178.0.111:9092 --topic kafkatopic
    >a
    >b
    >c
    >d
    >
    
    The consumer receives:
    [root@localhost ~]# /opt/kafka_2.12-1.1.0/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatopic --from-beginning
    Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
    a
    b
    c
    d
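    When you are done testing, both services can be stopped with their stock scripts (stop the broker first, since it deregisters itself through ZooKeeper on shutdown):

    /opt/kafka_2.12-1.1.0/bin/kafka-server-stop.sh
    /opt/zookeeper-3.4.11/bin/zkServer.sh stop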

