zoukankan      html  css  js  c++  java
  • Kafaka 总结

    Kafka是一个分布式的Streaming处理平台,Kafka可以用于数据库中数据的导入导出,也可以用于实时流的处理,但是Kafka最核心的功能就是作为分布式的消息中间件。
    Kafka集群是由多个Broker Server组成的,消息的发送者称为Producer,消息的消费者称为Consumer,topic则是Kafka消息的发送、存储和消费中最核心的抽象,每一个Producer都需要指定将消息发往哪个topic,而Consumer则需要指定消费哪一个topic的数据,所以topic是连接Producer和Consumer的桥梁。
    topic可以分成多个分区,这些分区都是分布式的均匀的分布在多个Broker Server上,每一个topic的每一个Partition都可以配置备份冗余存储在多个Broker Server上,这样可以提高数据的高可用性。每一个topic的数据都是按照每一个分区存储在Kafka Broker Server指定的存储文件中的,这个存储的时间默认是7天,过了7天这些数据将会被删除掉,这个7天当然可以配置。
    Producer发送消息的时候只需要指定topic即可,那么一个topic可能有多个partition,那么Producer发送的一条数据到底发送到这个topic的哪一个partition中呢,这个就是Producer在发送消息时需要使用Partitioner来为发送的数据进行分区了,按照一定的规则来计算出将要发送的数据需要发往哪个分区,这个Partitioner默认是按照轮询的规则进行分区,当然可以自定义这个规则
    Consumer消息消息的时候除了需要指定topic外,还需要指定这个Consumer属于哪一个Consumer Group。每一个Consumer Group消费topic所有的partition的数据,而属于一个Consumer Group的所有的Consumer平均消费同一个topic的所有partition的数据,每一个Consumer消费topic中的partitions数据的时候都是按照offset来消费的,这个offset就是消息在Kafka中topic的位置
     

     

     

     Kafka基本术语 - Consumer

    一个topic的数据可以被多个Consumer消费:
    1、Consumer是根据offset来消费topic中的Record的
    2、offset是Consumer控制的,所以Consumer可以按照不同需求消费任何位置的数据,在数据存在的7天内
     
     
    Consumer Group
    每一个Consumer都被归为一个Consumer Group
    一个Consumer Group可以包含一个或者多个Consumer
    一个topic中的一条Record会被所有订阅了这个topic的Consumer Group消费

     

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    /**
     * Created by tangweiqun on 2017/12/23.
     */
    public class SimpleComsumerGroup1 {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "master:9092"); 
            props.put("group.id", "group1");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
            consumer.subscribe(Arrays.asList("test-group"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s, topic = %s, partition = %d",
                            record.offset(), record.key(), record.value(), record.topic(), record.partition());
                    System.out.println();
                }
            }
        }
    }
    

      

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    /**
     * Created by tangweiqun on 2017/12/23.
     */
    public class SimpleComsumerGroup2 {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "master:9092");
            props.put("group.id", "group2");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
            consumer.subscribe(Arrays.asList("test-group"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s, topic = %s, partition = %d",
                            record.offset(), record.key(), record.value(), record.topic(), record.partition());
                    System.out.println();
                }
            }
        }
    }
    

      

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Properties;
    
    public class SimpleProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "master:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("batch.size", "10");    
    
            Producer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<String, String>("test-group",
                        Integer.toString(i), Integer.toString(i)));
            }
    
            producer.close();
        }
    }
    

      

  • 相关阅读:
    算法导论:堆排序
    地域划分
    字符串翻转
    lintcode:买卖股票的最佳时机 IV
    lintcode:买卖股票的最佳时机 III
    lintcode:买卖股票的最佳时机 II
    lintcode:买卖股票的最佳时机 I
    2016腾讯编程题:微信红包
    2016腾讯编程题:生成格雷码
    2016京东编程题:小东分苹果
  • 原文地址:https://www.cnblogs.com/tesla-turing/p/11488638.html
Copyright © 2011-2022 走看看