1、任务提交到本地Flink运行:
首先,启动本地的Flink(见前面的启动方式),浏览器登陆:localhost:8081 。然后,将写好的程序打包,然后将打包好的 .jar 包上传:
上传后,如上图,箭头所指就是刚刚上传后的 jar包,点击这个 jar 包,可以编辑参数:
填写启动类、启动参数、并行度....... ,提交 submit 的时候,要先启动 Flink任务所需的流,比如本文使用的socket流: nc -lk 7777 ,启动的7777这个端口。
如果程序中使用的print答应,在这里如何查看:
点击红框中的任务:
在上面的 Stdout 即可看见输出。
2、本地连接 Kafka
首先,按照 Kafka,通过官网下载:https://kafka.apache.org/downloads,可参考博客:https://www.cnblogs.com/zhaoshizi/p/12154518.html
使用:tar -xzvf kafka_2.12-3.0.0.tgz 解压,进入解压后的目录,当前下载的kafka程序里自带Zookeeper,可以直接使用其自带的Zookeeper建立集群,kafka自带的Zookeeper程序使用 bin/zookeeper-server-start.sh,以及 bin/zookeeper-server-stop.sh来启动和停止Zookeeper。
启动zookeeper :
./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
注意,必须要跟后面的 -daemon....,表示守护进程,不跟报错。
然后,启动 Kafka:
./bin/kafka-server-start.sh -daemon ./config/server.properties
同理,也必须要跟后面的 -daemon,守护进程....,
最后,启动 Kafka 的 producer,注册 topic,指定端口(broker-list指定集群中的一个或者多个服务器,一般我们再使用console producer的时候,这个参数是必备参数,另外一个必备的参数是topic ):
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sensor
然后就可以发送数据了。
程序接收到的数据;
程序的源码为:
public class SourceTest3_ReadKafka { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); DataStream<String> dataStream = env.addSource(new FlinkKafkaConsumer<String>("sensor",new SimpleStringSchema(), properties)); dataStream.print(); env.execute(); } }
停止 Kafka :
./bin/kafka-server-stop.sh ./config/server.properties
停止Zookeeper :
./bin/zookeeper-server-stop.sh -daemon ./config/zookeeper.properties
使用 jps 查看是否还有 Kafka的进程,没有的话就是正常停止掉了。
Kafka去消费topic的数据:
首先启动Zookeeper、Kafka后,启动消费(bootstrap-servers指的是目标集群的服务器地址,这个和broker-list功能是一样的):
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sinktest
这里,消费的是本地的localhost,topic的名称是:sinktest