zoukankan      html  css  js  c++  java
  • kafka生产者与消费者

    package kafka;
    
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.log4j.Logger;
    
    import java.util.Properties;
    
    
    public class Producer {
    
        Logger logger = Logger.getLogger("Producer");
    
        public KafkaProducer getKafkaProducer() {
            Properties kafkaProps = new Properties();
            /**
             * kafka生产者必选是三个属性
             * bootstrap.servers 指定broker的地址清单
             * key.serializer 必须是一个实现org.apache.kafka.common.serialization.Serializer接口的类,将key序列化成字节数组。注意:key.serializer必须被设置,即使消息中没有指定key
             * value.serializer  将value序列化成字节数组
             */
    
            kafkaProps.put("bootstrap.servers", "localhost:9092");
            kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            //其他设置
            /*
            acks=0:如果设置为0,生产者不会等待kafka的响应,高吞吐。消息会被立刻加到发送缓冲通道中,并且认为已经发送成功。这种情况下,不能保证kafka接收到了这条消息,retries配置不会生效,每条消息的偏移量都是1;
    
            acks=1:这个配置意味着kafka会把这条消息写到本地日志文件中,但是不会等待集群中其他机器的成功响应。这种情况下,在写入日志成功后,集群主机器挂掉,同时从机器还没来得及写的话,消息就会丢失掉。
    
            acks=all:这个配置意味着leader会等待所有的follower同步完成。这个确保消息不会丢失,除非kafka集群中所有机器挂掉。这是最强的可用性保证,最安全模式,但延迟相对较长。
    
            (1)acks指定必须要有多少个partition副本收到消息,生产者才会认为消息的写入是成功的。
    
                  acks=0,生产者不需要等待服务器的响应,以网络能支持的最大速度发送消息,吞吐量高,但是如果broker没有收到消息,生产者是不知道的
    
                  acks=1,leader partition收到消息,生产者就会收到一个来自服务器的成功响应
    
                  acks=all,所有的partition都收到消息,生产者才会收到一个服务器的成功响应
    
            (2)buffer.memory,设置生产者内缓存区域的大小,生产者用它缓冲要发送到服务器的消息。
    
            (3)compression.type,默认情况下,消息发送时不会被压缩,该参数可以设置成snappy、gzip或lz4对发送给broker的消息进行压缩
    
            (4)retries,生产者从服务器收到临时性错误时,生产者重发消息的次数
    
            (5)batch.size,发送到同一个partition的消息会被先存储在batch中,该参数指定一个batch可以使用的内存大小,单位是byte。不一定需要等到batch被填满才能发送
    
            (6)linger.ms,生产者在发送消息前等待linger.ms,从而等待更多的消息加入到batch中。如果batch被填满或者linger.ms达到上限,就把batch中的消息发送出去
    
            (7)max.in.flight.requests.per.connection,生产者在收到服务器响应之前可以发送的消息个数
             */
            kafkaProps.put("acks", "all");//
            return new KafkaProducer(kafkaProps);
        }
    
        /**
         * 同步发送
         *
         * @param topic
         * @param key
         * @param value
         * @param kafkaProducer
         */
        public void sendMsgSynchr(String topic, String key, String value, KafkaProducer kafkaProducer) {
    
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, key, value);
            kafkaProducer.send(producerRecord);
        }
    
        /**
         * 异步发送
         *
         * @param topic
         * @param key
         * @param value
         * @param kafkaProducer
         */
        public void sendMsgAsynchr(String topic, String key, String value, KafkaProducer kafkaProducer) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, key, value);
            kafkaProducer.send(producerRecord, new ProducerCallback());//发送消息时,传递一个回调对象,该回调对象必须实现org.apahce.kafka.clients.producer.Callback接口
        }
    
        private class ProducerCallback implements Callback {
    
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e != null) {//如果Kafka返回一个错误,onCompletion方法抛出一个non null异常。
                    e.printStackTrace();//对异常进行一些处理,这里只是简单打印出来
                }
            }
        }
    
        public static void main(String[] args) {
            Producer producer = new Producer();
            KafkaProducer kafkaProducer = producer.getKafkaProducer();
            for (int i = 0; i < 100; i++) {
                String msg = "msg------" + i;
                System.out.println(msg);
                producer.sendMsgAsynchr("test_kafka", null, msg, kafkaProducer);
            }
        }
    }
    package kafka;
    
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.util.Collections;
    import java.util.Properties;
    
    public class Consumer {
    
    
        public KafkaConsumer getKafkaConsumer() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "groupid1");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            //其他参数
            /*
            1:fetch.min.bytes,指定消费者从broker获取消息的最小字节数,即等到有足够的数据时才把它返回给消费者
    
            2:fetch.max.wait.ms,等待broker返回数据的最大时间,默认是500ms。fetch.min.bytes和fetch.max.wait.ms哪个条件先得到满足,就按照哪种方式返回数据
    
            3:max.partition.fetch.bytes,指定broker从每个partition中返回给消费者的最大字节数,默认1MB
    
            4:session.timeout.ms,指定消费者被认定死亡之前可以与服务器断开连接的时间,默认是3s
    
            5:auto.offset.reset,消费者在读取一个没有偏移量或者偏移量无效的情况下(因为消费者长时间失效,包含偏移量的记录已经过时并被删除)该作何处理。默认是latest(消费者从最新的记录开始读取数据)。另一个值是                    earliest(消费者从起始位置读取partition的记录)
    
            6:enable.auto.commit,指定消费者是否自动提交偏移量,默认为true
    
            7:partition.assignment.strategy,指定partition如何分配给消费者,默认是Range。Range:把Topic的若干个连续的partition分配给消费者。RoundRobin:把Topic的所有partition逐个分配给消费者
    
            8:max.poll.records,单次调用poll方法能够返回的消息数量
             */
            return new KafkaConsumer(props);
        }
    
        public void getMsg(String topic, KafkaConsumer kafkaConsumer) {
            //2.订阅Topic
    
            //创建一个只包含单个元素的列表,Topic的名字叫作customerCountries
            kafkaConsumer.subscribe(Collections.singletonList(topic));  //主题列表
            //支持正则表达式,订阅所有与test相关的Topic
            //consumer.subscribe("test.*");
            //3.轮询
            //消息轮询是消费者的核心API,通过一个简单的轮询向服务器请求数据,一旦消费者订阅了Topic,轮询就会处理所欲的细节,包括群组协调、partition再均衡、发送心跳
            //以及获取数据,开发者只要处理从partition返回的数据即可。
    
    //        try {
            while (true) {//消费者是一个长期运行的程序,通过持续轮询向Kafka请求数据。在其他线程中调用consumer.wakeup()可以退出循环
                //在100ms内等待Kafka的broker返回数据.超市参数指定poll在多久之后可以返回,不管有没有可用的数据都要返回
    
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100l);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
    //        } finally {
    //            //退出应用程序前使用close方法关闭消费者,网络连接和socket也会随之关闭,并立即触发一次再均衡
    //            kafkaConsumer.close();
    //        }
        }
    
        public static void main(String[] args) {
            Consumer consumer = new Consumer();
            KafkaConsumer kafkaConsumer = consumer.getKafkaConsumer();
            consumer.getMsg("test_kafka", kafkaConsumer);
        }
    }
  • 相关阅读:
    python mymsql sqlalchemy
    python中 wraps 的作用
    python Subprocess的使用
    实现一个命令分发器
    实现一个cache装饰器,实现过期可清除功能
    求2个字符串的最长公共子串
    Base64编码,解码的实现
    把一个字典扁平化
    hihocoder1415 重复旋律3
    hihocoder 1407 重复旋律2
  • 原文地址:https://www.cnblogs.com/yin-fei/p/11171336.html
Copyright © 2011-2022 走看看