zoukankan      html  css  js  c++  java
  • Kafka安装与使用

    Kafka安装与使用

    下载地址:https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz

    安装以及启动kafka

    步骤1:安装kafka

    $ tar -xzf kafka_2.10-0.8.1.1.tgz
    $ cd kafka_2.10-0.8.1.1.tgz 

    步骤2:配置server.properties

     配置zookeeper(假设您已经安装了zookeeper,如果没有安装,请再网上搜索安装方法)

    进入kafka安装工程根目录编辑 

    vim config/server.properties  

    修改属性 zookeeper.connect=ip:2181,ip2: 2181

    步骤3:server.properties配置说明

    kafka最为重要三个配置依次为:broker.id、log.dir、zookeeper.connect

    kafka server端config/server.properties参数说明和解释如下:

    (参考配置说明地址:http://blog.csdn.net/lizhitao/article/details/25667831)

    #实际使用案例 这里211上面的kafka 配置文件
    broker.id=1
    port=9092
    host.name=192.168.1.211
    num.network.threads=2
    num.io.threads=8
    socket.send.buffer.bytes=1048576
    socket.receive.buffer.bytes=1048576
    socket.request.max.bytes=104857600
    log.dirs=/tmp/kafka-logs
    num.partitions=2
    log.retention.hours=168
    log.segment.bytes=536870912
    log.retention.check.interval.ms=60000
    log.cleaner.enable=false
    zookeeper.connect=192.168.1.213:2181,192.168.1.216:2181,192.168.1.217:2181
    zookeeper.connection.timeout.ms=1000000
    #kafka实际使用案例 210服务器kafka配置
    broker.id=2
    port=9092
    host.name=192.168.1.210
    num.network.threads=2
    num.io.threads=8
    socket.send.buffer.bytes=1048576
    socket.receive.buffer.bytes=1048576
    socket.request.max.bytes=104857600
    log.dirs=/tmp/kafka-logs
    num.partitions=2
    log.retention.hours=168
    log.segment.bytes=536870912
    log.retention.check.interval.ms=60000
    log.cleaner.enable=false
    zookeeper.connect=192.168.1.213:2181,192.168.1.216:2181,192.168.1.217:2181
    zookeeper.connection.timeout.ms=1000000

     

    步骤4: 启动kafka

    (先启动zookeeper $:  bin/zkServer.sh start config/zookeeper.properties &)

    cd kafka-0.8.1

    $ bin/kafka-server-start.sh -daemon config/server.properties &

     (实验时,需要启动至少两个broker   bin/kafka-server-start.sh -daemon config/server-1.properties &) 

    步骤5:创建topic

    $ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

    步骤6:验证topic是否创建成功

    $ bin/kafka-topics.sh --list --zookeeper localhost:2181

    localhostzookeeper地址 

    topic描述:

    bin/kafka-topics.sh --describe --zookeeper 192.168.1.8:2181 --topic test
    //启动报错Unrecognized VM option '+UseCompressedOops'
    查看 bin/kafka-run-class.sh
    找到
    if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
      KAFKA_JVM_PERFORMANCE_OPTS="-server  -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true"
    fi
    去掉-XX:+UseCompressedOops
    启动报错 Could not reserve enough space for object heap
    原因及解决办法:
    查看kafka-server-start.sh配置文件,发现有heap设置信息:KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"    更改这里的内存为256(因为测试机内存总共才1G ,所以报错)
     

    步骤7:发送消息

    发送一些消息验证,在console模式下,启动producer

    $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

    (此处localhost改为本机ip,否则报错,I don’t  know why

     消息案例: 

    {"price":"100000","userId":14615501351480021,"payType":3,"code":"AFD3B8","payTime":{"time":1457330791333,"minutes":6,"seconds":31,"hours":14,"month":2,"year":116,"timezoneOffset":-480,"day":1,"date":7},"orderId":12222096,"goodsName":"会员"} 

     步骤8:启动一个consumer

    $ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

     





    删除topic,慎用,只会删除zookeeper中的元数据,消息文件须手动删除

    bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic test --zookeeper 192.168.197.170:2181 ,192.168.197.171:2181

        





    配置kafka集群模式,需要由多个broker组成

     

    和单机环境一样,只是需要修改下broker 的配置文件而已。

    1、将单机版的kafka 目录复制到其他几台电脑上。

    2、修改每台电脑上的kafka 目录下的server.properties 文件。

    broker.id=1//这个参数在kafka 的broker 集群中必须唯一,且为正整数。

    3、启动每台电脑上的kafka 即可。

     

    本机配置伪分布式

     

    首先为每个节点编写配置文件:
    > cp config/server.properties config/server-1.properties
    > cp config/server.properties config/server-2.properties
    在拷贝出的新文件中添加以下参数:
    config/server-1.properties:
    broker.id=1
    port=9093
    log.dir=/tmp/kafka-logs-1

    config/server-2.properties:

    broker.id=2
    port=9094
    log.dir=/tmp/kafka-logs-2
    现在启动另外两个节点:
    > bin/kafka-server-start.sh config/server-1.properties &
    ...
    > bin/kafka-server-start.sh config/server-2.properties &
    ...
    创建一个拥有3个副本的topic:
    > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
    运行“"describe topics”命令知道每个节点的信息
    > bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
    Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
    Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
    leader:负责处理消息的读和写,leader是从所有节点中随机选择的.
    replicas:列出了所有的副本节点,不管节点是否在服务中.
    isr:是正在服务中的节点.
      



    搭建Kafka开发环境

    1 在pom.xml中引入kafka依赖jar包
    <!-- kafka配置 -->
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>${kafka.version}</version>
    <exclusions>
    <!-- 实际应用中单独引入下面的jar包,不使用kafka带的 -->
    <exclusion>
    <artifactId>zookeeper</artifactId>
    <groupId>org.apache.zookeeper</groupId>
    </exclusion>
    <exclusion>
    <artifactId>zkclient</artifactId>
    <groupId>com.101tec</groupId>
    </exclusion>
    <exclusion>
    <artifactId>slf4j-api</artifactId>
    <groupId>org.slf4j</groupId>
    </exclusion>
    </exclusions>
    </dependency>
    2.属性文件 kafka.properties
    #zookeeper.connect=192.168.1.8:2181,192.168.1.13:2181,192.168.1.16:2181
    #zookeeper.connect=zkserver1.vko.cn:2181,zkserver2.vko.cn:2181,zkserver3.vko.cn:2181
    zookeeper.connect=192.168.1.179:2181
    metadata.broker.list=192.168.1.179:9092
    #metadata.broker.list=kafka.server1.vko.cn:9092,kafka.server2.vko.cn:9092
     
    #zookeeper.connect.timeout=15000
    #zookeeper.session.timeout.ms=15000
    #zookeeper.sync.time.ms=20000
    #auto.commit.interval.ms=20000
    #auto.offset.reset=smallest
    #serializer.class=kafka.serializer.StringEncoder
    #producer.type=async
    #queue.buffering.max.ms=6000
     
    group.id=llx
    kafka.sellstat.topics=llx
    在spring配置文件中引入此properties文件
    <!-- 这个是加载给spring 用的.-->  
    <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations">
    <list>
    <value>classpath:kafka.properties</value>
    </list>
    </property>
    </bean>
    <!-- 这个是用来在代码中注入用的.-->  
    <bean id="configProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
    <property name="locations">
    <list>
    <value>classpath:kafka.properties</value>
    </list>
    </property>
    </bean>
     
    3.定义收信人
    <!-- 定义收信人 receiver -->
    <bean id="testReceiver" class="cn.vko.index.Receiver">
     
    <constructor-arg index="0" value="${zookeeper.connect}" />
     
    <constructor-arg index="1" value="${group.id}" />
     
    <constructor-arg index="2" value="${kafka.sellstat.topics}"/>
     
    <constructor-arg index="3" ref="testConsumer" />
    </bean>
    4. spring中定义一个消息处理器(需要实现vkoConsumer)
    <!-- 定义消息处理器 -->
    <bean id="testConsumer" class="cn.vko.index.TestConsumer" ></bean>

    5.消息生产者项目引入producer

    <bean id="topProducer" class="top.lilixin.TopProducer">
             <constructor-arg index="0" value="kafka.server1:9092,kafka.server2:9092" />
        </bean>
     
    6代码实现 见我的https://github.com/lilixin  learn-kafka项目
  • 相关阅读:
    Cookie
    servletContext组件
    Servlet的定义及生命周期
    导引:servlet&Jsp的经典模式
    一个servlet server,由移植自Tomcat的连接器模块和自编写的container模块代码组成
    一个servlet web server,由移植自Tomcat的完整的connector模块和简化的Container(取代servlet处理器)组成
    python初识
    Maekdown光速习得
    实例学习——爬取简书网用户动态
    使用CSDN-markdown编辑器粘贴代码块时崩溃问题解决
  • 原文地址:https://www.cnblogs.com/lilixin/p/5775877.html
Copyright © 2011-2022 走看看