  • kafka ----> Using Kafka (Part 1)

    Today we will look at the basic usage and configuration of Kafka. There are things in this world that can be undone and things that cannot, and the passing of time is one of those that cannot.

    Installing and configuring Kafka

    1. Kafka use cases

    Activity tracking: website users interact with the front-end application, and the front-end application generates messages describing that user activity.

    Messaging: applications send notifications to users by passing messages.

    Metrics and logging: applications periodically publish metrics or log messages to Kafka topics, where they can be monitored or analyzed by a dedicated log-search system such as Elasticsearch.

    Commit log: database updates can be published to Kafka, and applications can receive those updates in real time by watching the event stream.

    Stream processing: similar to map and reduce in Hadoop, except that it operates on real-time data streams.

    2. Why choose Kafka

    Multiple producers: Kafka can collect data from many front-end systems and expose it in a single, consistent format.

    Multiple consumers: several consumers can read from the same message stream without interfering with one another.

    Disk-based storage: messages are committed to disk and kept according to configurable retention rules.

    Scalability: an online cluster can be expanded without affecting the availability of the overall system.

    High performance: Kafka handles large volumes of data while keeping message latency at the sub-second level.

    3. Some Kafka concepts

    Messages and batches: Kafka's unit of data is the message, which is simply an array of bytes. A batch is a group of messages that all belong to the same topic and partition.

    Schemas: message formats such as JSON or XML lack strong typing. Avro can be used to remove the coupling between message writers and message readers.

    Topics and partitions: Kafka messages are organized by topic; a topic is analogous to a database table. A topic can be split into several partitions, and each partition is a commit log.

    Producers and consumers: producers create messages, and consumers read them.

    Brokers and clusters: a single Kafka server is called a broker; it receives messages from producers and serves read requests from consumers. (A small sketch of how these terms surface in the Java client API follows this list.)
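
    To make the terms above concrete, here is a minimal sketch (not from the original post) of how topic, partition, key, and value appear in the Java producer client; the topic name "test" and partition 0 are purely illustrative:

    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ConceptSketch {
        public static void main(String[] args) {
            // A message is a key/value pair addressed to a topic, and optionally
            // to an explicit partition within that topic.
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("test", 0, "some-key", "some-value");

            System.out.println("topic = " + record.topic()
                    + ", partition = " + record.partition()
                    + ", key = " + record.key()
                    + ", value = " + record.value());
        }
    }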

    4. Installing and configuring Kafka

    The installation and the examples here are based on Windows. Kafka needs a Java environment and a running ZooKeeper instance; Kafka uses ZooKeeper to store cluster metadata and consumer information.

    Running Kafka requires Java; the Java download address: http://www.oracle.com/technetwork/java/javase/downloads/index.html

    ZooKeeper download address: http://zookeeper.apache.org/releases.html. Just unpack it and it is ready to use.

    For the detailed installation steps you can refer to this article: https://www.w3cschool.cn/apache_kafka/apache_kafka_installation_steps.html
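
    For reference, the zookeeper.properties file that ships under Kafka's config directory is only a few lines. A typical standalone configuration looks roughly like the sketch below; the dataDir path is an assumption for this Windows setup, while the other two values are the shipped defaults:

    # where ZooKeeper stores its snapshot data (path is illustrative)
    dataDir=D:/Apache/apache-kafka/zookeeper-data
    # the port clients connect on
    clientPort=2181
    # disable the per-IP connection limit, which is fine for a local test setup
    maxClientCnxns=0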

    Using Kafka from Java

    Now we will write a producer and a consumer in Java to walk through the whole Kafka flow. The installation lives at D:/Apache/apache-kafka/kafka_2.11-0.11.0.1.

    The data directory inside it was created by hand and is used to hold the log files Kafka produces. In addition, the server.properties file under config needs to be changed as follows:

    log.dirs=D:/Apache/apache-kafka/kafka_2.11-0.11.0.1/data
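
    For context, the other server.properties settings relevant to this single-broker setup can keep their defaults; a minimal excerpt (every value except log.dirs is the stock default, as the KafkaConfig log further below confirms) would look like this:

    # unique id of this broker within the cluster
    broker.id=0
    # where Kafka stores partition data (changed from the default /tmp/kafka-logs)
    log.dirs=D:/Apache/apache-kafka/kafka_2.11-0.11.0.1/data
    # number of partitions for auto-created topics
    num.partitions=1
    # how long messages are retained, in hours
    log.retention.hours=168
    # the ZooKeeper instance Kafka registers with
    zookeeper.connect=localhost:2181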

    1. Starting the Kafka broker and creating a topic

    Open a new terminal window and change directory: cd D:\Apache\apache-kafka\kafka_2.11-0.11.0.1\bin\windows.

    Run zookeeper-server-start.bat ../../config/zookeeper.properties to start ZooKeeper. The log output:

    [2017-11-03 23:48:54,899] INFO Reading configuration from: ..\..\config\zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
    [2017-11-03 23:48:54,925] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
    [2017-11-03 23:48:54,927] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
    [2017-11-03 23:48:54,928] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
    [2017-11-03 23:48:54,930] WARN Either no config or no quorum defined in config, running  in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
    [2017-11-03 23:48:54,970] INFO Reading configuration from: ..\..\config\zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
    [2017-11-03 23:48:54,973] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
    [2017-11-03 23:48:55,005] INFO Server environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT (org.apache.zookeeper.server.ZooKeeperServer)
    [2017-11-03 23:48:55,009] INFO Server environment:host.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)
    [2017-11-03 23:48:55,012] INFO Server environment:java.version=1.8.0_152 (org.apache.zookeeper.server.ZooKeeperServer)
    [2017-11-03 23:48:55,019] INFO Server environment:java.vendor=Oracle Corporation (org.apache.zookeeper.server.ZooKeeperServer)
    [2017-11-03 23:48:55,023] INFO Server environment:java.home=D:\Java\jdk\jdk1.8.0_152\jre (org.apache.zookeeper.server.ZooKeeperServer)
    ........

    Open another window and change directory: cd D:\Apache\apache-kafka\kafka_2.11-0.11.0.1\bin\windows.

    Run kafka-server-start.bat ../../config/server.properties to start Kafka. The log output:

    [2017-11-03 23:54:16,362] INFO KafkaConfig values:
            advertised.host.name = null
            advertised.listeners = null
            advertised.port = null
            alter.config.policy.class.name = null
            authorizer.class.name =
            auto.create.topics.enable = true
            auto.leader.rebalance.enable = true
            background.threads = 10
            broker.id = 0
            broker.id.generation.enable = true
            broker.rack = null
            compression.type = producer
            connections.max.idle.ms = 600000
            controlled.shutdown.enable = true
            controlled.shutdown.max.retries = 3
            controlled.shutdown.retry.backoff.ms = 5000
            controller.socket.timeout.ms = 30000
            create.topic.policy.class.name = null
            default.replication.factor = 1
            delete.records.purgatory.purge.interval.requests = 1
            delete.topic.enable = false
            fetch.purgatory.purge.interval.requests = 1000
            group.initial.rebalance.delay.ms = 0
            group.max.session.timeout.ms = 300000
            group.min.session.timeout.ms = 6000
            host.name =
            inter.broker.listener.name = null
            inter.broker.protocol.version = 0.11.0-IV2
            leader.imbalance.check.interval.seconds = 300
            leader.imbalance.per.broker.percentage = 10
            listener.security.protocol.map = SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,TRACE:TRACE,SASL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT
            listeners = null
            log.cleaner.backoff.ms = 15000
            log.cleaner.dedupe.buffer.size = 134217728
            log.cleaner.delete.retention.ms = 86400000
            log.cleaner.enable = true
            log.cleaner.io.buffer.load.factor = 0.9
            log.cleaner.io.buffer.size = 524288
            log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
            log.cleaner.min.cleanable.ratio = 0.5
            log.cleaner.min.compaction.lag.ms = 0
            log.cleaner.threads = 1
            log.cleanup.policy = [delete]
            log.dir = /tmp/kafka-logs
            log.dirs = D:/Apache/apache-kafka/kafka_2.11-0.11.0.1/data
            log.flush.interval.messages = 9223372036854775807
            log.flush.interval.ms = null
            log.flush.offset.checkpoint.interval.ms = 60000
            log.flush.scheduler.interval.ms = 9223372036854775807
            log.flush.start.offset.checkpoint.interval.ms = 60000
            log.index.interval.bytes = 4096
            log.index.size.max.bytes = 10485760
            log.message.format.version = 0.11.0-IV2
            log.message.timestamp.difference.max.ms = 9223372036854775807
            log.message.timestamp.type = CreateTime
            log.preallocate = false
            log.retention.bytes = -1
            log.retention.check.interval.ms = 300000
            log.retention.hours = 168
            log.retention.minutes = null
            log.retention.ms = null
            log.roll.hours = 168
            log.roll.jitter.hours = 0
            log.roll.jitter.ms = null
            log.roll.ms = null
            log.segment.bytes = 1073741824
            log.segment.delete.delay.ms = 60000
            max.connections.per.ip = 2147483647
            max.connections.per.ip.overrides =
            message.max.bytes = 1000012
            metric.reporters = []
            metrics.num.samples = 2
            metrics.recording.level = INFO
            metrics.sample.window.ms = 30000
            min.insync.replicas = 1
            num.io.threads = 8
            num.network.threads = 3
            num.partitions = 1
            num.recovery.threads.per.data.dir = 1
            num.replica.fetchers = 1
            offset.metadata.max.bytes = 4096
            offsets.commit.required.acks = -1
            offsets.commit.timeout.ms = 5000
            offsets.load.buffer.size = 5242880
            offsets.retention.check.interval.ms = 600000
            offsets.retention.minutes = 1440
            offsets.topic.compression.codec = 0
            offsets.topic.num.partitions = 50
            offsets.topic.replication.factor = 1
            offsets.topic.segment.bytes = 104857600
            port = 9092
            principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
            producer.purgatory.purge.interval.requests = 1000
            queued.max.requests = 500
            quota.consumer.default = 9223372036854775807
            quota.producer.default = 9223372036854775807
            quota.window.num = 11
            quota.window.size.seconds = 1
            replica.fetch.backoff.ms = 1000
            replica.fetch.max.bytes = 1048576
            replica.fetch.min.bytes = 1
            replica.fetch.response.max.bytes = 10485760
            replica.fetch.wait.max.ms = 500
            replica.high.watermark.checkpoint.interval.ms = 5000
            replica.lag.time.max.ms = 10000
            replica.socket.receive.buffer.bytes = 65536
            replica.socket.timeout.ms = 30000
            replication.quota.window.num = 11
            replication.quota.window.size.seconds = 1
            request.timeout.ms = 30000
            reserved.broker.max.id = 1000
            sasl.enabled.mechanisms = [GSSAPI]
            sasl.kerberos.kinit.cmd = /usr/bin/kinit
            sasl.kerberos.min.time.before.relogin = 60000
            sasl.kerberos.principal.to.local.rules = [DEFAULT]
            sasl.kerberos.service.name = null
            sasl.kerberos.ticket.renew.jitter = 0.05
            sasl.kerberos.ticket.renew.window.factor = 0.8
            sasl.mechanism.inter.broker.protocol = GSSAPI
            security.inter.broker.protocol = PLAINTEXT
            socket.receive.buffer.bytes = 102400
            socket.request.max.bytes = 104857600
            socket.send.buffer.bytes = 102400
            ssl.cipher.suites = null
            ssl.client.auth = none
            ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
            ssl.endpoint.identification.algorithm = null
            ssl.key.password = null
            ssl.keymanager.algorithm = SunX509
            ssl.keystore.location = null
            ssl.keystore.password = null
            ssl.keystore.type = JKS
            ssl.protocol = TLS
            ssl.provider = null
            ssl.secure.random.implementation = null
            ssl.trustmanager.algorithm = PKIX
            ssl.truststore.location = null
            ssl.truststore.password = null
            ssl.truststore.type = JKS
            transaction.abort.timed.out.transaction.cleanup.interval.ms = 60000
            transaction.max.timeout.ms = 900000
            transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
            transaction.state.log.load.buffer.size = 5242880
            transaction.state.log.min.isr = 1
            transaction.state.log.num.partitions = 50
            transaction.state.log.replication.factor = 1
            transaction.state.log.segment.bytes = 104857600
            transactional.id.expiration.ms = 604800000
            unclean.leader.election.enable = false
            zookeeper.connect = localhost:2181
            zookeeper.connection.timeout.ms = 6000
            zookeeper.session.timeout.ms = 6000
            zookeeper.set.acl = false
            zookeeper.sync.time.ms = 2000
     (kafka.server.KafkaConfig)
    [2017-11-03 23:54:16,480] INFO starting (kafka.server.KafkaServer)
    [2017-11-03 23:54:16,483] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
    [2017-11-03 23:54:16,497] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
    .......
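
    As a quick sanity check (not part of the original post), you can confirm the broker registered itself with ZooKeeper by opening the zookeeper-shell script from the same bin/windows directory and listing the registered broker ids; it should show [0], the broker.id configured above:

    zookeeper-shell.bat localhost:2181
    ls /brokers/ids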

    Open another window and change directory: cd D:\Apache\apache-kafka\kafka_2.11-0.11.0.1\bin\windows.

    Run kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test to create a topic named test. It prints:

    Created topic "test".
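
    If you want to double-check the result (not shown in the original post), the same script can list existing topics and describe the one just created:

    kafka-topics.bat --list --zookeeper localhost:2181
    kafka-topics.bat --describe --zookeeper localhost:2181 --topic test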

    2. Writing the Java code

    The Maven dependency we use is:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.1</version>
    </dependency>
    • The message producer: it publishes 10 messages, with keys and values from 0 to 9.
    package com.linux.huhx.firstdemo;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Properties;
    
    /**
     * @Author: huhx
     * @Date: 2017-11-03 4:41 PM
     */
    public class HelloProducer {
    
        public static void main(String[] args) {
            String topicName = "test";
            Properties props = new Properties();
            //Assign localhost id
            props.put("bootstrap.servers", "localhost:9092");
            //Set acknowledgements for producer requests.
            props.put("acks", "all");
            //If the request fails, the producer can automatically retry,
            props.put("retries", 0);
            //Specify buffer size in config
            props.put("batch.size", 16384);
            //Reduce the no of requests less than 0
            props.put("linger.ms", 1);
            //The buffer.memory controls the total amount of memory available to the producer for buffering.
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            Producer<String, String> producer = new KafkaProducer<>(props);
    
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>(topicName, Integer.toString(i), Integer.toString(i)));
            }
            System.out.println("Message sent successfully");
            producer.close();
        }
    }
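
    Note that send() is asynchronous, so the "Message sent successfully" line above does not actually confirm delivery. If you want confirmation, one option (a sketch, not part of the original code) is to pass a callback; this is a drop-in replacement for the send() call inside the loop and needs no extra imports:

            // Instead of fire-and-forget, pass a callback that runs once the broker
            // acknowledges the record (or the send fails).
            producer.send(new ProducerRecord<>(topicName, Integer.toString(i), Integer.toString(i)),
                    (metadata, exception) -> {
                        if (exception != null) {
                            exception.printStackTrace();   // the send failed
                        } else {
                            System.out.println("acked: partition=" + metadata.partition()
                                    + ", offset=" + metadata.offset());
                        }
                    });

    Alternatively, producer.send(...).get() blocks until the broker acknowledges the record and returns a RecordMetadata carrying the partition and offset it was written to.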
    • The message consumer
    package com.linux.huhx.firstdemo;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    /**
     * @Author: huhx
     * @Date: 2017-11-03 5:52 PM
     */
    public class HelloConsumer {
    
        public static void main(String[] args) {
            String topicName = "test";
            Properties props = new Properties();
    
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "test");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
            consumer.subscribe(Arrays.asList(topicName));
            System.out.println("Subscribed to topic " + topicName);
            int i = 0;
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset = %d, key = %s, value = %s
    ", record.offset(), record.key(), record.value());
            }
        }
    }
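
    You can also inspect the topic independently of the Java code with the console consumer that ships with Kafka (run from the same bin/windows directory); the --from-beginning flag makes it read the whole log rather than only new messages:

    kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning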

    Run the main methods in the order HelloProducer --> HelloConsumer --> HelloProducer. Over the whole run HelloProducer published 20 messages, but HelloConsumer only received the later 10. HelloConsumer's output is:

    offset = 10, key = 0, value = 0
    offset = 11, key = 1, value = 1
    offset = 12, key = 2, value = 2
    offset = 13, key = 3, value = 3
    offset = 14, key = 4, value = 4
    offset = 15, key = 5, value = 5
    offset = 16, key = 6, value = 6
    offset = 17, key = 7, value = 7
    offset = 18, key = 8, value = 8
    offset = 19, key = 9, value = 9

    The reason is that a Kafka topic is publish/subscribe rather than a queue: a new consumer group starts from the latest offset by default, so it only sees messages produced after it subscribed.
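
    If you want the consumer to also read the messages published before it subscribed, one option (a sketch, assuming a consumer group with no committed offsets) is to give HelloConsumer a fresh group.id and set auto.offset.reset to earliest:

            props.put("group.id", "test-from-beginning");  // a new group with no committed offsets
            props.put("auto.offset.reset", "earliest");    // start from the oldest available message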

    3. Letting a remote Java producer send messages to Kafka

    Modify the server.properties file under kafka/config and add the following:

    # the IP of the host running Kafka
    advertised.host.name=192.168.1.101 

    Restart ZooKeeper and Kafka, and the broker will then accept messages from remote producers.
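
    On the remote machine, the producer simply points bootstrap.servers at that address (192.168.1.101 being the example IP used above):

            props.put("bootstrap.servers", "192.168.1.101:9092");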
