zoukankan      html  css  js  c++  java
  • Kafka实战系列--Kafka API使用体验

    前言:
      kafka是linkedin开源的消息队列, 淘宝的metaq就是基于kafka而研发. 而消息队列作为一个分布式组件, 在服务解耦/异步化, 扮演非常重要的角色. 本系列主要研究kafka的思想和使用, 本文主要讲解kafka的一些基本概念和api的使用.

    *) 准备工作
    1) 配置maven依赖

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.9.2</artifactId>
      <version>0.8.1.1</version>
    </dependency>

    2).配置hosts
    vim /etc/hosts
    把kafka集群相关的ip及其hostname, 配置到kafka客户端的本地机器

    *) Kafka的基础知识
    1). Broker, Zookeeper, Producer, Consumer
    Broker具体承担消息存储转发工作, Zookeeper则用与元信息的存储(topic的定义/消费进度), Producer则是消息的生产者, Consumer则是消息的消费者.

    2). Topic, Partition, Replication, Consumer Group
      Topic对应一个具体的队列, 在Kafka的概念中, 一个应用一个队列. 应用数据往往呈现部分有序的特点, 因此对kafka的队列, 引入partition的概念, 即可topic划分为多个partition. 单个Partition内保证有序, Partition间不保证. 这样作的好处, 是充分利用了集群的能力, 均匀负载和提高性能.
      Replication主要为了高可用性, 保证部分节点失效的恶劣情况下, 队列数据能不丢.
      Consumer Group的概念的引入, 很有创新性, 把以往传统队列(topic模式, queue模式)的属性从队列本身挪到了消费端. 若要使用queue模式, 则所有的消费端都采用统一个consumer group, 若采用topic模式, 则所有的客户端都设置为不同的consumer group. 其partition的消费进度在zookeeper有所保存.

     *) Kafka API的简单样列代码

    1). 生产者代码
    分区类代码片段

    public class SimplePartitioner implements Partitioner {
      public SimplePartitioner (VerifiableProperties props) {
      }
      public int partition(Object key, int numPartitions) {
        return (key.hashCode() & 0x0FFFFFFF) % numPartitions;
      }
    }

    评注: SimplePartitioner用于对消息进行分发到具体的partition中, 有消息的key来决定, 这个有点像map/reduce中的partition机制.

    生产者代码片段

    Properties props = new Properties();
    // 配置metadata.broker.list, 为了高可用, 最好配两个broker实例
    props.put("metadata.broker.list", "127.0.0.1:9092");
    // serializer.class为消息的序列化类
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    // 设置Partition类, 对队列进行合理的划分
    props.put("partitioner.class", "mmxf.kafka.practise.SimplePartitioner");
    // ACK机制, 消息发送需要kafka服务端确认
    props.put("request.required.acks", "1");
    
    ProducerConfig config = new ProducerConfig(props);
    Producer<String, String> producer = new Producer<String, String>(config);
    
    // KeyedMessage<K, V>
    //   K对应Partition Key的类型
    //   V对应消息本身的类型
    //   topic: "test", key: "key", message: "message" KeyedMessage<String, String> message = new KeyedMessage<String, String>("test", "key", "message"); producer.send(message); // 关闭producer实例 producer.close();

    2). 消费者代码
    使用High Level Consumer的API 线程模型和Partition数最好能保持一致, 即One Thread For Partition
    参考sample样例: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
    代码片段如下:

    public static void main(String[] args) {
    
      // *) 创建ConsumerConfig
      Properties props = new Properties();
      // 设置zookeeper的链接地址
      props.put("zookeeper.connect", "127.0.0.1:2181");
      // 设置group id
      props.put("group.id", "group_id");
      // kafka的group 消费记录是保存在zookeeper上的, 但这个信息在zookeeper上不是实时更新的, 需要有个间隔时间更新
      props.put("auto.commit.interval.ms", "1000");
    
      ConsumerConfig consumerConfig = new ConsumerConfig(props);
      ConsumerConnector consumer = (ConsumerConnector) Consumer.createJavaConsumerConnector(consumerConfig);
    
      String topic = "test";
      int threadNum = 1;
    
      // *) 设置Topic=>Thread Num映射关系, 构建具体的流
      Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
      topicCountMap.put(topic,threadNum);
      Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    
      List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
    
      // *) 启动线程池去消费对应的消息
      ExecutorService executor = Executors.newCachedThreadPool();
      for ( final KafkaStream<byte[], byte[]> stream : streams ) {
        executor.submit(new Runnable() {
          public void run() {
            ConsumerIterator<byte[], byte[]> iter = stream.iterator();
            while ( iter.hasNext() ) {
              MessageAndMetadata<byte[] , byte[]> mam = iter.next();
              System.out.println(
                String.format("thread_id: %d, key: %s, value: %s",
                    Thread.currentThread().getId(),
                    new String(mam.key()),
                    new String(mam.message())
                  )
              );  
            }
          }
        });
      }
    
      try {
        Thread.sleep(1000 * 10);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    
      // *) 优雅地退出
      consumer.shutdown();
      executor.shutdown();
    
      while ( !executor.isTerminated() ) {
        try {
          executor.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
        }
      }
    
    }

     结果输出:

    thread_id: 18, key: key, value: message
  • 相关阅读:
    如何在Centos官网下载所需版本的Centos
    Zabbix微信告警
    CentOS 7.4 源码编译安装 Redis
    zabbix源码安装后,设置为服务启动和关闭
    MySQL 快速入门教程
    mysql解决 ERROR 1045 (28000): Access denied for user 'root'@'localhost' (using password: YES)的报错
    转:Centos7安装zabbix3.4超详细步骤解析
    centos7使用Gogs搭建Git服务器
    Centos7 使用firewall管理防火墙
    zabbix登录密码重置方法
  • 原文地址:https://www.cnblogs.com/mumuxinfei/p/3859193.html
Copyright © 2011-2022 走看看