  • Connecting Flink to Kafka

    Start Kafka and Flink

    1. Go to ZooKeeper's bin directory and start ZooKeeper

    ./zkServer.sh start

    2. Go to Kafka's bin directory and start Kafka

    ./kafka-server-start.sh -daemon /opt/module/kafka-0.11/config/server.properties

    3. Go to Flink's bin directory and start Flink

    ./start-cluster.sh 

    Start a Kafka producer

    The Kafka topic is sensor. Run the command from the Kafka installation directory.

    ./bin/kafka-console-producer.sh --broker-list 192.168.153.202:9092 --topic sensor

    Add the pom dependencies

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0</version>
    </dependency>

    Run

    The Java code is as follows

    package com.test.apitest.souceTest;
    
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
    import java.util.Properties;
    
    public class SourceTest02_kafka {
        public static void main(String[] args) throws Exception {
            // Create the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // Kafka configuration
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers","192.168.153.202:9092");
            properties.setProperty("group.id", "consumer-group");
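            // Note: the Flink Kafka consumer overrides key.deserializer and value.deserializer
            // with its own byte-array deserializers; records are decoded by SimpleStringSchema.
            properties.setProperty("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            // Used only when the consumer group has no committed offset
            properties.setProperty("auto.offset.reset", "latest");
    
            // Read data from Kafka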
            DataStreamSource<String> sensor = env.addSource(new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));
    
            sensor.print();
            // Execute the job
            env.execute();
        }
    }
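
    The configuration block in the listing above can be factored into a small helper. What follows is a minimal sketch, not part of the original post: the class name KafkaConsumerProps and the method build are hypothetical. It assumes the same three settings the listing relies on, and leaves out the two deserializer entries because the Flink Kafka consumer replaces them with its own byte-array deserializers.

```java
import java.util.Properties;

/** Hypothetical helper (not from the original post) that builds the Kafka consumer settings. */
final class KafkaConsumerProps {

    private KafkaConsumerProps() {
    }

    /** Returns the properties to pass to the Flink Kafka consumer constructor. */
    static Properties build(String bootstrapServers, String groupId) {
        if (bootstrapServers == null || bootstrapServers.isEmpty()) {
            throw new IllegalArgumentException("bootstrapServers must not be empty");
        }
        if (groupId == null || groupId.isEmpty()) {
            throw new IllegalArgumentException("groupId must not be empty");
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Used only when the consumer group has no committed offset.
        props.setProperty("auto.offset.reset", "latest");
        return props;
    }

    public static void main(String[] args) {
        // Address and group id taken from the listing above; adjust them to your cluster.
        Properties props = build("192.168.153.202:9092", "consumer-group");
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + "=" + props.getProperty(name));
        }
    }
}
```

    In the job, the result of KafkaConsumerProps.build(...) would take the place of the properties variable passed to FlinkKafkaConsumer011.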

    Produce data with the Kafka console producer

    Flink consumes the data and prints each message
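
    To make the data path concrete: each line typed into the console producer becomes the value of one Kafka record, and SimpleStringSchema turns those bytes back into a String (UTF-8 by default) before print() writes it out. The sketch below only imitates that decoding step in plain Java, without Flink or Kafka. The class name RecordDecodingSketch and the sample line are hypothetical, since the original post does not show its data.

```java
import java.nio.charset.StandardCharsets;

/** Illustration only: imitates how a string schema turns a Kafka record value into a String. */
final class RecordDecodingSketch {

    private RecordDecodingSketch() {
    }

    /** Decodes the raw record value as UTF-8 text. */
    static String decode(byte[] recordValue) {
        return new String(recordValue, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hypothetical line typed into the console producer.
        byte[] value = "sensor_1,35.8".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(value));
    }
}
```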

  • Original article: https://www.cnblogs.com/lmr7/p/15460103.html