zoukankan      html  css  js  c++  java
  • 消息中间件kafka学习记录

    1. 概述

    Apache Kafka是一个分布式消息系统,凭借其优异的特性而被广泛使用。

    • 高性能:O(1)复杂度消息快速持久化。
    • 高吞吐率: 单机每秒10w条消息传输。
    • 支持消息分区和分布式消费。
    • 支持在线水平扩展。
    架构及核心组件

    • Producer: 消息生产者,即向kafka broker发送消息的客户端。
    • Consumer:消息消费者,即从kafka broker获取消息的客户端。
    • Topic:消息根据topic进行归类。
    • Partition:消息分片,每个topic中的消息被分为n个独立的partition,以提高消息处理效率。
    • Broker:kafka集群中的kafka实例(服务器节点),一个broker可以容纳多个topic。

    Kafka依赖于zookeeper保存一些meta信息,来保证系统可用性。

    2. 环境准备

    2.1 安装zookeeper

    zk官网下载安装包,如zookeeper-3.4.13.tar.gz,解压即可。

    • 启动服务,默认监听端口2181
    ./bin/zkServer.sh start
    
    • 客户端连接
    ./bin/zkCli.sh -server 10.183.222.203:2181
    

    2.2 安装kafka

    kafka官网下载安装包,如kafka_2.12-2.0.0.tgz,解压即可。
    /config/server.properties配置:
    log.dirs=/tmp/kafka-logs
    listeners=PLAINTEXT://10.183.222.203:9092
    zookeeper.connect=10.183.222.203:2181 #连接zookeeper

    • 启动服务,默认监听端口9092
    ./bin/kafka-server-start.sh  config/server.properties  &
    

    启动kafka后,zk增加了brokers、consumers等节点:
    启动kafka后,查看zk节点

    3. 命令行常用命令

    创建topic (-create)
    # 创建topic:test01,并指定了replication-factor和partitions分别为1。
    # replication-factor控制一个Message会被写到多少台服务器上,因此这个值必须≤Broker数量。
    ./bin/kafka-topics.sh -create -zookeeper  10.183.222.203:2181 -replication-factor 1 -partitions 1 -topic test01
    

    创建topic

    查看topic详情 (-describe)
    ./bin/kafka-topics.sh -describe -zookeeper 10.183.222.203:2181
    

    查看topic详情

    发送消息到指定topic
    ./bin/kafka-console-producer.sh --broker-list 10.183.222.203:9092 -topic test01
    

    发送消息

    消费指定topic上的消息
    ./bin/kafka-console-consumer.sh -bootstrap-server 10.183.222.203:9092 -topic test01 -from-beginning
    

    接收消息

    4. java api实现

    4.1 添加maven配置
            <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.0.0</version>
            </dependency>
    
    4.2 消息生产者
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Properties;
    
    /**
     * 消息生产者
     */
    public class ProducerDemo {
        private static final String helloTopic = "HelloWorld";
    
        public static void main(String[] args) {
            // 1. 构造Propertity,进行producer 相关配置。
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.183.222.203:9092");
            properties.put(ProducerConfig.ACKS_CONFIG, "all");
            properties.put(ProducerConfig.RETRIES_CONFIG, 0);
            properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
            properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
            properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 33554432);
            // 消息序列化方式
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            Producer<String, String> producer = null;
            try {
                // 2. 构造Producer对象
                producer = new KafkaProducer<>(properties);
                for (int i = 0; i < 10; i++) {
                    String msgValue = "Message " + i;
                    // 3. 发送消息
                    producer.send(new ProducerRecord<>(helloTopic, msgValue));
                    System.out.println("Sent:" + msg);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (producer != null) {
                    producer.close();
                }
            }
        }
    }
    
    4.3 消息消费者
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import java.util.Collections;
    import java.util.Properties;
    
    /**
     *  consumer从kafka读取消息
     */
    public class ConsumerDemo {
        private static final String helloTopic = "HelloWorld";
    
        public static void main(String[] args) {
            // 1. 构造Propertity,进行consumer相关配置。
            Properties properties = new Properties();
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.183.222.203:9092");
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
            properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
            // 消息反序列化方式
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
           
            // 2. 生成消费实例
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
            // 3. 订阅相应的 topic
            //    说明:可以消费多个topic: Arrays.asList(topic);
            //          topic支持正则表达式:如:subscribe(Pattern.compile("test.*")
            consumer.subscribe(Collections.singleton(helloTopic));
            
            // 4. 循环消费消息
            while (true) {
                try {
                    // 4.1 poll方法拉取订阅的消息, 消费者必须不断的执行poll,获取消息、维持连接。
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    // 4.2 消费数据,必须在下次poll之前消费完这些数据, 且总耗时不得超过SESSION_TIMEOUT_MS_CONFIG
                    //     若不能在下次poll之前消费完,则会触发一次负载均衡,产生卡顿。
                    //     可以开一个单独的线程池来消费消息,然后异步返回结果。
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s
    ",
                                record.topic(), record.partition(), record.offset(), record.key(), record.value());
                    }
    
                } finally {
                    // 不再消费主动关闭
                    consumer.close();
                }
            }
        }
    }
    
    

    消息消费者

    kafka作为目前广泛使用的消息中间件。本文对其核心组件和基本用法做了学习记录。

    参考:Kafka: The Definitive Guide

  • 相关阅读:
    java中 this和super的差别
    Servlet对文件的读写操作
    Android通过反射打造能够存储不论什么对象的万能SharedPreferences
    Solr5.3.1 SolrJ查询索引结果
    spring mvc form表单提交乱码
    多表利用DIH批量导入数据并建立索引注意事项
    【转】Solr从数据库导入数据(DIH)
    【转】solr+ajax智能拼音详解---solr跨域请求
    跨域请求获取Solr json检索结果并高亮显示
    Solr5.3.1通过copyField设置多个field(字段)同时检索
  • 原文地址:https://www.cnblogs.com/eaglediao/p/9621534.html
Copyright © 2011-2022 走看看