  • Kafka Producer and Consumer API in Java

    Maven dependencies (kafka-clients provides the producer and new consumer APIs; kafka_2.11 provides the old high-level consumer API used below):

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.0.0</version>
    </dependency>
    

    Simple Kafka producer in Java:

    import org.apache.kafka.clients.producer.*;
    import java.util.Properties;
    
    public class KafkaProducerDemo {
        public static void main(String[] args) throws Exception {
            String topic = "";
            String brokerList = "";
            String message = "";
    
            Properties props = new Properties();
            props.put("bootstrap.servers", brokerList);   // broker address list, e.g. host1:9092,host2:9092
            props.put("acks", "1");                       // wait for the partition leader to acknowledge each write
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            Producer<String, String> producer = new KafkaProducer<String, String>(props);
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, message);
            // Asynchronous send; the callback runs when the broker acknowledges the record or the send fails.
            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null) {
                        e.printStackTrace();
                    }
                }
            });
            // Flush buffered records and release resources; without this the batched
            // message may never leave the client before main() exits.
            producer.close();
        }
    }
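
    If you need to confirm delivery before continuing, send() also returns a Future&lt;RecordMetadata&gt; that can be blocked on. A minimal synchronous sketch, reusing the producer and record variables from the example above:

    // Sketch: block until the broker acknowledges the record
    // (assumes the same `producer` and `record` as in the example above).
    RecordMetadata metadata = producer.send(record).get();
    System.out.println("sent to partition " + metadata.partition() + ", offset " + metadata.offset());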
    

    Simple Kafka consumer (old high-level consumer API) in Java:

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    public class KafkaConsumer {
    
        public static void main(String[] args) {
            String topic = "";
            String zkConnect = "";
    
            Properties prop = new Properties();
            prop.put("zookeeper.connect", zkConnect);    // the old consumer coordinates through ZooKeeper
            prop.put("group.id", "group003");
            prop.put("auto.offset.reset", "largest");    // start from the latest offset when no committed offset exists
    
            ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));
            // Request one stream (one consumer thread) for the topic.
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, 1);
            Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
            KafkaStream<byte[], byte[]> kafkaStream = messageStreams.get(topic).get(0);
            ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
            // hasNext() blocks until a new message arrives.
            while (iterator.hasNext()) {
                String msg = new String(iterator.next().message());
                System.out.println("--------" + msg);
            }
        }
    }
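
    The loop above never exits on its own; stopping the old consumer cleanly means shutting down the connector, which unblocks hasNext(). A minimal sketch, assuming the consumer variable from the example above is (effectively) final:

    // Sketch: shut the old consumer down cleanly on JVM exit
    // (assumes the `consumer` ConsumerConnector from the example above).
    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
        public void run() {
            consumer.shutdown();   // unblocks hasNext() and releases ZooKeeper/broker connections
        }
    }));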
    

    Kafka new consumer API in Java:

    import org.apache.kafka.clients.consumer.*;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    public class KafkaNewConsumer {
    
        public static void main(String[] args) {
            String topic = "";
            String brokerList = "";
            String group = "";
    
            Properties props = new Properties();
            props.put("bootstrap.servers", brokerList);   // the new consumer talks to brokers directly, no ZooKeeper
            props.put("group.id", group);
            props.put("auto.offset.reset", "earliest");   // start from the beginning when no committed offset exists
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
            consumer.subscribe(Arrays.asList(topic));
            while (true) {
                // Wait up to 10 ms for new records; offsets are auto-committed by default.
                ConsumerRecords<String, String> records = consumer.poll(10);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("fetched from partition " + record.partition()
                            + ", offset: " + record.offset() + ", message: " + record.value());
                }
            }
        }
    }
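
    By default the new consumer commits offsets automatically in the background. For at-least-once processing you would normally disable auto-commit and commit only after the records have been handled. A minimal sketch of that variant, assuming the same props and consumer as above; process() is a hypothetical application-specific handler:

    // Sketch: manual offset commit for at-least-once delivery.
    // Assumes the same props as above, plus:
    //   props.put("enable.auto.commit", "false");
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(10);
        for (ConsumerRecord<String, String> record : records) {
            process(record);           // hypothetical application-specific handler
        }
        consumer.commitSync();         // commit only after the whole batch has been processed
    }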
    
  • Original article: https://www.cnblogs.com/xiaodf/p/10839730.html