Kafka-API

    IDE shortcuts (IntelliJ):
    ctrl+H: show the type hierarchy of an interface (to find an implementation class to new)
    ctrl+R: find and replace
    ctrl+alt+L: reformat code

    ctrl+F: find
    ctrl+alt+V: extract variable


    Replace HTML entities when pasting code:
    &quot;  →  "
    &lt;    →  <
    &gt;    →  >

    Kafka Producer Java API

     Creating a producer

    Without a callback

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.util.Properties;

    public class CustomProducer {
        public static void main(String[] args) throws InterruptedException {
            Properties properties = new Properties();
            // Kafka broker addresses
            properties.put("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092");
            // "all" is equivalent to acks=-1: wait for all in-sync replicas to acknowledge
            properties.put("acks", "all");
            properties.put("retries", 0);
            // size-based batching
            properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
            // time-based batching
            properties.put("linger.ms", 1);
            // client-side buffer memory
            properties.put("buffer.memory", 33554432);
            // key/value serializers
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            Producer<String, String> producer = new KafkaProducer<String, String>(properties);

            for (int i = 0; i < 9; i++){
                producer.send(new ProducerRecord<String, String>("first", "" + i, "Hello" + i));
            }
            //Thread.sleep(1000);
            // Without close(), buffered records may never be sent: the producer only flushes
            // a batch once a batching condition (batch.size or linger.ms) is reached.
            producer.close();
        }
    }

     new ProducerRecord<String, String>("topic", partition (int), "key", "value")
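
    For illustration, a record can be pinned to an explicit partition with this constructor; a minimal fragment, reusing the producer from the example above (topic name, key, and value are just placeholders):

    // Pin the record to partition 0 explicitly; key/value are placeholders
    producer.send(new ProducerRecord<String, String>("first", 0, "key-0", "value-0"));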

    With a callback

    A producer with a callback: the callback is invoked once for every record sent,
    whether or not the send succeeded.

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    import java.util.Arrays;
    import java.util.Properties;

    public class CustomProducerCompletion {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092");
            properties.put("acks", "all");
            properties.put("retries", 2);
            properties.put("batch.size", 16384);
            properties.put("linger.ms", 1);
            properties.put("buffer.memory", 33554432);
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // custom partitioner (ProducerConfig.PARTITIONER_CLASS_CONFIG)
            //properties.put("partitioner.class", "com.atguigu.kafka.producer.CustomPartitioner");
            // interceptor chain
            properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                    Arrays.asList("com.atguigu.kafka.interceptor.TimeStampInterceptor","com.atguigu.kafka.interceptor.CountInterceptor"));

            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
            for (int i = 0; i < 9; i++){
                kafkaProducer.send(new ProducerRecord<String, String>("first", "1", "Hi" + i), new Callback() {
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (recordMetadata != null){
                            System.out.println("Topic:" + recordMetadata.topic() + "\t" +
                                    "Partition:" + recordMetadata.partition() + "\t" + "offset:" + recordMetadata.offset());
                        }
                    }
                });
            }
            kafkaProducer.close();
        }
    }

    Custom partitioner

     Override the rule that maps a record/key to a partition

    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;

    import java.util.Map;

    public class CustomPartitioner implements Partitioner {
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            return 0; // route every record to partition 0
        }

        public void close() {

        }

        /**
         * Configuration properties passed to the producer can be read here
         * @param config
         */
        public void configure(Map<String, ?> config) {

        }
    }
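
    For illustration, a more realistic partition() body might hash the key across the topic's partitions; this is only a sketch under that assumption, not the partitioner used above:

    // Sketch: hash the key over the number of partitions of the topic
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            return 0; // records without a key all go to partition 0 in this sketch
        }
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions; // non-negative hash modulo partition count
    }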

    Kafka Consumer Java API

    High-level API

    No need to manage offsets manually

    poll(timeout): pull records, with a timeout
    subscribe(): subscribe to topics
    Several topics can be consumed at the same time:
    Arrays.asList turns the topic array into a collection (see the snippet below)
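
    For example, assuming a KafkaConsumer instance like the one created in the example further down, a single consumer can subscribe to several topics at once (the second topic name is a placeholder):

    // Subscribe one consumer to two topics at the same time ("second" is a placeholder topic)
    kafkaConsumer.subscribe(Arrays.asList("first", "second"));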

    1) Advantages of the high-level API

    The high-level API is simple to write.

    There is no need to manage offsets yourself; the system manages them automatically (through ZooKeeper).

    There is no need to manage partitions, replicas, and so on; the system handles them automatically.

    If a consumer goes offline, it automatically resumes from the offset last recorded in ZooKeeper. Consumer groups can be used to keep different programs that read the same topic separate: each group records its own offset, so programs reading the same topic do not interfere with each other's offsets.

    2) Disadvantages of the high-level API

    You cannot control offsets yourself (a problem for certain special requirements; see the manual-commit sketch after the consumer example below).

    No fine-grained control over partitions, replicas, ZooKeeper, and so on.

    // High-level API
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import java.util.Arrays;
    import java.util.Properties;

    public class CustomConsumer  {
        public static void main(String[] args) {
            Properties properties = new Properties();
            // Kafka cluster addresses
            properties.put("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092");
            // consumer group id
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "kris");
            // commit offsets automatically (managed by the Kafka cluster)
            properties.put("enable.auto.commit", "true");
            // how often to commit offsets
            properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
            // key/value deserializers
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
            kafkaConsumer.subscribe(Arrays.asList("first"));  // subscribe to the topic
            while (true){
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100); // poll pulls records, with a 100 ms timeout
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Topic:" + record.topic() + "\t" +
                            "Partition:" + record.partition() + "\t" + "Offset:" + record.offset()
                            + "\t" + "key:" + record.key() + "\t" + "value:" + record.value());
                }
            }
        }
    }
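
    For the offset-control limitation listed above, the same consumer API also supports manual commits; a minimal sketch, assuming the same properties as the example above with auto-commit switched off:

    // Manual offset control: disable auto-commit and commit only after processing
    properties.put("enable.auto.commit", "false");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
    consumer.subscribe(Arrays.asList("first"));
    while (true){
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.offset() + "\t" + record.value()); // process the record
        }
        consumer.commitSync(); // commit the batch's offsets only once it has been processed
    }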

    Low-level API

    Steps: find the partition leader, locate the offset, save the offset, then fetch the messages.

    import kafka.api.FetchRequest;
    import kafka.api.FetchRequestBuilder;
    import kafka.cluster.BrokerEndPoint;
    import kafka.javaapi.FetchResponse;
    import kafka.javaapi.PartitionMetadata;
    import kafka.javaapi.TopicMetadata;
    import kafka.javaapi.TopicMetadataRequest;
    import kafka.javaapi.TopicMetadataResponse;
    import kafka.javaapi.consumer.SimpleConsumer;
    import kafka.javaapi.message.ByteBufferMessageSet;
    import kafka.message.Message;
    import kafka.message.MessageAndOffset;

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class LowLevelConsumer {
        public static void main(String[] args) {
            //1. broker hosts of the cluster
            ArrayList<String> list = new ArrayList<>();
            list.add("hadoop101");
            list.add("hadoop102");
            list.add("hadoop103");
            //2. topic
            String topic = "first";
            //3. partition
            int partition = 2;
            //4. offset to start reading from
            long offset = 10;

            //5. find the leader of the partition
            String leader = getLeader(list, topic, partition);
            //6. connect to the leader and fetch the data
            getData(leader, topic, partition, offset);
        }

        private static void getData(String leader, String topic, int partition, long offset) {
            //1. create a SimpleConsumer connected to the leader
            SimpleConsumer consumer = new SimpleConsumer(leader, 9092, 2000, 1024 * 1024 * 2, "getData");
            //2. send the fetch request
            //3. build the request with FetchRequestBuilder
            FetchRequestBuilder builder = new FetchRequestBuilder();
            FetchRequestBuilder requestBuilder = builder.addFetch(topic, partition, offset, 1024 * 1024);
            FetchRequest fetchRequest = requestBuilder.build();
            //4. get the response
            FetchResponse fetchResponse = consumer.fetch(fetchRequest);
            //5. parse the response
            ByteBufferMessageSet messageAndOffsets = fetchResponse.messageSet(topic, partition);
            //6. iterate over the message set
            for (MessageAndOffset messageAndOffset : messageAndOffsets) {
                long message_offset = messageAndOffset.offset();
                Message message = messageAndOffset.message();
                //7. parse the message; payload() is the message body
                ByteBuffer byteBuffer = message.payload();
                byte[] bytes = new byte[byteBuffer.limit()];
                byteBuffer.get(bytes);
                //8. print the data
                System.out.println("offset:" + message_offset + "\t" + "value:" + new String(bytes));
            }
        }

        private static String getLeader(ArrayList<String> list, String topic, int partition) {
            //1. ask each broker in turn until the leader is found
            for (String host : list) {
                //2. create a SimpleConsumer
                SimpleConsumer consumer = new SimpleConsumer(
                        host,
                        9092,
                        2000,
                        1024*1024,
                        "getLeader"
                );
                //3. send a request for the leader
                //4. build the TopicMetadataRequest
                TopicMetadataRequest request = new TopicMetadataRequest(Arrays.asList(topic));
                //5. get the TopicMetadataResponse
                TopicMetadataResponse response = consumer.send(request);
                //6. parse the response
                List<TopicMetadata> topicsMetadata = response.topicsMetadata();
                //7. iterate over the topic metadata
                for (TopicMetadata topicMetadata : topicsMetadata) {
                    List<PartitionMetadata> partitionsMetadata = topicMetadata.partitionsMetadata();
                    //8. iterate over the partition metadata
                    for (PartitionMetadata partitionMetadata : partitionsMetadata) {
                        //9. match the target partition and return its leader's host
                        if (partitionMetadata.partitionId() == partition){
                            BrokerEndPoint endPoint = partitionMetadata.leader();
                            return endPoint.host();
                        }
                    }
                }
            }
            return null;
        }
    }

    Kafka producer interceptors

    Flume intercepts events through its interceptor chain;
    Kafka intercepts messages.

    onSend() is called once for every record that is sent.
    onAcknowledgement() is called when the broker's response arrives, just before the user callback
    (a sketch of an interceptor follows below).

    https://blog.csdn.net/stark_summer/article/details/50144591
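
    For illustration, a minimal sketch of what an interceptor like the TimeStampInterceptor referenced in the producer config above could look like (the timestamp-prefix behaviour is an assumption, not the original class):

    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    import java.util.Map;

    // Sketch: prepend a timestamp to every record's value
    public class TimeStampInterceptor implements ProducerInterceptor<String, String> {

        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            // called once per record before it is sent: rebuild the record with the timestamp prefixed
            return new ProducerRecord<String, String>(record.topic(), record.partition(), record.key(),
                    System.currentTimeMillis() + "," + record.value());
        }

        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            // called when the broker acknowledges (or fails) the record, before the user callback
        }

        public void close() {
        }

        public void configure(Map<String, ?> configs) {
        }
    }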

    Kafka vs. Flume

    In production you need to be clear about the roles of the streaming data collection frameworks Flume and Kafka:

    Flume (developed by Cloudera):

    suited to many producers;

    suited to a small number of downstream consumers;

    suited to cases where data durability requirements are not high;

    suited to integration with the Hadoop ecosystem.

    Kafka (developed by LinkedIn):

    suited to many downstream consumers;

    suited to higher data durability requirements (supports replication).

    A commonly used pipeline is therefore:

    online data --> Flume --> Kafka --> Flume (add or drop this stage as needed) --> HDFS

    vim flume-kafka.conf 
    # define
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F -c +0 /opt/module/datas/flume.log
    a1.sources.r1.shell = /bin/bash -c
    
    # sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
    a1.sinks.k1.kafka.topic = first
    a1.sinks.k1.kafka.flumeBatchSize = 20
    a1.sinks.k1.kafka.producer.acks = 1
    a1.sinks.k1.kafka.producer.linger.ms = 1
    
    # channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # bind
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

     tail -F follows the file in real time; -c +0 starts reading from the beginning of the file

    [kris@hadoop101 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-kafka.conf 
    
    [kris@hadoop101 datas]$ cat > flume.log 
    Hello
    
    [kris@hadoop101 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic first
    Hello