  • Kafka Producer API

    1. Message sending flow

      The Kafka producer sends messages asynchronously. The sending process involves two threads — the main thread and the Sender thread — plus a buffer shared between them, the RecordAccumulator. The main thread hands messages to the RecordAccumulator, and the Sender thread continuously pulls messages out of the RecordAccumulator and sends them to the Kafka brokers.
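
      The hand-off between the two threads can be sketched in plain Java, with a BlockingQueue standing in for the RecordAccumulator (the class and names here are illustrative, not Kafka internals):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class TwoThreadSketch {
    // Stand-in for the RecordAccumulator: a bounded buffer shared by both threads.
    static final BlockingQueue<String> accumulator = new LinkedBlockingQueue<>(32);

    public static List<String> run() {
        List<String> delivered = new ArrayList<>();
        // "Sender" thread: keeps pulling records out of the buffer and "sends" them.
        Thread sender = new Thread(() -> {
            try {
                String msg;
                while (!(msg = accumulator.take()).equals("EOF")) {
                    delivered.add(msg);
                }
            } catch (InterruptedException ignored) { }
        });
        try {
            sender.start();
            // "main" thread: hands records to the buffer and moves on immediately.
            for (int i = 0; i < 5; i++) {
                accumulator.put("record-" + i);
            }
            accumulator.put("EOF"); // poison pill so the sender thread can exit
            sender.join();          // wait only so we can inspect the result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return delivered;
    }
}
```

      The main thread never waits on the network; only the sender thread does, which is what makes the real producer's send call non-blocking.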

        

    2. Asynchronous send API

      2.1 Import the dependency

    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0</version>
    </dependency>

      2.2 Basic producer

        This version uses no callback. Configuration keys such as the broker list are also defined as constants in the ProducerConfig class, so a property can equally be set with

    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG instead of the raw string key.

        2.2.1 Write the code

    package com.wn.Test01;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Properties;
    
    public class MyProducer {
        public static void main(String[] args){
            // Kafka producer configuration
            Properties properties = new Properties();
            // Kafka cluster
            properties.put("bootstrap.servers","192.168.138.55:9092,192.168.138.66:9092,192.168.138.77:9092");
            // acks level
            properties.put("acks","all");
            // number of retries
            properties.put("retries",3);
            // batch size: 16 KB
            properties.put("batch.size",16384);
            // linger time
            properties.put("linger.ms",1);
            // RecordAccumulator buffer size: 32 MB
            properties.put("buffer.memory",33554432);
            // key/value serializer classes
            properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
            // create the producer
            KafkaProducer<String,String> producer = new KafkaProducer<>(properties);
            // send data
            for (int i = 0; i < 10; i++){
                producer.send(new ProducerRecord<String, String>("wang","wnwn--"+i));
            }
            // close the producer (flushes any buffered records)
            producer.close();
        }
    }

        2.2.2 Start ZooKeeper and Kafka

        2.2.3 Create a consumer

    bin/kafka-console-consumer.sh --zookeeper 192.168.138.55:2181,192.168.138.66:2181,192.168.138.77:2181 --topic wang

        2.2.4 Run the program and check the received data

        

      2.3 Producer with a callback

        When calling send, pass a Callback object and override its onCompletion method; the callback is invoked once the send completes.

        2.3.1 Write the test code

    package com.wn.Test01;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Properties;
    
    /* Producer with a callback */
    // Pass a Callback to send() and override onCompletion; it runs when the send completes.
    public class CallBackProducer {
        public static void main(String[] args){
            // producer configuration
            Properties properties = new Properties();
            // Kafka broker host:port list
            properties.put("bootstrap.servers","192.168.138.55:9092,192.168.138.66:9092,192.168.138.77:9092");
            // acks level
            properties.put("acks","all");
            // number of retries
            properties.put("retries",3);
            // batch size: 16 KB
            properties.put("batch.size",16384);
            // linger time
            properties.put("linger.ms",1);
            // send buffer size: 32 MB
            properties.put("buffer.memory",33554432);
            // key/value serializer classes
            properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
            // create the producer
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
            for (int i = 0; i < 50; i++){
                producer.send(new ProducerRecord<String, String>("wang", Integer.toString(i), "hello world-" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        // exactly one of recordMetadata and e is non-null
                        if (e == null){
                            System.out.println(recordMetadata.partition()+"---"+recordMetadata.offset());
                        } else {
                            e.printStackTrace();
                        }
                    }
                });
            }
            // close the producer
            producer.close();
        }
    }
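
        The callback pattern itself can be tried without a broker using plain Java's CompletableFuture, which offers a similar completion hook (the values here are illustrative):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class CallbackSketch {
    public static String sendWithCallback() {
        AtomicReference<String> seen = new AtomicReference<>("none");
        // supplyAsync() plays the role of an async send that eventually completes.
        CompletableFuture<String> send = CompletableFuture.supplyAsync(() -> "offset=42");
        // whenComplete plays the role of onCompletion: one argument is the
        // result (like RecordMetadata), the other the failure, and exactly
        // one of them is non-null.
        send.whenComplete((metadata, error) -> {
            if (error == null) {
                seen.set(metadata);
            } else {
                seen.set("failed");
            }
        }).join(); // wait only so we can observe what the callback saw
        return seen.get();
    }
}
```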

        2.3.2 Start ZooKeeper and Kafka

        2.3.3 Create a consumer

    bin/kafka-console-consumer.sh --zookeeper 192.168.138.55:2181,192.168.138.66:2181,192.168.138.77:2181 --topic wang

        2.3.4 Run the program and check the result

        

      2.4 Producer with a custom partitioner

        2.4.1 Create the custom partitioner

    package com.wn.Test01;
    
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    
    import java.util.Map;
    
    /* Custom partitioner */
    public class MyPartitioner implements Partitioner {
    
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            // A key-hash scheme would look like:
            //Integer partitionCount = cluster.partitionCountForTopic(topic);
            //return key.toString().hashCode() % partitionCount;
            // Here every record is routed to partition 1.
            return 1;
        }
    
        @Override
        public void close() {
        }
    
        @Override
        public void configure(Map<String, ?> map) {
        }
    }
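
        As an aside, the key-hash formula commented out above has a subtle bug: String.hashCode() can be negative, so hashCode() % partitionCount can return a negative partition index. A safe variant uses Math.floorMod (plain Java, no broker needed; partitionFor is an illustrative helper, not part of Kafka's API):

```java
public class KeyHashPartition {
    // Map a record key to a partition index in [0, numPartitions).
    // Math.floorMod never returns a negative result, unlike the % operator.
    public static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}
```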

        2.4.2 Create a producer that uses the custom partitioner

    package com.wn.Test01;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Properties;
    
    /* Producer with a custom partitioner */
    public class PartitionProducer {
        public static void main(String[] args){
            // Kafka producer configuration
            Properties properties = new Properties();
            // Kafka cluster (keys also available as ProducerConfig constants)
            properties.put("bootstrap.servers","192.168.138.55:9092,192.168.138.66:9092,192.168.138.77:9092");
            // acks level
            properties.put("acks","all");
            // number of retries
            properties.put("retries",3);
            // batch size: 16 KB
            properties.put("batch.size",16384);
            // linger time
            properties.put("linger.ms",1);
            // RecordAccumulator buffer size: 32 MB
            properties.put("buffer.memory",33554432);
            // key/value serializer classes
            properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
            // register the custom partitioner
            properties.put("partitioner.class","com.wn.Test01.MyPartitioner");
            // create the producer
            KafkaProducer<String,String> producer = new KafkaProducer<>(properties);
            // send data
            for (int i = 0; i < 10; i++){
                producer.send(new ProducerRecord<String, String>("aaa", Integer.toString(i), "word-" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        // exactly one of recordMetadata and e is non-null
                        if (e == null){
                            System.out.println(recordMetadata.partition()+"---"+recordMetadata.offset());
                        } else {
                            e.printStackTrace();
                        }
                    }
                });
            }
            // close the producer
            producer.close();
        }
    }

        2.4.3 Start ZooKeeper and Kafka

        2.4.4 Create a consumer

    bin/kafka-console-consumer.sh --zookeeper 192.168.138.55:2181,192.168.138.66:2181,192.168.138.77:2181 --topic aaa

        2.4.5 Run the program and check the result

          

          

           All messages land in partition 1, because the custom partitioner always returns 1.

    3. Synchronous send API

      Synchronous sending means that after a message is sent, the current thread blocks until the ack is returned.

      Because the send method returns a Future object, we can achieve the effect of synchronous sending simply by calling the Future's get method.
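
      The blocking behaviour that makes this work can be seen with plain Java futures, no broker required (the executor here just simulates the broker round-trip):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureGetDemo {
    public static String sendAndWait() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        // submit() returns a Future immediately, just as producer.send() does.
        Future<String> ack = pool.submit(() -> {
            Thread.sleep(100); // simulate the broker round-trip
            return "ack";
        });
        String result;
        try {
            // get() blocks the calling thread until the result is ready --
            // the same trick send(...).get() uses to make sending synchronous.
            result = ack.get();
        } catch (Exception e) {
            result = "error";
        }
        pool.shutdown();
        return result;
    }
}
```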

    package com.wn.Test01;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    /* Synchronous-send producer */
    public class TongProducer {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // Kafka producer configuration
            Properties properties = new Properties();
            // Kafka cluster (keys also available as ProducerConfig constants)
            properties.put("bootstrap.servers","192.168.138.55:9092,192.168.138.66:9092,192.168.138.77:9092");
            // acks level
            properties.put("acks","all");
            // number of retries
            properties.put("retries",3);
            // batch size: 16 KB
            properties.put("batch.size",16384);
            // linger time
            properties.put("linger.ms",1);
            // RecordAccumulator buffer size: 32 MB
            properties.put("buffer.memory",33554432);
            // key/value serializer classes
            properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
            // create the producer
            KafkaProducer<String,String> producer = new KafkaProducer<>(properties);
            // send data; get() blocks until the broker acknowledges each record
            for (int i = 0; i < 5; i++){
                producer.send(new ProducerRecord<String, String>("aaa","wnwn--"+i)).get();
            }
            // close the producer
            producer.close();
        }
    }

     

  • Original post: https://www.cnblogs.com/wnwn/p/12422485.html