zoukankan      html  css  js  c++  java
  • Kafka在windows下的配置使用

    Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。具体理论性的内容我也不是特别懂,这里有一篇文章写的很好,转发一下https://blog.csdn.net/YChenFeng/article/details/74980531

    下面就开始配置和使用

    1、下载

    下载地址:http://kafka.apache.org/downloads.html ,我用的是kafka_2.12-1.0.0.tgz版本。有兴趣的可以下载最新办法。

    2、下载完成之后,直接解压,目录结构是这样的

    bin目录是各种启动文件,config是相关参数配置文件,logs是自建的日志文件,libs是使用到的相关jar文件

    启动文件中有windows项,用例直接启动相关服务。

    3、修改配置参数

    进入config目录,编辑 server.properties文件,找到并编辑log.dirs= D:\Tools\kafka_2.11-1.0.0\logs,找到并编辑zookeeper.connect=localhost:2181。表示本地运行。

    (Kafka会按照默认,在9092端口上运行,并连接zookeeper的默认端口:2181)

    4、启动服务,在dos服务下输入下面命令进行启动

    D: oolskafka_2.12-2.1.0inwindowszookeeper-server-start.bat D: oolskafka_2.12-2.1.0configzookeeper.properties

    首先启动,zookeeper服务,对应加载zookeeper的配置文件,kafka依赖zookeeper监控其状态

    D: oolskafka_2.12-2.1.0inwindowskafka-server-start.bat D: oolskafka_2.12-2.1.0configserver.properties

    然后启动kafka服务,对应加载相应配置文件

     5、使用idea在git 上下载kafka-monitor 代码,地址:https://github.com/linxin26/kafka-monitor

    运行Start,在浏览器输入http://127.0.0.1:5050

    查看现有主题 topic

    查看 broker

    6、编写测试类查看相关状况

    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Properties;
    
    /**
     * Created by songxiaofei on 2018-12-27.
     */
    public class ConsumerMessage {
    
        public static void main(String[] args){
            Properties props = new Properties();
            props.put("bootstrap.servers", "10.1.9.3:9092");
            props.put("group.id", "test");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            final KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);
            consumer.subscribe(Arrays.asList("buy"),new ConsumerRebalanceListener() {
                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                }
                public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                    //将偏移设置到最开始
                    consumer.seekToBeginning(collection);
                }
            });
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset = %d, key = %s, value = %s%n,checksum=%s", record.offset(), record.key(), record.value(),record.checksum());
            }
        }
    }

    执行代码,可以查看相关日志,用来查看kafka的配置情况

    D:	oolsJavajdk1.8.0_31injava "-javaagent:D:	oolsJetBrainsIntelliJ IDEA 2018.1libidea_rt.jar=64422:D:	oolsJetBrainsIntelliJ IDEA 2018.1in" -Dfile.encoding=UTF-8 -classpath D:	oolsJavajdk1.8.0_31jrelibcharsets.jar;D:	oolsJavajdk1.8.0_31jrelibdeploy.jar;D:	oolsJavajdk1.8.0_31jrelibextaccess-bridge-64.jar;D:	oolsJavajdk1.8.0_31jrelibextcldrdata.jar;D:	oolsJavajdk1.8.0_31jrelibextdnsns.jar;D:	oolsJavajdk1.8.0_31jrelibextjaccess.jar;D:	oolsJavajdk1.8.0_31jrelibextjfxrt.jar;D:	oolsJavajdk1.8.0_31jrelibextlocaledata.jar;D:	oolsJavajdk1.8.0_31jrelibext
    ashorn.jar;D:	oolsJavajdk1.8.0_31jrelibextsunec.jar;D:	oolsJavajdk1.8.0_31jrelibextsunjce_provider.jar;D:	oolsJavajdk1.8.0_31jrelibextsunmscapi.jar;D:	oolsJavajdk1.8.0_31jrelibextsunpkcs11.jar;D:	oolsJavajdk1.8.0_31jrelibextzipfs.jar;D:	oolsJavajdk1.8.0_31jrelibjavaws.jar;D:	oolsJavajdk1.8.0_31jrelibjce.jar;D:	oolsJavajdk1.8.0_31jrelibjfr.jar;D:	oolsJavajdk1.8.0_31jrelibjfxswt.jar;D:	oolsJavajdk1.8.0_31jrelibjsse.jar;D:	oolsJavajdk1.8.0_31jrelibmanagement-agent.jar;D:	oolsJavajdk1.8.0_31jrelibplugin.jar;D:	oolsJavajdk1.8.0_31jrelib
    esources.jar;D:	oolsJavajdk1.8.0_31jrelib
    t.jar;E:jetprojectkafka-monitor-master	argetclasses;E:mavrepository.m2
    epositoryorgapachekafkakafka-clients.10.0.1kafka-clients-0.10.0.1.jar;E:mavrepository.m2
    epository
    etjpountzlz4lz41.3.0lz4-1.3.0.jar;E:mavrepository.m2
    epositoryorgxerialsnappysnappy-java1.1.2.6snappy-java-1.1.2.6.jar;E:mavrepository.m2
    epositoryorgapachecommonscommons-lang33.5commons-lang3-3.5.jar;E:mavrepository.m2
    epositoryorgapachekafkakafka_2.10.10.0.1kafka_2.10-0.10.0.1.jar;E:mavrepository.m2
    epositorycom101teczkclient.8zkclient-0.8.jar;E:mavrepository.m2
    epositoryorgscala-langscala-library2.10.6scala-library-2.10.6.jar;E:mavrepository.m2
    epositorycomyammermetricsmetrics-core2.2.0metrics-core-2.2.0.jar;E:mavrepository.m2
    epository
    etsfjopt-simplejopt-simple4.9jopt-simple-4.9.jar;E:mavrepository.m2
    epositorycomalibabafastjson1.2.22fastjson-1.2.22.jar;E:mavrepository.m2
    epositoryorgslf4jslf4j-api1.6.2slf4j-api-1.6.2.jar;E:mavrepository.m2
    epositoryorgslf4jslf4j-log4j121.6.2slf4j-log4j12-1.6.2.jar;E:mavrepository.m2
    epositorycommons-loggingcommons-logging-api1.1commons-logging-api-1.1.jar;E:mavrepository.m2
    epositorylog4jlog4j1.2.16log4j-1.2.16.jar;E:mavrepository.m2
    epositorymysqlmysql-connector-java5.1.40mysql-connector-java-5.1.40.jar;E:mavrepository.m2
    epositoryorgapachecuratorcurator-framework2.10.0curator-framework-2.10.0.jar;E:mavrepository.m2
    epositoryorgapachecuratorcurator-client2.10.0curator-client-2.10.0.jar;E:mavrepository.m2
    epositorycomgoogleguavaguava16.0.1guava-16.0.1.jar;E:mavrepository.m2
    epositoryorgapachezookeeperzookeeper3.4.8zookeeper-3.4.8.jar;E:mavrepository.m2
    epositoryjlinejline.9.94jline-0.9.94.jar;E:mavrepository.m2
    epositoryio
    etty
    etty3.7.0.Final
    etty-3.7.0.Final.jar;E:mavrepository.m2
    epositoryorgapachecuratorcurator-recipes2.10.0curator-recipes-2.10.0.jar;E:mavrepository.m2
    epositoryorgjolokiajolokia-jvm1.3.5jolokia-jvm-1.3.5.jar;E:mavrepository.m2
    epositoryorgjolokiajolokia-core1.5.0jolokia-core-1.5.0.jar;E:mavrepository.m2
    epositorycomgooglecodejson-simplejson-simple1.1.1json-simple-1.1.1.jar;D:	oolsJavajdk1.8.0_31lib	ools.jar;E:mavrepository.m2
    epositoryorgprojectlomboklombok1.16.18lombok-1.16.18.jar;E:mavrepository.m2
    epositoryorgspringframeworkootspring-boot-starter-web2.0.0.RELEASEspring-boot-starter-web-2.0.0.RELEASE.jar;E:mavrepository.m2
    epositoryorgspringframeworkootspring-boot-starter2.0.0.RELEASEspring-boot-starter-2.0.0.RELEASE.jar;E:mavrepository.m2
    epositoryorgspringframeworkootspring-boot2.0.0.RELEASEspring-boot-2.0.0.RELEASE.jar;E:mavrepository.m2
    epositoryorgspringframeworkootspring-boot-autoconfigure2.0.0.RELEASEspring-boot-autoconfigure-2.0.0.RELEASE.jar;E:mavrepository.m2
    epositoryorgspringframeworkootspring-boot-starter-logging2.0.0.RELEASEspring-boot-starter-logging-2.0.0.RELEASE.jar;E:mavrepository.m2
    epositoryorgapachelogginglog4jlog4j-to-slf4j2.10.0log4j-to-slf4j-2.10.0.jar;E:mavrepository.m2
    epositoryorgapachelogginglog4jlog4j-api2.10.0log4j-api-2.10.0.jar;E:mavrepository.m2
    epositoryorgslf4jjul-to-slf4j1.7.25jul-to-slf4j-1.7.25.jar;E:mavrepository.m2
    epositoryjavaxannotationjavax.annotation-api1.3.2javax.annotation-api-1.3.2.jar;E:mavrepository.m2
    epositoryorgspringframeworkspring-core5.0.4.RELEASEspring-core-5.0.4.RELEASE.jar;E:mavrepository.m2
    epositoryorgspringframeworkspring-jcl5.0.4.RELEASEspring-jcl-5.0.4.RELEASE.jar;E:mavrepository.m2
    epositoryorgyamlsnakeyaml1.19snakeyaml-1.19.jar;E:mavrepository.m2
    epositoryorgspringframeworkootspring-boot-starter-json2.0.0.RELEASEspring-boot-starter-json-2.0.0.RELEASE.jar;E:mavrepository.m2
    epositorycomfasterxmljacksoncorejackson-databind2.9.4jackson-databind-2.9.4.jar;E:mavrepository.m2
    epositorycomfasterxmljacksoncorejackson-annotations2.9.0jackson-annotations-2.9.0.jar;E:mavrepository.m2
    epositorycomfasterxmljacksoncorejackson-core2.9.4jackson-core-2.9.4.jar;E:mavrepository.m2
    epositorycomfasterxmljacksondatatypejackson-datatype-jdk82.9.4jackson-datatype-jdk8-2.9.4.jar;E:mavrepository.m2
    epositorycomfasterxmljacksondatatypejackson-datatype-jsr3102.9.4jackson-datatype-jsr310-2.9.4.jar;E:mavrepository.m2
    epositorycomfasterxmljacksonmodulejackson-module-parameter-names2.9.4jackson-module-parameter-names-2.9.4.jar;E:mavrepository.m2
    epositoryorgspringframeworkootspring-boot-starter-tomcat2.0.0.RELEASEspring-boot-starter-tomcat-2.0.0.RELEASE.jar;E:mavrepository.m2
    epositoryorgapache	omcatembed	omcat-embed-core8.5.28	omcat-embed-core-8.5.28.jar;E:mavrepository.m2
    epositoryorgapache	omcatembed	omcat-embed-el8.5.28	omcat-embed-el-8.5.28.jar;E:mavrepository.m2
    epositoryorgapache	omcatembed	omcat-embed-websocket8.5.28	omcat-embed-websocket-8.5.28.jar;E:mavrepository.m2
    epositoryorghibernatevalidatorhibernate-validator6.0.7.Finalhibernate-validator-6.0.7.Final.jar;E:mavrepository.m2
    epositoryjavaxvalidationvalidation-api2.0.1.Finalvalidation-api-2.0.1.Final.jar;E:mavrepository.m2
    epositoryorgjbossloggingjboss-logging3.3.2.Finaljboss-logging-3.3.2.Final.jar;E:mavrepository.m2
    epositorycomfasterxmlclassmate1.3.4classmate-1.3.4.jar;E:mavrepository.m2
    epositoryorgspringframeworkspring-web5.0.4.RELEASEspring-web-5.0.4.RELEASE.jar;E:mavrepository.m2
    epositoryorgspringframeworkspring-beans5.0.4.RELEASEspring-beans-5.0.4.RELEASE.jar;E:mavrepository.m2
    epositoryorgspringframeworkspring-webmvc5.0.4.RELEASEspring-webmvc-5.0.4.RELEASE.jar;E:mavrepository.m2
    epositoryorgspringframeworkspring-aop5.0.4.RELEASEspring-aop-5.0.4.RELEASE.jar;E:mavrepository.m2
    epositoryorgspringframeworkspring-context5.0.4.RELEASEspring-context-5.0.4.RELEASE.jar;E:mavrepository.m2
    epositoryorgspringframeworkspring-expression5.0.4.RELEASEspring-expression-5.0.4.RELEASE.jar co.solinx.kafka.monitor.ConsumerMessage
    log4j:WARN No such property [maxFileSize] in org.apache.log4j.DailyRollingFileAppender.
    log4j:WARN No such property [maxBackupIndex] in org.apache.log4j.DailyRollingFileAppender.
    2019-01-16 15:56:36,600 [main] [org.apache.kafka.clients.consumer.ConsumerConfig]-[INFO] ConsumerConfig values: 
        metric.reporters = []
        metadata.max.age.ms = 300000
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        max.partition.fetch.bytes = 1048576
        bootstrap.servers = [10.1.9.3:9092]
        ssl.keystore.type = JKS
        enable.auto.commit = true
        sasl.mechanism = GSSAPI
        interceptor.classes = null
        exclude.internal.topics = true
        ssl.truststore.password = null
        client.id = 
        ssl.endpoint.identification.algorithm = null
        max.poll.records = 2147483647
        check.crcs = true
        request.timeout.ms = 40000
        heartbeat.interval.ms = 3000
        auto.commit.interval.ms = 1000
        receive.buffer.bytes = 65536
        ssl.truststore.type = JKS
        ssl.truststore.location = null
        ssl.keystore.password = null
        fetch.min.bytes = 1
        send.buffer.bytes = 131072
        value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        group.id = test
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.trustmanager.algorithm = PKIX
        ssl.key.password = null
        fetch.max.wait.ms = 500
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        session.timeout.ms = 30000
        metrics.num.samples = 2
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        ssl.protocol = TLS
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.keystore.location = null
        ssl.cipher.suites = null
        security.protocol = PLAINTEXT
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        auto.offset.reset = latest

     7、使用命令行创建topic

    D: oolskafka_2.12-2.1.0inwindowskafka-topics.bat --create --zookeeper localhost:12181 --replication-factor 1 --partitions 1 --topic hello

    查看已有的topic

    D: oolskafka_2.12-2.1.0inwindowskafka-topics.bat --list --zookeeper 127.0.0.1:12181

     

    创建一个消息生产者,同时创建一个消息消费者,去接收消息生产者发来的消息

    D: oolskafka_2.12-2.1.0inwindowskafka-console-producer.bat --broker-list 127.0.0.1:9092 --topic hello  创建一个消息生产者

    D: oolskafka_2.12-2.1.0inwindowskafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic hello --from-beginning 创建一个消息消费者

  • 相关阅读:
    session
    CSS3盒子模型
    由“从按下回车到网页显示”粗谈网页优化
    springMVC之拦截器
    设置Webdriver启动chrome为默认用户的配置信息
    [Swift]LeetCode498. 对角线遍历 | Diagonal Traverse
    [Swift]LeetCode497. 非重叠矩形中的随机点 | Random Point in Non-overlapping Rectangles
    [Swift]通天遁地Swift
    [Swift]LeetCode996. 正方形数组的数目 | Number of Squareful Arrays
    [Swift]LeetCode995. K 连续位的最小翻转次数 | Minimum Number of K Consecutive Bit Flips
  • 原文地址:https://www.cnblogs.com/shej123/p/10277653.html
Copyright © 2011-2022 走看看