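Kafka was originally developed at LinkedIn. It is a distributed messaging system that supports partitions and multiple replicas and is coordinated through ZooKeeper. Its defining strength is processing large volumes of data in real time, which suits many scenarios: Hadoop-based batch systems, low-latency real-time systems, Storm/Spark stream processing engines, web/nginx logs, access logs, messaging services, and so on. It is written in Scala and Java. LinkedIn open-sourced it in early 2011 and donated it to the Apache Software Foundation, where it became a top-level project in 2012. I am not especially familiar with the theory myself; this article explains it well, so I am passing it along: https://blog.csdn.net/YChenFeng/article/details/74980531
Now let's configure and use it.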
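1. Download
Download page: http://kafka.apache.org/downloads.html . I used kafka_2.12-1.0.0.tgz. If you are interested, you can download the latest version instead.
2. After the download finishes, extract the archive. The directory structure is as follows:
bin holds the startup scripts, config holds the configuration files, logs is a log directory I created myself, and libs holds the jar files Kafka depends on.
Inside bin there is a windows subdirectory; its scripts are used to start the services directly.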
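3. Modify the configuration
Go into the config directory and edit server.properties. Find and set log.dirs=D:/Tools/kafka_2.11-1.0.0/logs (use forward slashes or doubled backslashes: a single backslash is an escape character in a .properties file and would be dropped from the path; adjust the path to your own install directory). Then find and set zookeeper.connect=localhost:2181, which means ZooKeeper runs locally.
(By default Kafka runs on port 9092 and connects to ZooKeeper on its default port, 2181.)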
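The path format in log.dirs is easy to get wrong on Windows, because server.properties is a Java properties file and backslashes are treated as escape characters when it is loaded. Here is a minimal sketch, using only the JDK, of how the three spellings are parsed. The class PropertiesPathDemo and the method parseLogDirs are illustrative names of my own, not part of Kafka.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Illustrative helper (not part of Kafka): shows how java.util.Properties
// parses a Windows path written with different separators.
class PropertiesPathDemo {

    // Parses one "key=value" line the way a .properties file is read
    // and returns the value stored under log.dirs.
    static String parseLogDirs(String line) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(line));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty("log.dirs");
    }

    public static void main(String[] args) {
        // Single backslashes are consumed as escapes:
        // prints D:Toolskafka_2.11-1.0.0logs
        System.out.println(parseLogDirs("log.dirs=D:\\Tools\\kafka_2.11-1.0.0\\logs"));

        // Forward slashes pass through unchanged:
        // prints D:/Tools/kafka_2.11-1.0.0/logs
        System.out.println(parseLogDirs("log.dirs=D:/Tools/kafka_2.11-1.0.0/logs"));

        // Doubled backslashes yield single backslashes:
        // prints D:\Tools\kafka_2.11-1.0.0\logs
        System.out.println(parseLogDirs("log.dirs=D:\\\\Tools\\\\kafka_2.11-1.0.0\\\\logs"));
    }
}
```

In other words, either of the last two spellings should give the broker the directory you intended, while the first one silently points somewhere else.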
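4. Start the services by entering the following commands in a Command Prompt (DOS) window
D:\tools\kafka_2.12-2.1.0\bin\windows\zookeeper-server-start.bat D:\tools\kafka_2.12-2.1.0\config\zookeeper.properties
Start ZooKeeper first, loading its configuration file; Kafka depends on ZooKeeper to track its state.
D:\tools\kafka_2.12-2.1.0\bin\windows\kafka-server-start.bat D:\tools\kafka_2.12-2.1.0\config\server.properties
Then start the Kafka broker, loading its configuration file.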
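Before going further it can help to confirm that both services are really listening. The following is a minimal sketch using only the JDK: it tries to open a TCP connection to each port. The class PortCheck and the method isListening are hypothetical names, and the ports are the defaults mentioned in step 3; change them if your ZooKeeper or broker listens elsewhere.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Illustrative helper (not part of Kafka): checks whether something
// accepts TCP connections on a given host and port.
class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed defaults: ZooKeeper on 2181, Kafka broker on 9092.
        System.out.println("ZooKeeper (2181) listening: " + isListening("localhost", 2181, 1000));
        System.out.println("Kafka (9092) listening: " + isListening("localhost", 9092, 1000));
    }
}
```

A successful connection only shows that the port is open, not that the service is healthy, so treat it as a quick sanity check.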
5. Use IDEA to clone the kafka-monitor code from GitHub: https://github.com/linxin26/kafka-monitor
Run Start, then open http://127.0.0.1:5050 in a browser.
View the existing topics
View the brokers
6. Write a test class to check the status
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

/**
 * Created by songxiaofei on 2018-12-27.
 */
public class ConsumerMessage {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.1.9.3:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("buy"), new ConsumerRebalanceListener() {
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            }

            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                // Reset the offset to the very beginning of each assigned partition
                consumer.seekToBeginning(collection);
            }
        });

        // Poll forever and print every record received
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s, checksum = %s%n",
                        record.offset(), record.key(), record.value(), record.checksum());
            }
        }
    }
}
Run the code and check the log output, which shows the Kafka consumer configuration in effect.
D:\tools\Java\jdk1.8.0_31\bin\java "-javaagent:D:\tools\JetBrains\IntelliJ IDEA 2018.1\lib\idea_rt.jar=64422:D:\tools\JetBrains\IntelliJ IDEA 2018.1\bin" -Dfile.encoding=UTF-8 -classpath <JDK jars; E:\jetproject\kafka-monitor-master\target\classes; the project's Maven dependencies, including kafka-clients-0.10.0.1.jar> co.solinx.kafka.monitor.ConsumerMessage
log4j:WARN No such property [maxFileSize] in org.apache.log4j.DailyRollingFileAppender.
log4j:WARN No such property [maxBackupIndex] in org.apache.log4j.DailyRollingFileAppender.
2019-01-16 15:56:36,600 [main] [org.apache.kafka.clients.consumer.ConsumerConfig]-[INFO] ConsumerConfig values:
    metric.reporters = []
    metadata.max.age.ms = 300000
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    max.partition.fetch.bytes = 1048576
    bootstrap.servers = [10.1.9.3:9092]
    ssl.keystore.type = JKS
    enable.auto.commit = true
    sasl.mechanism = GSSAPI
    interceptor.classes = null
    exclude.internal.topics = true
    ssl.truststore.password = null
    client.id =
    ssl.endpoint.identification.algorithm = null
    max.poll.records = 2147483647
    check.crcs = true
    request.timeout.ms = 40000
    heartbeat.interval.ms = 3000
    auto.commit.interval.ms = 1000
    receive.buffer.bytes = 65536
    ssl.truststore.type = JKS
    ssl.truststore.location = null
    ssl.keystore.password = null
    fetch.min.bytes = 1
    send.buffer.bytes = 131072
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    group.id = test
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.trustmanager.algorithm = PKIX
    ssl.key.password = null
    fetch.max.wait.ms = 500
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    session.timeout.ms = 30000
    metrics.num.samples = 2
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    ssl.protocol = TLS
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.keystore.location = null
    ssl.cipher.suites = null
    security.protocol = PLAINTEXT
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    auto.offset.reset = latest
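The dump is long, and usually only a handful of settings matter (for example, auto.offset.reset = latest explains why the test class seeks to the beginning explicitly). Below is a rough sketch, using only the JDK, of pulling chosen settings out of such a dump. It assumes the log has one "key = value" entry per line, as log4j normally prints it; the class ConsumerConfigDump and its parse method are hypothetical helpers of my own, not a Kafka API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative helper (not part of Kafka): turns a "ConsumerConfig values"
// log dump into a map so individual settings can be looked up.
class ConsumerConfigDump {

    // Parses lines of the form "key = value" into an insertion-ordered map.
    // Lines without '=' (such as the timestamp header) are skipped, as are
    // lines whose key part contains spaces.
    static Map<String, String> parse(String dump) {
        Map<String, String> values = new LinkedHashMap<>();
        for (String line : dump.split("\\R")) {
            int eq = line.indexOf('=');
            if (eq < 0) {
                continue;
            }
            String key = line.substring(0, eq).trim();
            String value = line.substring(eq + 1).trim();
            if (!key.isEmpty() && !key.contains(" ")) {
                values.put(key, value);
            }
        }
        return values;
    }

    public static void main(String[] args) {
        // A few entries copied from the log output above.
        String dump = String.join("\n",
                "ConsumerConfig values:",
                "\tenable.auto.commit = true",
                "\tauto.commit.interval.ms = 1000",
                "\tgroup.id = test",
                "\tauto.offset.reset = latest");

        Map<String, String> config = parse(dump);
        System.out.println("group.id           = " + config.get("group.id"));
        System.out.println("enable.auto.commit = " + config.get("enable.auto.commit"));
        System.out.println("auto.offset.reset  = " + config.get("auto.offset.reset"));
    }
}
```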
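7. Create a topic from the command line. Note that --zookeeper must point at the port ZooKeeper actually listens on: the commands below use 12181, whereas step 3 configures the default 2181.
D:\tools\kafka_2.12-2.1.0\bin\windows\kafka-topics.bat --create --zookeeper localhost:12181 --replication-factor 1 --partitions 1 --topic hello
List the existing topics
D:\tools\kafka_2.12-2.1.0\bin\windows\kafka-topics.bat --list --zookeeper 127.0.0.1:12181
Create a message producer and, alongside it, a message consumer that receives the messages the producer sends
D:\tools\kafka_2.12-2.1.0\bin\windows\kafka-console-producer.bat --broker-list 127.0.0.1:9092 --topic hello
This starts a message producer.
D:\tools\kafka_2.12-2.1.0\bin\windows\kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic hello --from-beginning
This starts a message consumer.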