  • [kafka] 005_kafka_Java_API

    1. Producer

    1) Add the dependency

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.0</version>
        </dependency>

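    2) Simple send code

    Documentation: http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

    Simple send (labeled "synchronous" in the original post; note that send() itself is asynchronous and returns a Future without waiting, so this loop does not block per record):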

     Properties props = new Properties();
    
     props.put("bootstrap.servers", "localhost:9092");
     props.put("acks", "all");
     props.put("retries", 0);
     props.put("batch.size", 16384);
     props.put("linger.ms", 1);
     props.put("buffer.memory", 33554432);
     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
     Producer<String, String> producer = new KafkaProducer<>(props);
    
     for(int i = 0; i < 100; i++)
         producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
    
     producer.close();
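
The settings in the configuration above can be collected in a small helper, which makes each one easier to document and reuse. The following is a minimal sketch using only `java.util.Properties`; the class name `ProducerPropsSketch` and the method `producerProps` are hypothetical names chosen for illustration. It stores every value as a string, the same way the consumer example in this post passes `"1000"` and `"30000"`. The comments summarize what each setting is documented to do.

```java
import java.util.Properties;

final class ProducerPropsSketch {

    /** Builds the same producer settings as the example above, using string values only. */
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers); // broker(s) used for the initial connection
        props.setProperty("acks", "all");               // wait for all in-sync replicas to acknowledge
        props.setProperty("retries", "0");              // do not resend a failed record
        props.setProperty("batch.size", "16384");       // per-partition batch size, in bytes
        props.setProperty("linger.ms", "1");            // wait up to 1 ms for a batch to fill
        props.setProperty("buffer.memory", "33554432"); // total buffer for unsent records, in bytes
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps("localhost:9092");
        System.out.println(props.getProperty("acks"));
        // 33554432 bytes is 32 MiB
        System.out.println(Integer.parseInt(props.getProperty("buffer.memory")) / (1024 * 1024) + " MiB");
    }
}
```

The returned object can be passed to `new KafkaProducer<>(props)` in place of the inline `props` above. One reason to prefer `setProperty` here is that `Properties.getProperty` returns null for non-string values such as the `Integer` stored by `props.put("retries", 0)`, which can be surprising when the settings are logged or inspected.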

    Asynchronous send with a callback:

    public Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback)

    Comparison:

    If you want to simulate a simple blocking call you can call the get() method immediately:
    
     byte[] key = "key".getBytes();
     byte[] value = "value".getBytes();
     ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("topic1", key, value);
     producer.send(record).get();
     
    Fully non-blocking usage can make use of the Callback parameter to provide a callback that will be invoked when the request is complete.
    
     ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("topic1", key, value);
     producer.send(record,
                   new Callback() {
                       public void onCompletion(RecordMetadata metadata, Exception e) {
                           if (e != null)
                               e.printStackTrace(); // metadata is null if an error occurred (0.10.0 Callback Javadoc)
                           else
                               System.out.println("The offset of the record we just sent is: " + metadata.offset());
                       }
                   });
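
The difference between the two styles can be tried out without a broker. The sketch below does not use Kafka at all: `fakeSend` is a hypothetical stand-in for `producer.send(record, callback)` built on the JDK's `CompletableFuture`, and the "offset" it returns is simply the length of the value, an assumption made only so the output is predictable. It is meant to illustrate the control flow, not the client's real behavior.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;

final class SendStylesSketch {

    /** Hypothetical stand-in for producer.send(record, callback); not a Kafka API. */
    static CompletableFuture<Long> fakeSend(String value, BiConsumer<Long, Throwable> callback) {
        return CompletableFuture
            .supplyAsync(() -> {
                if (value.isEmpty()) {
                    throw new IllegalArgumentException("empty record"); // simulated send failure
                }
                return (long) value.length(); // pretend the offset is the value length
            })
            // The callback runs once the "send" has finished, with either a result or an error.
            .whenComplete(callback);
    }

    public static void main(String[] args) throws Exception {
        // Blocking style: get() waits for the result, like producer.send(record).get().
        long offset = fakeSend("value", (o, e) -> { }).get();
        System.out.println("blocking offset: " + offset);

        // Non-blocking style: the callback decides what to do on success or failure.
        CompletableFuture<Long> pending = fakeSend("", (o, e) -> {
            if (e != null) {
                System.out.println("send failed: " + e);
            } else {
                System.out.println("callback offset: " + o);
            }
        });

        // Only needed in this demo so the JVM does not exit before the callback has run.
        try {
            pending.get();
        } catch (ExecutionException alreadyReported) {
            // the callback above has already handled the failure
        }
    }
}
```

As in the callback example above, the error branch must not touch the result, because no result exists when the send fails.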

    2. Consumer

    1) Add the dependency

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.0</version>
        </dependency>

    2) Simple poll code

    For more, see: http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

     Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "true");
     props.put("auto.commit.interval.ms", "1000");
     props.put("session.timeout.ms", "30000");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));

     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("offset = %d, key = %s, value = %s%n",
                   record.offset(), record.key(), record.value());
     }
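
The loop above polls with a 100 ms timeout and never exits. The shape of such a poll-with-timeout loop, and one way to bound it, can be sketched with plain JDK classes. In the sketch below a `BlockingQueue` stands in for the subscribed topics, which is an assumption made for illustration: a real `KafkaConsumer.poll` returns a possibly empty batch of records rather than null. The names `PollLoopSketch`, `drain`, and `maxIdlePolls` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class PollLoopSketch {

    /**
     * Polls the queue with a timeout and stops after maxIdlePolls consecutive
     * empty polls, instead of looping forever.
     */
    static List<String> drain(BlockingQueue<String> topic, long timeoutMs, int maxIdlePolls) {
        List<String> processed = new ArrayList<>();
        int idlePolls = 0;
        while (idlePolls < maxIdlePolls) {
            String record;
            try {
                // Returns null if nothing arrived within the timeout.
                record = topic.poll(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status and stop
                break;
            }
            if (record == null) {
                idlePolls++;
                continue;
            }
            idlePolls = 0;         // reset the idle counter whenever data arrives
            processed.add(record); // "process" the record
        }
        return processed;
    }

    public static void main(String[] args) {
        BlockingQueue<String> topic = new LinkedBlockingQueue<>(Arrays.asList("a", "b", "c"));
        System.out.println(drain(topic, 100, 2)); // prints [a, b, c]
    }
}
```

A long-running consumer would normally keep polling and be stopped from outside rather than by an idle counter; the counter is used here only to keep the sketch deterministic.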

    Attached is producer send code that has been verified to work:

        public RecordMetadata sendSyncKafkaRequest(String topic, KeyModel keyModel, Object message) {

            logger.info("=== Send service start: sendSyncKafkaRequest start ===");
            logger.info("=== topic: " + topic + " ===");
            logger.info("=== keyModel: " + JSON.toJSONString(keyModel) + " ===");
            logger.info("=== message: " + JSON.toJSONString(message) + " ===");

            Properties props = kafkaProducerProperties.getProperties();
            KafkaProducer<KeyModel, Object> producer = null;
            RecordMetadata recordMetadata = null;
            try {
                producer = new KafkaProducer<KeyModel, Object>(props);
                // send() is asynchronous; get() blocks until the request completes
                recordMetadata = producer.send(new ProducerRecord<KeyModel, Object>(topic, keyModel, message)).get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            } finally {
                if (producer != null) {
                    producer.close(); // release the producer's resources
                }
            }

            if (recordMetadata != null) {
                logger.info("=== Kafka send succeeded! Topic: " + recordMetadata.topic() + "; partition: " + recordMetadata.partition() + " ===");
            } else {
                logger.info("=== recordMetadata is null! This Kafka write request did not complete! ===");
            }
            return recordMetadata;
        }
  • Original article: https://www.cnblogs.com/avivaye/p/5708171.html