zoukankan      html  css  js  c++  java
  • Kafka实战-简单示例

    https://www.cnblogs.com/smartloli/p/4543211.html

    1.概述

      上一篇博客《Kafka实战-Kafka Cluster》中,为大家介绍了Kafka集群的安装部署,以及对Kafka集群Producer/Consumer、HA等做了相关测试,今天我们来开发一个Kafka示例,练习如何在Kafka中进行编程,下面是今天的分享的目录结构:

    • 开发环境
    • ConfigureAPI
    • Consumer
    • Producer
    • 截图预览

      下面开始今天的内容分享。

    2.开发环境

      在开发Kafka相关应用之前,我们得将Kafka得开发环境搭建完成,这里我所使用得开发环境如下所示:

    基础软件 工具名称
    IDE JBoss Studio 8
    JDK 1.7

      关于基础软件的下载及相关配置,大家可参考我写的《高可用Hadoop平台-启航》一文的相关赘述,这里就不多做介绍了。在安装好相关基础软件后,我们开始项目工程的创建,这里我们所使用的工程结构是Maven,关于Maven环境的相关配置信息,可参考我在《Hadoop2源码分析-准备篇》一文对Maven环境配置的赘述。

      在准备完成相关基础软件以及Maven环境后,我们大家创建的工程,在pom.xml文件中,添加Kafka的依赖包,添加代码如下所示:

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.11</artifactId>
                <version>0.8.2.1</version>
            </dependency>        

      下面开始编写今天的代码示例。

    3.ConfigureAPI

      首先是一个配置结构类文件,配置Kafka的相关参数,代码如下所示:

    复制代码
    package cn.hadoop.hdfs.conf;
    
    /**
     * @Date Apr 28, 2015
     *
     * @Author dengjie
     *
     * @Note Set param path
     */
    public class ConfigureAPI {
    
        public interface KafkaProperties {
            public final static String ZK = "10.211.55.15:2181,10.211.55.17:2181,10.211.55.18:2181";
            public final static String GROUP_ID = "test_group1";
            public final static String TOPIC = "test2";
            public final static String BROKER_LIST = "10.211.55.15:9092,10.211.55.17:9092,10.211.55.18:9092";
            public final static int BUFFER_SIZE = 64 * 1024;
            public final static int TIMEOUT = 20000;
            public final static int INTERVAL = 10000;
        }
    
    }
    复制代码

    4.Consumer

      然后是一个消费程序,用于消费Kafka的消息,代码如下所示:

    • JConsumer

    复制代码
    package cn.hadoop.hdfs.kafka;
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    import cn.hadoop.hdfs.conf.ConfigureAPI.KafkaProperties;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    
    /**
     * @Date May 22, 2015
     *
     * @Author dengjie
     *
     * @Note Kafka Consumer
     */
    public class JConsumer extends Thread {
    
        private ConsumerConnector consumer;
        private String topic;
        private final int SLEEP = 1000 * 3;
    
        public JConsumer(String topic) {
            consumer = Consumer.createJavaConsumerConnector(this.consumerConfig());
            this.topic = topic;
        }
    
        private ConsumerConfig consumerConfig() {
            Properties props = new Properties();
            props.put("zookeeper.connect", KafkaProperties.ZK);
            props.put("group.id", KafkaProperties.GROUP_ID);
            props.put("zookeeper.session.timeout.ms", "40000");
            props.put("zookeeper.sync.time.ms", "200");
            props.put("auto.commit.interval.ms", "1000");
            return new ConsumerConfig(props);
        }
    
        @Override
        public void run() {
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, new Integer(1));
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
            KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                System.out.println("Receive->[" + new String(it.next().message()) + "]");
                try {
                    sleep(SLEEP);
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    
    }
    复制代码

    5.Producer

      接着是Kafka的生产消息程序,用于产生Kafka的消息供Consumer去消费,具体代码如下所示:

    • JProducer

    复制代码
    package cn.hadoop.hdfs.kafka;
    
    import java.util.Properties;
    
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    
    /**
     * @Date May 22, 2015
     *
     * @Author dengjie
     *
     * @Note Kafka JProducer
     */
    public class JProducer extends Thread {
    
        private Producer<Integer, String> producer;
        private String topic;
        private Properties props = new Properties();
        private final int SLEEP = 1000 * 3;
    
        public JProducer(String topic) {
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("metadata.broker.list", "10.211.55.18:9092");
            producer = new Producer<Integer, String>(new ProducerConfig(props));
            this.topic = topic;
        }
    
        @Override
        public void run() {
            int offsetNo = 1;
            while (true) {
                String msg = new String("Message_" + offsetNo);
                System.out.println("Send->[" + msg + "]");
                producer.send(new KeyedMessage<Integer, String>(topic, msg));
                offsetNo++;
                try {
                    sleep(SLEEP);
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    
    }
    复制代码

    6.截图预览

      在开发完Consumer和Producer的代码后,我们来测试相关应用,下面给大家编写了一个Client去测试Consumer和Producer,具体代码如下所示:

    • KafkaClient

    复制代码
    package cn.hadoop.hdfs.kafka.client;
    
    import cn.hadoop.hdfs.conf.ConfigureAPI.KafkaProperties;
    import cn.hadoop.hdfs.kafka.JConsumer;
    import cn.hadoop.hdfs.kafka.JProducer;
    
    /**
     * @Date May 22, 2015
     *
     * @Author dengjie
     *
     * @Note To run Kafka Code
     */
    public class KafkaClient {
    
        public static void main(String[] args) {
            JProducer pro = new JProducer(KafkaProperties.TOPIC);
            pro.start();
    
            JConsumer con = new JConsumer(KafkaProperties.TOPIC);
            con.start();
        }
    
    }
    复制代码

      运行截图如下所示:

    7.总结

      大家在开发Kafka的应用时,需要注意相关事项。若是使用Maven项目工程,在添加相关Kafka依赖JAR包时,有可能依赖JAR会下载失败,若出现这种情况,可手动将Kafka的依赖JAR包添加到Maven仓库即可,在编写Consumer和Producer程序,这里只是给出一个示例让大家先熟悉Kafka的代码如何去编写,后面会给大家更加详细复杂的代码模块案例。

    8.结束语

      这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

  • 相关阅读:
    使用delphi 开发多层应用(十)安全访问服务器
    使用delphi 开发多层应用(十三)使用Basic4android 直接访问kbmMW server
    使用delphi 开发多层应用(十一)使用kbmMW 开发webserver
    basic4android 开发教程翻译(八)使用ListView
    使用delphi 连接国产数据库:达梦
    kbmMW 4.01.00 Beta 1发布了
    解决 ie 下 javascript 设置 table.tBodies.innerHTML 无法设置的问题
    用vs2010 把 vcf 转成 csv,再转入Nokia
    GAC 与 引用 程序集路径
    sqlserver 函数 之 进制转换
  • 原文地址:https://www.cnblogs.com/xiaohanlin/p/8640876.html
Copyright © 2011-2022 走看看