zoukankan      html  css  js  c++  java
  • Kafka Consumer API示例

    既然翻到这里,默认就认为已经基本掌握了Kafka的基础知识,本小节主要给出一个使用Kafka Consumer API的示例。我们都知道Kafka的Consumer API有旧版(0.8版及之前)和新版(0.9版及之后),这里讲的是新版,官网的KafkaConsumer文档有更详细的介绍,可自行前往~

    1 环境配置

    • 操作系统: Ubuntu 16.04
    • kafka_2.11-0.10.2.2
    • JDK: 1.8.0_181
    • IntelliJ IDEA Maven
    • VNC

    2 操作过程

    pom.xml文件

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <!-- schemaLocation is a whitespace-separated "namespace location" pair. -->
        <modelVersion>4.0.0</modelVersion>

        <groupId>com.kafka.test</groupId>
        <artifactId>test</artifactId>
        <version>1.0-SNAPSHOT</version>

        <name>maven-kafka</name>
        <url>http://maven.apache.org</url>
        <dependencies>
            <!-- Java client library; provides org.apache.kafka.clients.consumer.* used by the examples. -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.10.2.2</version>
            </dependency>
            <!-- Kafka core artifact built for Scala 2.11; version kept in step with kafka-clients. -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.11</artifactId>
                <version>0.10.2.2</version>
            </dependency>

            <!-- Logging facade API. -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.7.21</version>
            </dependency>

            <!-- Snappy compression library for Java. -->
            <dependency>
                <groupId>org.xerial.snappy</groupId>
                <artifactId>snappy-java</artifactId>
                <version>1.1.2.6</version>
            </dependency>

        </dependencies>

    </project>
    

    kafka consumer消费数据,未使用SASL认证。由于代码没有设置auto.offset.reset(默认值为latest),没有已提交位点的消费组会从最新位置开始消费,因此这里的代码只能消费生产者正在推送的数据:

    import java.util.Arrays;
    import java.util.Properties;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    
    
    /**
     * Minimal example of the new Kafka consumer API without SASL authentication.
     *
     * <p>Subscribes to one topic and prints the value of every received record to
     * standard output. {@code auto.offset.reset} is not configured here, so where a
     * consumer group without committed offsets starts reading is decided by the
     * client default.
     */
    public class KafkaConsumerTest {

        /**
         * Builds the consumer configuration, subscribes to the topic and polls forever.
         *
         * @param args unused
         * @throws Exception if the consumer cannot be created or polling fails
         */
        public static void main(String[] args) throws Exception {
            String topicName = "XXXX";
            Properties props = buildConsumerProperties();

            // try-with-resources closes the consumer (leaving the group and releasing
            // its sockets) if the endless poll loop is ever aborted by an exception.
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Arrays.asList(topicName));

                while (true) {
                    // Wait up to 100 ms for new records before polling again.
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(record.value());
                    }
                }
            }
        }

        /** Assembles the consumer settings used by this example. */
        private static Properties buildConsumerProperties() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "make.kafka.com:9092,make.kafka.com:9093,make.kafka.com:9094");
            props.put("group.id", "XXXX");
            // Offsets are committed automatically once per second.
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            // "offsets.storage" was dropped: it is a setting of the old (ZooKeeper-based)
            // consumer and is not a known configuration of the new KafkaConsumer.

            // Keys and values are decoded as strings; custom object types would need
            // their own deserializer class here.
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            return props;
        }
    }
    

    kafka consumer消费数据,SASL认证,可以消费历史数据:

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.PrintStream;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Map;
    import java.util.Properties;
    
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    
    
    /**
     * Example of the new Kafka consumer API using SASL/PLAIN authentication that
     * re-reads every assigned partition from its first available offset.
     *
     * <p>Each time partitions are assigned to this consumer it seeks them back to
     * the beginning, then prints the value of every received record to standard
     * output.
     */
    public class KafkaConsumerTest {

        /**
         * Configures JAAS and the consumer, subscribes to the topic and polls forever.
         *
         * @param args unused
         * @throws Exception if the consumer cannot be created or polling fails
         */
        public static void main(String[] args) throws Exception {
            String topicName = "XXXX";

            // JVM system property holding the path of the JAAS login configuration
            // file; adjust the path to the local installation.
            System.setProperty("java.security.auth.login.config", "/opt/kafka/kafka1/kafka_2.11-0.10.2.2/config/kafka_client_jaas.conf");

            Properties props = buildConsumerProperties();

            // try-with-resources closes the consumer if the endless poll loop is ever
            // aborted by an exception. The resource variable is implicitly final, so
            // the rebalance listener below may capture it.
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {
                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                        // Nothing to do: offsets are committed automatically.
                    }

                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                        // Equivalent of the console consumer's --from-beginning flag.
                        // One call covers every partition in the collection, so no
                        // per-partition loop or beginningOffsets() lookup is needed.
                        // The guard keeps an empty assignment a no-op.
                        if (!collection.isEmpty()) {
                            consumer.seekToBeginning(collection);
                        }
                    }
                });

                while (true) {
                    // Wait up to 20 s for new records before polling again.
                    ConsumerRecords<String, String> records = consumer.poll(20000);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(record.value());
                    }
                }
            }
        }

        /** Assembles the SASL and consumer settings used by this example. */
        private static Properties buildConsumerProperties() {
            Properties props = new Properties();
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "PLAIN");

            props.put("bootstrap.servers", "make.kafka.com:9092,make.kafka.com:9093,make.kafka.com:9094");
            props.put("group.id", "XXXX");
            // Offsets are committed automatically once per second.
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            // "offsets.storage" was dropped: it is a setting of the old (ZooKeeper-based)
            // consumer and is not a known configuration of the new KafkaConsumer.
            props.put("max.poll.records", 1000);

            // Keys and values are decoded as strings; custom object types would need
            // their own deserializer class here.
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            return props;
        }
    }
    

    另外,选择把kafka作为消息中间件,主要是为了把拿到的数据持久化到本地或者HDFS,以便后续分析、挖掘出重要的信息。可以使用Spark Streaming把数据存到HDFS,这里给出一个把控制台输出重定向并保存到本地磁盘的示例。

    import java.io.*;
    
    
    /**
     * Redirects standard output to the file {@code out.json} in the working
     * directory, so that everything later printed with {@code System.out} is
     * stored on disk instead of being shown on the console.
     */
    public class IO2File {

        /**
         * Replaces {@code System.out} with a stream writing to {@code out.json} and
         * prints one sample line through it.
         *
         * @param args unused
         * @throws IOException if the file cannot be opened for writing
         */
        public static void main(String[] args) throws IOException{
            File target = new File("out.json");
            // FileOutputStream creates the file when it is missing (and truncates an
            // existing one), so a separate createNewFile() call is not needed.
            FileOutputStream fileOutputStream = new FileOutputStream(target);
            // Explicit UTF-8 keeps the file independent of the platform default
            // charset; auto-flush pushes each println through without an explicit
            // flush, since the stream is never closed.
            PrintStream printStream = new PrintStream(fileOutputStream, true, "UTF-8");
            // The stream is intentionally left open: from here on it IS System.out.
            System.setOut(printStream);
            // Combined with the consumer examples, the Kafka messages they print to
            // the console end up in the file instead.
            System.out.println("xxxxxxx out.json");
        }
    }
    

    有时间再把Kafka基本原理 存储 配置信息 SASL授权 Spark都总结出来。

  • 相关阅读:
    tips
    【十大算法实现之KNN】KNN算法实例(含测试数据和源码)
    智力趣题几则
    JAVA知多少
    R语言(入门小练习篇)
    文本分类,数据挖掘和机器学习
    推荐系统的循序进阶读物(从入门到精通)
    【贪心】PAT 1033. To Fill or Not to Fill (25)
    博弈故事一则——海盗分金币问题
    基于WordNet的英文同义词、近义词相似度评估及代码实现
  • 原文地址:https://www.cnblogs.com/eugene0/p/11437459.html
Copyright © 2011-2022 走看看