zoukankan      html  css  js  c++  java
  • kafka2.5.0生产者与消费者,java普通main方法简单示例,不包含ack机制

    重要知识:

    kafka生产者是线程安全的 ,不管启动多少个线程去执行生产者,都是线程安全的。

    1)kafka生产者,有3种发送方式:1、发送并忘记;2、同步发送;3、异步发送

    生产者。发送方式:1、发送并忘记;

    import cn.enjoyedu.config.BusiConst;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import java.util.Properties;
    
    /**
     * @author King老师   
     */
    public class HelloKafkaProducer {
    
        public static void main(String[] args) {
            //TODO 生产者三个属性必须指定(broker地址清单、key和value的序列化器)
            Properties properties = new Properties();
            properties.put("bootstrap.servers","192.168.2.61:9092");
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
            try {
                ProducerRecord<String,String> record;
                try {
                    //  发送4条消息
                    for(int i=0;i<4;i++){
    // 这里的key值为null,所以kafka会根据分区总数把数据负载均衡到每个分区,如果有值,则根据值来判断存到哪个分区。 record
    = new ProducerRecord<String,String>(BusiConst.HELLO_TOPIC, null,"lison"+i);
                // 生产者有3种发送方式:1、发送并忘记;2、同步发送;3、异步发送 producer.send(record); // 此处是 1、发送并忘记 System.out.println(i
    +",message is sent"); } } catch (Exception e) { e.printStackTrace(); } } finally { producer.close(); } } }

     重要知识:

    如果该topic的分区大于1,那么生产者生产的数据存放到哪个分区,完全取决于key值,比如key=A,那么存到分区0,key=B,那么存到分区1,如果key为null,那么负载均衡存储到每个分区!

    相关资料参考: 《kafka2.5.0分区再均衡监听器java例子

    生产者。发送方式:2、同步发送;

    Future<RecordMetadata> future = producer.send(record);
    System.out.println("do other sth");
    RecordMetadata recordMetadata = future.get();//有可能阻塞在这个位置
    if(null!=recordMetadata){
          System.out.println("offset:"+recordMetadata.offset()+"-" +"partition:"+recordMetadata.partition());
    }

    生产者。发送方式:3、异步发送;

    ProducerRecord<String,String> record = new ProducerRecord<String,String>(
                        BusiConst.HELLO_TOPIC,"teacher14","deer");
    producer.send(record, new Callback() {
          public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if(null!=exception){
                            exception.printStackTrace();
                        }
                        if(null!=metadata){
                            System.out.println("offset:"+metadata.offset()+"-"
                                    +"partition:"+metadata.partition());
                        }
         }
    });

    2)消费者:

    import cn.enjoyedu.config.BusiConst;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    /**
     * @author King老师   
     */
    public class HelloKafkaConsumer {
    
        public static void main(String[] args) {
            /* 消费者三个属性必须指定(broker地址清单、key和value的反序列化器) */
            Properties properties = new Properties();
            properties.put("bootstrap.servers","192.168.2.61:9092");
            properties.put("key.deserializer", StringDeserializer.class);
            properties.put("value.deserializer", StringDeserializer.class);
            //  群组并非完全必须. 重要知识:在同一Topic下,相同的groupID消费群组中,只有一个消费者可以拿到数据。
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");
            KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
            try {
                //消费者订阅主题(可以多个)
                consumer.subscribe(Collections.singletonList(BusiConst.HELLO_TOPIC));
                while(true){
                    //TODO 拉取(新版本)
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for(ConsumerRecord<String, String> record:records){
                        System.out.println(String.format("topic:%s,分区:%d,偏移量:%d," + "key:%s,value:%s",record.topic(),record.partition(),
                                record.offset(),record.key(),record.value()));
                        // TODO
                    }
                }
    
                //通过另外一个线程 consumer. wakeup()
            } finally {
                consumer.close();
            }
        }
    }

    end.

  • 相关阅读:
    Could A New Linux Base For Tablets/Smartphones Succeed In 2017?
    使用libhybris,glibc和bionic共存时的TLS冲突的问题
    6 Open Source Mobile OS Alternatives To Android in 2018
    Using MultiROM
    GPU drivers are written by the GPU IP vendors and they only provide Android drivers
    Jolla Brings Wayland Atop Android GPU Drivers
    How to Use Libhybris and Android GPU Libraries with Mer (Linux) on the Cubieboard
    闲聊Libhybris
    【ARM-Linux开发】wayland和weston的介绍
    Wayland and X.org problem : Why not following the Android Solution ?
  • 原文地址:https://www.cnblogs.com/zhuwenjoyce/p/13179609.html
Copyright © 2011-2022 走看看