zoukankan      html  css  js  c++  java
  • Flink Sink Kafka

    原创转载请注明出处:https://www.cnblogs.com/agilestyle/p/15149998.html

    Kafka Setup

    wget https://downloads.apache.org/kafka/2.8.0/kafka_2.12-2.8.0.tgz
    
    tar zxvf kafka_2.12-2.8.0.tgz -C ~/app
    
    cd ~/app/kafka_2.12-2.8.0
    
    # start the zookeeper service
    bin/zookeeper-server-start.sh config/zookeeper.properties
    
    # start the Kafka broker service
    bin/kafka-server-start.sh config/server.properties
    
    # create topic
    bin/kafka-topics.sh --create --topic flink_topic --bootstrap-server localhost:9092
    
    # describle message
    bin/kafka-topics.sh --describe --topic flink_topic --bootstrap-server localhost:9092
    
    # produce message
    bin/kafka-console-producer.sh --topic flink_topic --bootstrap-server localhost:9092
    
    # consumer message
    bin/kafka-console-consumer.sh --topic flink_topic --from-beginning --bootstrap-server localhost:9092

    Maven Dependency

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>org.fool</groupId>
        <artifactId>flink</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>1.12.5</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <version>1.12.5</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_2.12</artifactId>
                <version>1.12.5</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_2.12</artifactId>
                <version>1.12.5</version>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.20</version>
            </dependency>
    
        </dependencies>
    
    </project>

    SRC

    src/main/java/org/fool/flink/contract/Sensor.java

    package org.fool.flink.contract;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class Sensor {
        private String id;
        private Long timestamp;
        private Double temperature;
    }

    src/main/java/org/fool/flink/sink/SinkKafkaTest.java

    package org.fool.flink.sink;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.fool.flink.contract.Sensor;
    
    import java.util.Objects;
    import java.util.Properties;
    
    public class SinkKafkaTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
            environment.setParallelism(1);
    
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:9092");
            properties.setProperty("group.id", "consumer-group");
            properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("auto.offset.reset", "latest");
    
            String inputPath = Objects.requireNonNull(ClassLoader.getSystemClassLoader().getResource("sensor.txt")).getPath();
    
            DataStream<String> inputStream = environment.readTextFile(inputPath);
    
            DataStream<String> dataStream = inputStream.map(new MapFunction<String, String>() {
                @Override
                public String map(String value) throws Exception {
                    String[] fields = value.split(",");
                    return new Sensor(fields[0], new Long(fields[1]), new Double(fields[2])).toString();
                }
            });
    
            dataStream.addSink(new FlinkKafkaProducer<>("flink_topic", new SimpleStringSchema(), properties));
    
            environment.execute();
        }
    }

    src/main/resources/sensor.txt

    1,1628754086,35.8
    2,1628754096,36.8
    3,1628754106,37.8
    4,1628754116,38.8
    1,1628754186,36.6
    2,1628754296,36.6
    3,1628754306,37.6
    4,1628754416,38.6
    1,1628754986,25.8
    1,1628754086,39.6
    2,1628754996,26.8
    3,1628754906,27.8
    4,1628754916,28.8

    Run

    Console Output

    Kafka Consumer


    欢迎点赞关注和收藏

    强者自救 圣者渡人
  • 相关阅读:
    通过IMM With Remote Console为服务器安装操作系统
    linux下编译安装php5.6出现 configure: error: Cannot find MySQL header files under /usr/local/mysql.
    5700交换机清除配置
    嵌入式驱动解析:从串口驱动到Linux驱动模型
    Win10自带Ubuntu子系统的安装与配置
    关于嵌入式C代码优化的几种方法
    2020软考高级系统分析师,你想知道的全在这
    libpng warning: iCCP: known incorrect sRGB profile
    pycharm中导入pygame库失败及解决办法
    pycharm中导入pygame等第三方库
  • 原文地址:https://www.cnblogs.com/agilestyle/p/15149998.html
Copyright © 2011-2022 走看看