zoukankan      html  css  js  c++  java
  • Kafka详解三:开发Kafka应用

    问题导读

    1.Kafka系统由什么组成?
    2.Kafka中和producer相关的API是什么?

    一、整体看一下Kafka
            我们知道,Kafka系统有三大组件:Producer、Consumer、broker 。
           
           file:///C:/Users/ADMINI~1/AppData/Local/Temp/enhtmlclip/Image.png
           producers 生产(produce)消息(message)并推(push)送给brokers,consumers从brokers把消息提取(pull)出来消费(consume)。

    二、开发一个Producer应用    
            Producers用来生产消息并把产生的消息推送到Kafka的Broker。Producers可以是各种应用,比如web应用,服务器端应用,代理应用以及log系统等等。当然,Producers现在有各种语言的实现比如Java、C、Python等。
           我们先看一下Producer在Kafka中的角色:

           

          file:///C:/Users/ADMINI~1/AppData/Local/Temp/enhtmlclip/Image(1).png

    2.1.kafka Producer 的 API
     Kafka中和producer相关的API有三个类

    • Producer:最主要的类,用来创建和推送消息
    • KeyedMessage:定义要发送的消息对象,比如定义发送给哪个topic,partition key和发送的内容等。
    • ProducerConfig:配置Producer,比如定义要连接的brokers、partition class、serializer class、partition key等



    2.2.下面我们就写一个最简单的Producer:产生一条消息并推送给broker

    1. package bonree.producer;
    2. import java.util.Properties;
    3. import kafka.javaapi.producer.Producer;
    4. import kafka.producer.KeyedMessage;
    5. import kafka.producer.ProducerConfig;
    6. /*******************************************************************************
    7. * BidPlanStructForm.java Created on 2014-7-8
    8. * Author: <a href=mailto:wanghouda@126.com>houda</a>
    9. * @Title: SimpleProducer.java
    10. * @Package bonree.producer
    11. * Description:
    12. * Version: 1.0
    13. ******************************************************************************/
    14. public class SimpleProducer {
    15.         private static Producer<Integer,String> producer;
    16.         private final Properties props=new Properties();
    17.         public SimpleProducer(){
    18.                 //定义连接的broker list
    19.                 props.put("metadata.broker.list", "192.168.4.31:9092");
    20.                 //定义序列化类(Java对象传输前要序列化)
    21.                 props.put("serializer.class", "kafka.serializer.StringEncoder");
    22.                 producer = new Producer<Integer, String>(new ProducerConfig(props));
    23.         }
    24.         public static void main(String[] args) {
    25.                 SimpleProducer sp=new SimpleProducer();
    26.                 //定义topic
    27.                 String topic="mytopic";
    28.                 //定义要发送给topic的消息
    29.                 String messageStr = "send a message to broker ";
    30.                 //构建消息对象
    31.                 KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, messageStr);
    32.                 //推送消息到broker
    33.                 producer.send(data);
    34.                 producer.close();
    35.         }
    36. }


    三、开发一个consumer应用
           Consumer是用来消费Producer产生的消息的,当然一个Consumer可以是各种应用,如可以是一个实时的分析系统,也可以是一个数据仓库或者是一个基于发布订阅模式的解决方案等。Consumer端同样有多种语言的实现,如Java、C、Python等。
          我们看一下Consumer在Kafka中的角色:
           
    3.1.kafka Producer 的 API
    Kafka和Producer稍微有些不同,它提供了两种类型的API

    • high-level consumer API:提供了对底层API的抽象,使用起来比较简单
    • simple consumer API:允许重写底层API的实现,提供了更多的控制权,当然使用起来也复杂一些

           由于是第一个应用,我们这部分使用high-level API,它的特点每消费一个message自动移动offset值到下一个message,关于offset在后面的部分会单独介绍。与Producer类似,和Consumer相关的有三个主要的类:

    • KafkaStream:这里面返回的就是Producer生产的消息
    • ConsumerConfig:定义要连接zookeeper的一些配置信息(Kafka通过zookeeper均衡压力,具体请查阅见面几篇文章),比如定义zookeeper的URL、group id、连接zookeeper过期时间等。
    • ConsumerConnector:负责和zookeeper进行连接等工作

    3.2.下面我们就写一个最简单的Consumer:从broker中消费一个消息

    1. package bonree.consumer;
    2. import java.util.HashMap;
    3. import java.util.List;
    4. import java.util.Map;
    5. import java.util.Properties;
    6. import kafka.consumer.Consumer;
    7. import kafka.consumer.ConsumerConfig;
    8. import kafka.consumer.ConsumerIterator;
    9. import kafka.consumer.KafkaStream;
    10. import kafka.javaapi.consumer.ConsumerConnector;
    11. /*******************************************************************************
    12. * Created on 2014-7-8 Author: <a
    13. * href=mailto:wanghouda@126.com>houda</a>
    14. * @Title: SimpleHLConsumer.java
    15. * @Package bonree.consumer Description: Version: 1.0
    16. ******************************************************************************/
    17. public class SimpleHLConsumer {
    18.         private final ConsumerConnector consumer;
    19.         private final String topic;
    20.         public SimpleHLConsumer(String zookeeper, String groupId, String topic) {
    21.                 Properties props = new Properties();
    22.                 //定义连接zookeeper信息
    23.                 props.put("zookeeper.connect", zookeeper);
    24.                 //定义Consumer所有的groupID,关于groupID,后面会继续介绍
    25.                 props.put("group.id", groupId);
    26.                 props.put("zookeeper.session.timeout.ms", "500");
    27.                 props.put("zookeeper.sync.time.ms", "250");
    28.                 props.put("auto.commit.interval.ms", "1000");
    29.                 consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    30.                 this.topic = topic;
    31.         }
    32.         public void testConsumer() {
    33.                 Map<String, Integer> topicCount = new HashMap<String, Integer>();
    34.                 //定义订阅topic数量
    35.                 topicCount.put(topic, new Integer(1));
    36.                 //返回的是所有topic的Map
    37.                 Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
    38.                 //取出我们要需要的topic中的消息流
    39.                 List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
    40.                 for (final KafkaStream stream : streams) {
    41.                         ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
    42.                         while (consumerIte.hasNext())
    43.                                 System.out.println("Message from Single Topic :: " + new String(consumerIte.next().message()));
    44.                 }
    45.                 if (consumer != null)
    46.                         consumer.shutdown();
    47.         }
    48.         public static void main(String[] args) {
    49.                 String topic = "mytopic";
    50.                 SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer("192.168.4.32:2181", "testgroup", topic);
    51.                 simpleHLConsumer.testConsumer();
    52.         }
    53. }


    四、运行查看结果
    先启动服务器端相关进程:

    • 运行zookeeper:[root@localhost kafka-0.8]# bin/zookeeper-server-start.sh config/zookeeper.properties
    • 运行Kafkabroker:[root@localhost kafka-0.8]# bin/kafka-server-start.sh config/server.properties

    再运行我们写的应用

    • 运行刚才写的SimpleHLConsumer 类的main函数,等待生产者生产消息
    • 运行SimpleProducer的main函数,生产消息并push到broker

          结果:运行完SimpleProducer后在SimpleHLConsumer的控制台即可看到生产者生产的消息:“send a message to broker”。

          转自:http://www.aboutyun.com/thread-11115-1-1.html

  • 相关阅读:
    Poj 2017 Speed Limit(水题)
    Poj 1316 Self Numbers(水题)
    Poj 1017 Packets(贪心策略)
    Poj 1017 Packets(贪心策略)
    Poj 2662,2909 Goldbach's Conjecture (素数判定)
    Poj 2662,2909 Goldbach's Conjecture (素数判定)
    poj 2388 Who's in the Middle(快速排序求中位数)
    poj 2388 Who's in the Middle(快速排序求中位数)
    poj 2000 Gold Coins(水题)
    poj 2000 Gold Coins(水题)
  • 原文地址:https://www.cnblogs.com/sh425/p/7158822.html
Copyright © 2011-2022 走看看