[Kafka]

    Kafka provides two Consumer APIs: the High Level Consumer API and the Low Level Consumer API (Simple Consumer API).

    High Level Consumer API: a highly abstracted consumer API. It hides the low-level work of fetching data, updating offsets, setting start offsets, and so on, and hands the application a ready-to-consume stream of messages. Advantage: simple to use. Drawback: very little control; you cannot adapt the consumption process to your own business scenario. (Entry class: ConsumerConnector)

    Low Level Consumer API: fetches data from Kafka by driving the low-level API directly; you must supply the partition, offset, and other parameters yourself. Advantage: full control. Drawback: the code is comparatively complex. (Entry class: SimpleConsumer) A minimal sketch follows.
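    For orientation only, here is a minimal sketch of the Low Level API against the same 0.8.x client. The broker address, topic, partition, and offsets are placeholder assumptions; real code would also have to locate the partition leader and handle fetch errors:

    import kafka.api.FetchRequest;
    import kafka.api.FetchRequestBuilder;
    import kafka.javaapi.FetchResponse;
    import kafka.javaapi.consumer.SimpleConsumer;
    import kafka.message.MessageAndOffset;

    import java.nio.ByteBuffer;

    public class SimpleConsumerSketch {
        public static void main(String[] args) throws Exception {
            // Connect directly to the broker leading the partition (placeholder address)
            SimpleConsumer consumer = new SimpleConsumer("192.168.187.146", 9092, 100000, 64 * 1024, "simple-demo");
            // Fetch from topic "test2", partition 0, starting at offset 0
            FetchRequest request = new FetchRequestBuilder()
                    .clientId("simple-demo")
                    .addFetch("test2", 0, 0L, 100000)
                    .build();
            FetchResponse response = consumer.fetch(request);
            // Walk the returned message set and print each message
            for (MessageAndOffset messageAndOffset : response.messageSet("test2", 0)) {
                ByteBuffer payload = messageAndOffset.message().payload();
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(messageAndOffset.offset() + ": " + new String(bytes, "UTF-8"));
            }
            consumer.close();
        }
    }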

    This post focuses on implementing and testing the High Level Consumer API in Java:

    For the Low Level Consumer API in detail, see the post: [Kafka] - Kafka Java Consumer实现(一)

    ========================================================================

    1. JavaKafkaConsumerHighAPI: an implementation that reads data with multiple threads via the Kafka High Level Consumer API. The code is as follows:

    import kafka.consumer.*;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;
    import kafka.serializer.StringDecoder;
    import kafka.utils.VerifiableProperties;
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    /**
     * A simple custom Kafka consumer built on the high-level API
     * Created by gerry on 12/21.
     */
    public class JavaKafkaConsumerHighAPI implements Runnable {
        /**
         * Kafka consumer connector
         */
        private ConsumerConnector consumer;
    
        /**
         * Kafka topic name
         */
        private String topic;
    
        /**
         * Number of threads; normally the number of partitions of the topic
         */
        private int numThreads;
    
        /**
         * Thread pool for the stream-processing workers
         */
        private ExecutorService executorPool;
    
        /**
         * Constructor
         *
         * @param topic      Kafka topic to consume
         * @param numThreads number of processing threads; think of it as the topic's partition count
         * @param zookeeper  ZooKeeper connection string of the Kafka cluster
         * @param groupId    consumer group id this consumer belongs to
         */
        public JavaKafkaConsumerHighAPI(String topic, int numThreads, String zookeeper, String groupId) {
            // 1. Create the Kafka consumer connector
            this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(zookeeper, groupId));
            // 2. Assign the fields
            this.topic = topic;
            this.numThreads = numThreads;
        }
    
        @Override
        public void run() {
            // 1. Specify the topic and how many streams (threads) to open for it
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(this.topic, this.numThreads);

            // 2. Specify the key/value decoders
            StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
            StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

            // 3. Create the message streams
            /**
             * Key: topic name
             * Value: list of streams for that topic; the list size is the count requested in topicCountMap
             */
            Map<String, List<KafkaStream<String, String>>> consumerMap = this.consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

            // 4. Pull the streams for our topic out of the result
            List<KafkaStream<String, String>> streams = consumerMap.get(this.topic);

            // 5. Create the thread pool
            this.executorPool = Executors.newFixedThreadPool(this.numThreads);

            // 6. Submit one processing task per stream
            int threadNumber = 0;
            for (final KafkaStream<String, String> stream : streams) {
                this.executorPool.submit(new ConsumerKafkaStreamProcessor(stream, threadNumber));
                threadNumber++;
            }
        }
    
        public void shutdown() {
            // 1. Close the connection to Kafka; this makes stream.hasNext() return false
            if (this.consumer != null) {
                this.consumer.shutdown();
            }

            // 2. Shut down the thread pool and wait for the workers to finish
            if (this.executorPool != null) {
                // 2.1 Initiate an orderly shutdown
                this.executorPool.shutdown();

                // 2.2 Wait up to five seconds for termination
                try {
                    if (!this.executorPool.awaitTermination(5, TimeUnit.SECONDS)) {
                        System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly!!");
                    }
                } catch (InterruptedException e) {
                    System.out.println("Interrupted during shutdown, exiting uncleanly!!");
                }
            }
    
        }
    
        /**
         * Build a ConsumerConfig from the given ZooKeeper connection string and group id
         *
         * @param zookeeper ZooKeeper connection string, e.g.:<br/>
         *                  hadoop-senior01.ibeifeng.com:2181,hadoop-senior02.ibeifeng.com:2181/kafka
         * @param groupId   consumer group id; consumers that share a group id balance the partitions among themselves
         * @return the Kafka consumer configuration
         */
        private ConsumerConfig createConsumerConfig(String zookeeper, String groupId) {
            // 1. Build the properties object
            Properties prop = new Properties();
            // 2. Set the relevant properties
            prop.put("group.id", groupId); // consumer group id
            prop.put("zookeeper.connect", zookeeper); // ZooKeeper connection url
            prop.put("zookeeper.session.timeout.ms", "400"); // ZooKeeper session timeout
            prop.put("zookeeper.sync.time.ms", "200"); // max lag allowed between a ZK follower and the leader
            prop.put("auto.commit.interval.ms", "1000"); // how often consumed offsets are auto-committed to ZooKeeper
            // 3. Build the ConsumerConfig object
            return new ConsumerConfig(prop);
        }
    
    
        /**
         * Worker that processes the messages of one Kafka stream
         */
        public static class ConsumerKafkaStreamProcessor implements Runnable {
            // the Kafka stream to read from
            private KafkaStream<String, String> stream;
            // thread id number
            private int threadNumber;

            public ConsumerKafkaStreamProcessor(KafkaStream<String, String> stream, int threadNumber) {
                this.stream = stream;
                this.threadNumber = threadNumber;
            }
    
            @Override
            public void run() {
                // 1. Get the message iterator
                ConsumerIterator<String, String> iter = this.stream.iterator();
                // 2. Iterate over the incoming messages
                while (iter.hasNext()) {
                    // 2.1 Read the next message
                    MessageAndMetadata<String, String> value = iter.next();

                    // 2.2 Print thread number, offset, key and message, colon-separated
                    System.out.println(this.threadNumber + ":" + value.offset() + ":" + value.key() + ":" + value.message());
                }
                // 3. hasNext() returned false, i.e. the connector was shut down; this worker is done
                System.out.println("Shutdown Thread:" + this.threadNumber);
            }
        }
    }
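    With the configuration above, offset auto-commit is active (the old consumer's auto.commit.enable defaults to true), so offsets are written to ZooKeeper every auto.commit.interval.ms. If you would rather commit only after a message has really been processed, here is a hedged sketch of the change; note that the ConsumerConnector would have to be passed to the worker threads, and process() is a hypothetical business method:

    // In createConsumerConfig(), disable auto-commit:
    prop.put("auto.commit.enable", "false");

    // In the worker loop, commit after handling each message (or batch):
    while (iter.hasNext()) {
        MessageAndMetadata<String, String> value = iter.next();
        process(value);            // hypothetical business logic
        consumer.commitOffsets();  // commits the offsets of all partitions owned by this connector
    }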

    2. JavaKafkaConsumerHighAPITest: the test class

    /**
     * Created by ibf on 12/21.
     */
    public class JavaKafkaConsumerHighAPITest {
        public static void main(String[] args) {
            String zookeeper = "192.168.187.146:2181";
            String groupId = "group1";
            String topic = "test2";
            int threads = 1;
    
            JavaKafkaConsumerHighAPI example = new JavaKafkaConsumerHighAPI(topic, threads, zookeeper, groupId);
            new Thread(example).start();
    
            // Let the consumer run for 10 minutes (600,000 ms), then shut it down
            int sleepMillis = 600000;
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // Shut down the consumer
            example.shutdown();
        }
    }
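    Given the println in ConsumerKafkaStreamProcessor, each consumed record appears on stdout as one line of the form threadNumber:offset:key:message, and each worker prints "Shutdown Thread:<n>" once shutdown() drains its stream.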

    3. Running the test

    For the relevant Kafka commands, see the post [Kafka] - Kafka基本操作命令. (The test screenshots from the original post are not reproduced here.)

    With that, the implementation is essentially complete.
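    To generate test data for topic test2 from code rather than the console producer, here is a minimal sketch using the old 0.8.x producer API; the broker address and message contents are placeholder assumptions:

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    import java.util.Properties;

    public class TestDataProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("metadata.broker.list", "192.168.187.146:9092"); // placeholder broker address
            props.put("serializer.class", "kafka.serializer.StringEncoder");

            Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
            // Send ten keyed test messages to topic "test2"
            for (int i = 0; i < 10; i++) {
                producer.send(new KeyedMessage<String, String>("test2", "key-" + i, "message-" + i));
            }
            producer.close();
        }
    }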

    ========================================================

    4. Maven POM dependencies for Kafka

    <properties>
        <kafka.version>0.8.2.1</kafka.version>
    </properties>
    
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>${kafka.version}</version>
        </dependency>
    </dependencies>
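    A note on the artifact: the _2.10 suffix is the Scala version the Kafka jars were compiled against, and this single dependency brings in the old Scala consumer and producer classes (kafka.consumer.*, kafka.javaapi.*) used throughout this post. Kafka 0.9+ ships a pure-Java client under org.apache.kafka:kafka-clients with a different API, so this code is specific to the 0.8.x line.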
Original post: https://www.cnblogs.com/liuming1992/p/6432626.html