  • Connecting Flink to Kafka

    Start Kafka and Flink

    1. Go to ZooKeeper's bin directory and start ZooKeeper

    ./zkServer.sh start

    2. Go to Kafka's bin directory and start Kafka

    ./kafka-server-start.sh -daemon /opt/module/kafka-0.11/config/server.properties

    3. Go to Flink's bin directory and start Flink

    ./start-cluster.sh 

    Start a Kafka console producer (run from the Kafka installation directory)

    The Kafka topic is sensor.

    ./bin/kafka-console-producer.sh --broker-list 192.168.153.202:9092 --topic sensor
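
    The --broker-list value is a comma-separated list of host:port entries, and each port has to lie between 1 and 65535. As a minimal sketch, a small Java check can catch a mistyped address before it is handed to the producer or to Flink. The class name BrokerListCheck and the method invalidEntries are hypothetical names introduced here; they are not part of Kafka or Flink.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of Kafka or Flink.
final class BrokerListCheck {

    // Returns the entries of a comma-separated host:port list that are malformed.
    static List<String> invalidEntries(String brokerList) {
        List<String> bad = new ArrayList<>();
        for (String entry : brokerList.split(",")) {
            String e = entry.trim();
            int colon = e.lastIndexOf(':');
            if (colon <= 0 || colon == e.length() - 1) {
                bad.add(e); // missing host or missing port
                continue;
            }
            try {
                int port = Integer.parseInt(e.substring(colon + 1));
                if (port < 1 || port > 65535) {
                    bad.add(e); // port outside the valid TCP range
                }
            } catch (NumberFormatException ex) {
                bad.add(e); // port is not a number
            }
        }
        return bad;
    }

    public static void main(String[] args) {
        // Pass the broker list as the first argument, e.g. host1:9092,host2:9092
        String brokerList = args.length > 0 ? args[0] : "localhost:9092";
        List<String> bad = invalidEntries(brokerList);
        System.out.println(bad.isEmpty()
                ? "broker list looks well-formed"
                : "invalid entries: " + bad);
    }
}
```

    This only checks the shape of the address. It does not confirm that a broker is actually listening there.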

    Add the pom dependencies

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0</version>
    </dependency>

    Run

    The Java code is as follows:

    package com.test.apitest.souceTest;
    
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
    import java.util.Properties;
    
    public class SourceTest02_kafka {
        public static void main(String[] args) throws Exception {
            // Create the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // Kafka configuration
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "192.168.153.202:9092");
            properties.setProperty("group.id", "consumer-group");
            properties.setProperty("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("auto.offset.reset", "latest");
    
            // Read data from the Kafka topic "sensor" as plain strings
            DataStreamSource<String> sensor = env.addSource(new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));
    
            // Print each record to standard output
            sensor.print();
            // Execute the job
            env.execute();
        }
    }
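
    The broker address and group id are hard-coded in the job above. As a hedged sketch of one alternative, the helper below builds the same kind of Properties object from arguments, using only java.util.Properties. The class name KafkaConsumerProps and its build method are hypothetical names introduced for illustration; the property keys are the ones used in the job above. It sets only the address, group id and offset reset; the deserializer entries can be added the same way.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of Flink or Kafka.
final class KafkaConsumerProps {

    // Builds consumer settings from explicit values instead of hard-coded strings.
    static Properties build(String bootstrapServers, String groupId, String offsetReset) {
        if (bootstrapServers == null || bootstrapServers.trim().isEmpty()) {
            throw new IllegalArgumentException("bootstrap.servers must not be empty");
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers.trim());
        props.setProperty("group.id", groupId);
        props.setProperty("auto.offset.reset", offsetReset);
        return props;
    }

    public static void main(String[] args) {
        // Usage: java KafkaConsumerProps <host:port> [group-id]
        String servers = args.length > 0 ? args[0] : "localhost:9092";
        String group = args.length > 1 ? args[1] : "consumer-group";
        Properties props = build(servers, group, "latest");
        // The result could be passed to the consumer constructor in place of `properties`:
        // new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), props)
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

    With a helper like this, the same job can be pointed at a different broker without recompiling, for example by forwarding the program arguments of main to build.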

    Kafka produces the data

    Flink consumes the data

  • Original article: https://www.cnblogs.com/lmr7/p/15460103.html