zoukankan      html  css  js  c++  java
  • Flink学习

    1、任务提交到本地Flink运行:

        首先,启动本地的Flink(见前面的启动方式),浏览器登陆:localhost:8081 。然后,将写好的程序打包,然后将打包好的 .jar 包上传:

                

     上传后,如上图,箭头所指就是刚刚上传后的 jar包,点击这个 jar 包,可以编辑参数:

                

    填写启动类、启动参数、并行度....... ,提交 submit 的时候,要先启动 Flink任务所需的流,比如本文使用的socket流: nc -lk 7777 ,启动的7777这个端口。

    如果程序中使用的print答应,在这里如何查看:

                

     点击红框中的任务:

                

     在上面的 Stdout 即可看见输出。

    2、本地连接 Kafka

    首先,按照 Kafka,通过官网下载:https://kafka.apache.org/downloads,可参考博客:https://www.cnblogs.com/zhaoshizi/p/12154518.html

                

    使用:tar -xzvf kafka_2.12-3.0.0.tgz 解压,进入解压后的目录,当前下载的kafka程序里自带Zookeeper,可以直接使用其自带的Zookeeper建立集群,kafka自带的Zookeeper程序使用 bin/zookeeper-server-start.sh,以及 bin/zookeeper-server-stop.sh来启动和停止Zookeeper。

    启动zookeeper : 

    ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties 

    注意,必须要跟后面的 -daemon....,表示守护进程,不跟报错。

    然后,启动 Kafka:

    ./bin/kafka-server-start.sh -daemon ./config/server.properties

    同理,也必须要跟后面的 -daemon,守护进程....,

    最后,启动 Kafka 的 producer,注册 topic,指定端口(broker-list指定集群中的一个或者多个服务器,一般我们再使用console producer的时候,这个参数是必备参数,另外一个必备的参数是topic ):

    ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sensor

    然后就可以发送数据了。

                

    程序接收到的数据;

                

     程序的源码为:

    public class SourceTest3_ReadKafka {
        public static void main(String[] args) throws Exception{
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:9092");
    
            DataStream<String> dataStream = env.addSource(new FlinkKafkaConsumer<String>("sensor",new SimpleStringSchema(), properties));
    
            dataStream.print();
            env.execute();
        }
    }

    停止 Kafka :

    ./bin/kafka-server-stop.sh ./config/server.properties

    停止Zookeeper :

    ./bin/zookeeper-server-stop.sh -daemon ./config/zookeeper.properties

    使用 jps 查看是否还有 Kafka的进程,没有的话就是正常停止掉了。

    Kafka去消费topic的数据:

    首先启动Zookeeper、Kafka后,启动消费(bootstrap-servers指的是目标集群的服务器地址,这个和broker-list功能是一样的):

    ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sinktest

    这里,消费的是本地的localhost,topic的名称是:sinktest

                

     

  • 相关阅读:
    CentOS 7 安装java 环境
    CentOS 7 替换网易yum 源
    九度:题目1553:时钟
    Maximum Subarray
    职场细节
    poj2524 Ubiquitous Religions
    九度 1526:朋友圈
    程序载入
    设备管理
    操作系统系列
  • 原文地址:https://www.cnblogs.com/luo-c/p/15499043.html
Copyright © 2011-2022 走看看