zoukankan      html  css  js  c++  java
  • Flink 读取 Kafka 数据 (极简版)

    本文讲述:本地 Flink 1.7.0 (Java SDK) 读取本地 Kafka 数据,不做任何处理直接打印输出到控制台

    环境:win10 + WSL

    步骤略过

    1. 启动 Kafka 并创建 topic

    以下命令都在解压后的 Kafka 文件夹内执行

    1.1 启动 Kafka

    启动 zookeeper

     ./bin/zookeeper-server-start.sh config/zookeeper.properties
    

    启动 kafka 服务

    ./bin/kafka-server-start.sh config/server.properties
    

    1.2 创建 topic

    ./bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
    

    1.3 启动生产者和消费者

    启动生产者

    ./bin/kafka-console-producer.sh --topic topic2 --broker-list localhost:9092
    

    启动消费者

    ./bin/kafka-console-consumer.sh --topic topic2 --from-beginning --bootstrap-server localhost:9092
    

    此时,在生产者窗口中输入任意字符,即可在消费者窗口中看到相应的输出

    进入解压后的 flink 文件夹内

    ./bin/start-cluster.sh
    

    3. 使用 JAVA 中读取 Kafka 内容

    代码如下:

    package com.xjr7670;
    
    
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
    
    import java.util.Properties;
    
    public class StreamingJob {
    
    	public static void main(String[] args) throws Exception {
    		// set up the streaming execution environment
    		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    		Properties properties = new Properties();
    		properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
    		properties.setProperty("group.id", "g2");    // 第 1 个参数是固定值 group.id,第 2 个参数是自定义的组 ID
    		DeserializationSchema<String> deserializationSchema = new SimpleStringSchema();
    		String topic = "topic2";
    		DataStream<String> text = env.addSource(new FlinkKafkaConsumer011<String>(topic, deserializationSchema, properties));
    		text.print();
    		
    		env.execute("Flink-Kafka demo");
    	}
    }
    

    POM 依赖配置略过。以上代码可直接执行,执行后,在 Kafka 生产者窗口中输入消息,即可在代码输出窗口中看到同样的消息输出
    如果执行时遇到报错:

    java.lang.ClassNotFoundException: org.apache.flink.api.common.serialization.DeserializationSchema
    

    需要添加 flink/lib 下的包到项目内

  • 相关阅读:
    SQL实战(四)
    SQL实战(三)
    SQL实战(二)
    数据库SQL实战(一)
    算法(二)——背包问题
    华为机试(五)
    算法(一)
    华为机试练习(四)
    华为往年机试题目(三)
    T分布(T-Distribution)
  • 原文地址:https://www.cnblogs.com/wuzhiblog/p/14251788.html
Copyright © 2011-2022 走看看