您可以参考前一篇文章 (一)Kafka0.8.2官方文档中文版系列-入门指南
2. API
我们正在为Kafka重写JVM客户端。在Kafka0.8.2中,包含一个新重写的Java producer。下一个版本将包含一个等效的Java consumer。这些新客户端旨在取代现有的Scala客户端,但为了兼容性,它们将共存一段时间。这些客户端可以在一个独立的jar中使用,并且具有最小的依赖性,而旧的Scala客户端仍然与服务器打包在一起。
2.1 Producer API
在kafka0.8.2版本中,我们鼓励你使用新的java producer。这个客户端经过生产环境的测试,相比之前的scala客户端该客户端更快、有更多的特性。你可以通过添加如下maven依赖使用它:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.8.2.0</version> </dependency>
在javadoc中可以查看如何使用producer。
对于那些对遗留Scala生产者api感兴趣的人,可以在这里找到相关信息。
2.2 High Level Consumer API
class Consumer { /** * Create a ConsumerConnector 创建一个消费者连接器 * * @param config at the minimum, need to specify the groupid of the consumer and the zookeeper * connection string zookeeper.connect.
* 参数解释:基于一个最小的配置,你只需要指定消费者组,zookeeper的连接 */ public static kafka.javaapi.consumer.ConsumerConnector createJavaConsumerConnector(ConsumerConfig config); } /** * V: type of the message 消息的类型 * K: type of the optional key assciated with the message 与消息相关的可选的配置 */ public interface kafka.javaapi.consumer.ConsumerConnector { /** * Create a list of message streams of type T for each topic. * 为每个topic创建一个T类型的消息流列表
* * @param topicCountMap a map of (topic, #streams) pair * @param decoder a decoder that converts from Message to T * @return a map of (topic, list of KafkaStream) pairs. * The number of items in the list is #streams. Each stream supports * an iterator over message/metadata pairs. */ public <K,V> Map<String, List<KafkaStream<K,V>>> createMessageStreams(Map<String, Integer> topicCountMap, Decoder<K> keyDecoder, Decoder<V> valueDecoder); /** * Create a list of message streams of type T for each topic, using the default decoder.
* 为每个topic创建一个T类型的消息流列表,使用默认的解码器 */ public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap); /** * Create a list of message streams for topics matching a wildcard. * * @param topicFilter a TopicFilter that specifies which topics to * subscribe to (encapsulates a whitelist or a blacklist). * @param numStreams the number of message streams to return. * @param keyDecoder a decoder that decodes the message key * @param valueDecoder a decoder that decodes the message itself * @return a list of KafkaStream. Each stream supports an * iterator over its MessageAndMetadata elements. */ public <K,V> List<KafkaStream<K,V>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams, Decoder<K> keyDecoder, Decoder<V> valueDecoder); /** * Create a list of message streams for topics matching a wildcard, using the default decoder.
* 为与通配符匹配的消息流创建一个消息流列表,使用默认的解码器 */ public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams); /** * Create a list of message streams for topics matching a wildcard, using the default decoder, with one stream.
* 为与通配符匹配的消息流创建一个消息流列表,使用默认的解码器 */ public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter); /** * Commit the offsets of all topic/partitions connected by this connector.
* 提交通过该连接器关联的所有的topic和分区的偏移量 */ public void commitOffsets(); /** * Shut down the connector 关闭连接器 */ public void shutdown(); }
你可以参考这个示例去学习如何使用high level 消费者API。
2.3 Simple Consumer API
class kafka.javaapi.consumer.SimpleConsumer { /** * Fetch a set of messages from a topic.
* 从一个topic拉取消息 * * @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.
* 请求需要指定主题的名称、主题分区、起始偏移量、拉取数据的最大字节数 * @return a set of fetched messages
* 拉取回来的消息集合 */ public FetchResponse fetch(kafka.javaapi.FetchRequest request); /** * Fetch metadata for a sequence of topics.
* 获取一系列主题的元数据 * * @param request specifies the versionId, clientId, sequence of topics. 需要指定版本号、客户端ID、主题 * @return metadata for each topic in the request. 每个请求主题的元数据 */ public kafka.javaapi.TopicMetadataResponse send(kafka.javaapi.TopicMetadataRequest request); /** * Get a list of valid offsets (up to maxSize) before the given time.
* 在给定的时间之前,得到一个有效的偏移量列表(偏移量可以取到给定时间之前的最大值) * * @param request a [[kafka.javaapi.OffsetRequest]] object. * @return a [[kafka.javaapi.OffsetResponse]] object. */ public kafka.javaapi.OffsetResponse getOffsetsBefore(OffsetRequest request); /** * Close the SimpleConsumer. 关闭SimpleConsumer */ public void close(); }
对大多数应用来说,the high level API是完全够用的。一些应用想要使用一些high level API没有暴露的特性(比如说,当重启消费者时指定初始的offset,即偏移量)。他们可以使用我们的low level
SimpleConsumer API。但是这个逻辑会有点复杂,你可以参考这个例子。
2.4 Kafka Hadoop Consumer API
我们的一个基本用例就是,为数据聚合和加载数据到hadoop提供一个水平扩展的解决方案。为了支持这个用户用例,我们提供了一个基于hadoop的消费者,它生成了许多map任务,以并行地从Kafka集群中拉取数据。这可以非常快速的将kafka的数据加载到hadoop中(我们只用了一些Kafka服务器就完全饱和了网络,意思就是基于hadoop的consumer拉取速度很快)。
使用hadoop consumer的信息,可以在这里找到。