  • Kafka installation, usage, and analysis

    Download and extract; this has been shown in plenty of earlier examples, so it is omitted here.

    Kafka official site: http://kafka.apache.org/

    For detailed documentation, see: http://kafka.apache.org/documentation/

    1. Installation and extraction

    Omitted.

    2. Configuration, startup, testing, and analysis

    2.1 Single instance

    [root@master kafka]# pwd
    /opt/kafka
    [root@master kafka]# vi config/server.properties

    # Three settings to change:

    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=0   # Must be unique for each broker; in a multi-server (cluster) deployment it must also be unique across all brokers, otherwise startup fails. The type is int; this defines the broker id.

    ############################# Socket Server Settings #############################

    # The address the socket server listens on. It will get the value returned from
    # java.net.InetAddress.getCanonicalHostName() if not configured.
    # FORMAT:
    # listeners = listener_name://host_name:port
    # EXAMPLE:
    # listeners = PLAINTEXT://your.host.name:9092
    listeners=PLAINTEXT://192.168.234.129:9092 # Use this machine's own IP here

    # A comma separated list of directories under which to store log files
    log.dirs=/tmp/kafka-logs       # Each broker should get its own log directory; otherwise the brokers' logs get mixed together and are hard to tell apart.

    # Hostname and port the broker will advertise to producers and consumers. If not set,
    # it uses the value for "listeners" if configured. Otherwise, it will use the value
    # returned from java.net.InetAddress.getCanonicalHostName().
    advertised.listeners=PLAINTEXT://192.168.234.129:9092 # If not set, the value of listeners is used.

    Start the Kafka server

    Note: ZooKeeper must be running before Kafka starts; use a standalone ZooKeeper or the one bundled with Kafka, for example as shown below.
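
    A minimal way to bring up the ZooKeeper bundled with Kafka, assuming the default config file that ships with it:

    # start the bundled ZooKeeper (defaults to port 2181)
    bin/zookeeper-server-start.sh config/zookeeper.properties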

    [root@master kafka]# bin/kafka-server-start.sh config/server.properties

    This starts a single broker.

    A single server can run multiple brokers. Each broker can hold one replica of a partition. A topic can have multiple partitions, and when creating a topic the replication factor cannot exceed the number of brokers.

    Start a second broker on the same server

    [root@master config]# cp server.properties server1.properties

    Change the same settings as above:

    broker.id=1
    listeners=PLAINTEXT://192.168.234.129:9093
    log.dirs=/tmp/kafka-logs-1

    Then start it:

    [root@master kafka]# bin/kafka-server-start.sh config/server1.properties
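
    If you do not want each broker to tie up a terminal, the start script also accepts a -daemon flag (shown here as a convenience sketch; the server logs then go to the logs/ directory):

    # run both brokers in the background
    bin/kafka-server-start.sh -daemon config/server.properties
    bin/kafka-server-start.sh -daemon config/server1.properties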

    Create a topic

     bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 5 --partitions 2 --topic my-test1

    This fails because there are only 2 brokers, so the maximum replication factor is 2. (On a multi-server cluster, a replication factor greater than 1 is recommended, so a broker crash does not make data unrecoverable.) A corrected command is sketched below.
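
    A sketch of the same command with a replication factor that fits the two running brokers:

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic my-test1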

    --partitions sets the number of partitions. my-test1 has 2 partitions, which Kafka's internal assignment algorithm spreads across the two brokers.

    The topic name here is my-test1. In a cluster, multiple partitions provide load balancing (Kafka handles this itself, in a roughly round-robin fashion).

    If the second broker above were moved to another server, this would become a cluster. Multiple brokers on one node can be thought of as a multi-broker setup. To check where the partitions actually landed, describe the topic as sketched below.
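
    A quick way to verify which broker each partition of my-test1 landed on (same --describe output format as in the cluster section below):

    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-test1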

    2.2 Cluster

    Install, configure, and start Kafka on the node1 node. (Omitted.)

    Note: start ZooKeeper on every node before starting the brokers, or use the ZooKeeper bundled with Kafka.

    On node1, set the broker id to 4 and keep port 9092; the relevant settings are sketched below.
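
    Node1's config/server.properties would then contain something like the following (a sketch; the 192.168.234.130 address is taken from the producer/consumer commands later in this post):

    broker.id=4
    listeners=PLAINTEXT://192.168.234.130:9092
    advertised.listeners=PLAINTEXT://192.168.234.130:9092
    log.dirs=/tmp/kafka-logs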

    Create a topic on the master node:

    [root@master kafka]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic distr-test
    Created topic "distr-test".

    Create the same topic on node1:

    [root@node1 kafka]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic distr-test
    Error while executing topic command : Topic 'distr-test' already exists.
    [2018-04-23 11:08:30,250] ERROR org.apache.kafka.common.errors.TopicExistsException: Topic 'distr-test' already exists.
    (kafka.admin.TopicCommand$)

    This shows that topic metadata is shared across the whole cluster (both brokers register in the same ZooKeeper ensemble).
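
    Either node can list every topic in the cluster, since both brokers read the same metadata from ZooKeeper (a sketch):

    bin/kafka-topics.sh --list --zookeeper localhost:2181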

    Describe the topic on node1:

    [root@node1 kafka]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic distr-test
    Topic:distr-test PartitionCount:2 ReplicationFactor:2 Configs:
    Topic: distr-test Partition: 0 Leader: 0 Replicas: 0,4 Isr: 0,4
    Topic: distr-test Partition: 1 Leader: 4 Replicas: 4,0 Isr: 4,0

    Let's analyze this topic information:

    Topic name: distr-test

    Partition 0 and Partition 1, two partitions in total.

    Partition 0's leader is the broker with id 0, i.e. master.

    Partition 1's leader is the broker with id 4, i.e. node1.

    Each partition has 2 replicas, on brokers 0 and 4.

    ISR  is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.
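
    To see the ISR change, one could stop the node1 broker (Ctrl+C in its terminal, or kill its process) and describe the topic again; leadership of partition 1 should move to broker 0 and the ISR should shrink accordingly (a sketch, not captured output):

    # after stopping the node1 broker, re-check leaders and ISR
    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic distr-test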

    Create another topic and take a look:

    [root@node1 kafka]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic distr-test1
    Created topic "distr-test1".
    [root@node1 kafka]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic distr-test1
    Topic:distr-test1 PartitionCount:4 ReplicationFactor:2 Configs:
    Topic: distr-test1 Partition: 0 Leader: 0 Replicas: 0,4 Isr: 0,4
    Topic: distr-test1 Partition: 1 Leader: 4 Replicas: 4,0 Isr: 4,0
    Topic: distr-test1 Partition: 2 Leader: 0 Replicas: 0,4 Isr: 0,4
    Topic: distr-test1 Partition: 3 Leader: 4 Replicas: 4,0 Isr: 4,0

    The examples above also show that a cluster supports reading and writing on multiple brokers at once, since partition leaders are spread across them.

    The partitions of the log are distributed over the servers in the Kafka cluster, with each server handling data and requests for a share of the partitions. Each partition is replicated across a configurable number of servers for fault tolerance.

    Producing and consuming messages:

    Set zookeeper.connect to the addresses of all ZooKeeper nodes; otherwise the producer cannot find the other broker when sending messages.

    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    zookeeper.connect=192.168.234.129:2181,192.168.234.130:2181

    Produce messages on master

    [root@master kafka]# bin/kafka-console-producer.sh --broker-list 192.168.234.129:9092,192.168.234.130:9092 --topic distr-test 

    >a
    >a
    >^C[root@master kafka]# bin/kafka-console-producer.sh --broker-list 192.168.234.130:9092 --topic distr-test
    >a
    >11
    >

    Consume messages on node1

    [root@node1 kafka]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.234.130:9092,192.168.234.129:9092 --from-beginning --topic distr-test
    a
    a
    a
    a
    11
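
    To split the partitions among several consumers instead of having each one read everything, the console consumer can join a named consumer group (a sketch; the group name "mygroup" is illustrative):

    bin/kafka-console-consumer.sh --bootstrap-server 192.168.234.130:9092 --topic distr-test --consumer-property group.id=mygroup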

     

    3. Kafka Connect import/export

    Kafka Connect imports and exports data through sources and sinks.

    [root@master kafka]# echo -e "foo\nbar" > test.txt
    [root@master kafka]# cat test.txt 
    foo
    bar
    [root@master kafka]# bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
    # open a new session
    [root@master kafka]# more test.sink.txt
    # consume the topic
    [root@master kafka]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.234.129:9092 --topic connect-test --from-beginning 
    # append to the source file
    [root@master kafka]# echo 111 >> test.txt 
    # check the sink file again
    [root@master kafka]# more test.sink.txt
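
    If the connectors are running, test.sink.txt should end up mirroring test.txt, so the final check above would show something like this (expected contents, not captured output):

    foo
    bar
    111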

    Configuration file contents

    [root@master config]# cat connect-file-source.properties 
    # (Apache License 2.0 header omitted)
    
    name=local-file-source
    connector.class=FileStreamSource
    tasks.max=1
    file=test.txt
    topic=connect-test
    [root@master config]# cat connect-file-sink.properties 
    # (Apache License 2.0 header omitted)
    
    name=local-file-sink
    connector.class=FileStreamSink
    tasks.max=1
    file=test.sink.txt
    topics=connect-test
    [root@master config]# 

    So Kafka is a messaging system as well as a storage system.

    4. Kafka Streams

    See the official documentation: http://kafka.apache.org/11/documentation/streams/quickstart
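
    For a taste, that quickstart runs the WordCount demo that ships with Kafka roughly like this (a sketch; the linked page describes the input topic and consumer setup it needs):

    # run the bundled Kafka Streams WordCount example
    bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo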
