  • Hands-on Projects: Learning Flink from 0 to 1 (27) Flink SQL Tutorial (Part 1)

    Environment Setup

    Installing Docker and the required images

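    • Docker download link: I can't afford a Mac, so I've linked the Windows version. Other versions are available from the official Docker website.
      I'll skip the installation itself, since it is just clicking "Next" all the way through. Below we install and start ZooKeeper, Kafka, MySQL and the other services.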

    • Installing the ZooKeeper service

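      • Kafka persists a lot of its metadata in ZooKeeper (zk), so we have to install the ZooKeeper service first.
      • Run docker run -d --name zookeeper --publish 2181:2181 wurstmeister/zookeeper. This binds port 2181 to the host so that Kafka can reach zk later. That completes the zk installation; next we verify that it started correctly.
      • Run docker ps -a on the host. You should see something like this: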
      PS C:\Users\tzmaj> docker ps -a
      CONTAINER ID        IMAGE                    COMMAND                  CREATED             STATUS              PORTS                                                NAMES
      2b8cd369aa3e        wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   3 seconds ago       Up 2 seconds        22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   zookeeper
    • The first column is the container ID. Copy it and run docker exec -it 2b8cd369aa3e '/bin/bash' to open a shell inside the container. Then go to /opt/zookeeper-3.4.13/bin and run ./zkCli.sh to start the zk client:

      root@2b8cd369aa3e:/opt/zookeeper-3.4.13/bin# ./zkCli.sh
      Connecting to localhost:2181
      2020-04-28 08:13:40,238 [myid:] - INFO  [main:Environment@100] - Client environment:zookeeper.version=3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03, built on 06/29/2018 04:05 GMT
      2020-04-28 08:13:40,241 [myid:] - INFO  [main:Environment@100] - Client environment:host.name=2b8cd369aa3e
      2020-04-28 08:13:40,241 [myid:] - INFO  [main:Environment@100] - Client environment:java.version=1.7.0_65
      2020-04-28 08:13:40,243 [myid:] - INFO  [main:Environment@100] - Client environment:java.vendor=Oracle Corporation
      2020-04-28 08:13:40,243 [myid:] - INFO  [main:Environment@100] - Client environment:java.home=/usr/lib/jvm/java-7-openjdk-amd64/jre
      2020-04-28 08:13:40,243 [myid:] - INFO  [main:Environment@100] - Client environment:java.class.path=/opt/zookeeper-3.4.13/bin/../build/classes:/opt/zookeeper-3.4.13/bin/../build/lib/*.jar:/opt/zookeeper-3.4.13/bin/../lib/slf4j-log4j12-1.7.25.jar:/opt/zookeeper-3.4.13/bin/../lib/slf4j-api-1.7.25.jar:/opt/zookeeper-3.4.13/bin/../lib/netty-3.10.6.Final.jar:/opt/zookeeper-3.4.13/bin/../lib/log4j-1.2.17.jar:/opt/zookeeper-3.4.13/bin/../lib/jline-0.9.94.jar:/opt/zookeeper-3.4.13/bin/../lib/audience-annotations-0.5.0.jar:/opt/zookeeper-3.4.13/bin/../zookeeper-3.4.13.jar:/opt/zookeeper-3.4.13/bin/../src/java/lib/*.jar:/opt/zookeeper-3.4.13/bin/../conf:
      2020-04-28 08:13:40,243 [myid:] - INFO  [main:Environment@100] - Client   environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
      2020-04-28 08:13:40,244 [myid:] - INFO  [main:Environment@100] - Client environment:java.io.tmpdir=/tmp
      2020-04-28 08:13:40,244 [myid:] - INFO  [main:Environment@100] - Client environment:java.compiler=<NA>
      2020-04-28 08:13:40,244 [myid:] - INFO  [main:Environment@100] - Client environment:os.name=Linux
      2020-04-28 08:13:40,244 [myid:] - INFO  [main:Environment@100] - Client environment:os.arch=amd64
      2020-04-28 08:13:40,244 [myid:] - INFO  [main:Environment@100] - Client environment:os.version=4.19.76-linuxkit
      2020-04-28 08:13:40,244 [myid:] - INFO  [main:Environment@100] - Client environment:user.name=root
      2020-04-28 08:13:40,245 [myid:] - INFO  [main:Environment@100] - Client environment:user.home=/root
      2020-04-28 08:13:40,245 [myid:] - INFO  [main:Environment@100] - Client environment:user.dir=/opt/zookeeper-3.4.13/bin
      2020-04-28 08:13:40,246 [myid:] - INFO  [main:ZooKeeper@442] - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@674e5e21
      Welcome to ZooKeeper!
      2020-04-28 08:13:40,268 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@1029] - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
      2020-04-28 08:13:40,278 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@879] - Socket connection established to localhost/127.0.0.1:2181, initiating session
      JLine support is enabled
      2020-04-28 08:13:40,300 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@1303] - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x100005e85070000, negotiated timeout = 30000
      
      WATCHER::
      
      WatchedEvent state:SyncConnected type:None path:null
      [zk: localhost:2181(CONNECTED) 0]
    • Next, run ls /:

      [zk: localhost:2181(CONNECTED) 0] ls /
      [zookeeper]

      zk appears to be installed correctly. Press Ctrl+D repeatedly to exit back to the host shell.

      • If the STATUS column of docker ps -a shows Exited, something went wrong. Run docker logs <CONTAINER ID> to see the detailed logs.
      • You can also connect to zk with other tools, such as ZooInspector or a Java client, to verify that the service started correctly.
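As one example of checking from code instead of zkCli, here is a minimal Java sketch that uses only the standard library. It sends ZooKeeper's four-letter command ruok over a plain TCP socket and expects imok in reply. This assumes the 3.4.x server shipped in this image, where four-letter words should be enabled by default; newer ZooKeeper releases may require adding ruok to 4lw.commands.whitelist. The class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: health-checks ZooKeeper with the "ruok" four-letter word.
// Assumes four-letter words are enabled (default on 3.4.x; 3.5+ may need a whitelist entry).
final class ZkRuokCheck {

    // True if the server's reply to "ruok" is "imok" (surrounding whitespace ignored).
    static boolean isImOk(String reply) {
        return "imok".equals(reply.trim());
    }

    // Connects, writes the command, then reads until the server closes the connection.
    static String sendFourLetterWord(String host, int port, String cmd, int timeoutMs) throws IOException {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            socket.setSoTimeout(timeoutMs);
            OutputStream out = socket.getOutputStream();
            out.write(cmd.getBytes(StandardCharsets.US_ASCII));
            out.flush();
            InputStream in = socket.getInputStream();
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            byte[] chunk = new byte[1024];
            int n;
            while ((n = in.read(chunk)) != -1) {
                buf.write(chunk, 0, n);
            }
            return new String(buf.toByteArray(), StandardCharsets.US_ASCII);
        }
    }

    public static void main(String[] args) throws IOException {
        // 2181 is the port published by "docker run ... --publish 2181:2181".
        String reply = sendFourLetterWord("localhost", 2181, "ruok", 3000);
        System.out.println(isImOk(reply) ? "ZooKeeper is up" : "Unexpected reply: " + reply);
    }
}
```

The server closes the connection after answering a four-letter command, so reading until end of stream collects the whole reply.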
    • Installing the Kafka service

      • Run docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=172.17.47.44:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.47.44:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka:2.11-0.11.0.3 (the image tag means Scala 2.11, Kafka 0.11.0.3). Note: 172.17.47.44 is my local IP. When you copy my homework, remember to put your own name on it, i.e. replace it with your own IP. Next we check whether Kafka works.
      • Run docker ps -a to find the Kafka container ID, then run docker exec -it 7a58196af291 '/bin/bash' to enter the Kafka container. Go to the Kafka client directory /opt/kafka/bin and run kafka-console-producer.sh --broker-list 172.17.47.44:9092 --topic mykafka to start a console producer.
      • Open a second terminal, enter the same container and client directory, and run kafka-console-consumer.sh --bootstrap-server 172.17.47.44:9092 --topic mykafka --from-beginning to start a console consumer.
      • Switch back to the producer window and enter the prepared JSON record {"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}. The consumer window should then print the record, as shown below:
      PS C:\Users\tzmaj> docker exec -it 7a58196af291 '/bin/bash'
      bash-4.4# cd /opt/kafka/bin
      bash-4.4# kafka-console-producer.sh --broker-list 172.17.47.44:9092 --topic mykafka
      >{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
      >{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
      >{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
      >{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
      PS C:\Users\tzmaj> docker exec -it 7a58196af291 '/bin/bash'
      bash-4.4# cd /opt/kafka/bin/
      bash-4.4# kafka-console-consumer.sh --bootstrap-server 172.17.47.44:9092 --topic mykafka --from-beginning
      {"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
      {"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
      {"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
      {"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
      ^CProcessed a total of 4 messages
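      • The first transcript is the producer and the second is the consumer, so both producing and consuming work. NICE! Next, let's check with Java that the data can be consumed: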
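      import java.time.Duration;
      import java.util.Arrays;
      import java.util.Properties;

      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;

      public class MyKafkaConsumer {
          public static void main(String[] args) {
              Properties properties = new Properties();
              properties.put("bootstrap.servers", "172.17.47.44:9092"); // replace with your own host IP
              properties.put("group.id", "test1");
              properties.put("auto.offset.reset", "earliest"); // a new group starts from the oldest message
              properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
              properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

              try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties)) {
                  kafkaConsumer.subscribe(Arrays.asList("mykafka"));
                  while (true) {
                      // poll(Duration) needs kafka-clients 2.0+; older clients use poll(100)
                      ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                      for (ConsumerRecord<String, String> record : records) {
                          System.out.printf(" value is =>>> %s%n", record.value());
                      }
                  }
              }
          }
      }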

      Run it and watch the console:

      value is =>>> {"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
    • The messages are consumed normally, so our Kafka client works as well.
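A common pitfall in the docker run command above is the advertised address. A client first contacts a broker listed in bootstrap.servers. The broker's metadata response then hands back the KAFKA_ADVERTISED_LISTENERS address, and the client connects to that. KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 is only what the broker binds inside the container. So if the console tools work inside the container but a host-side Java client hangs, it is worth checking that the advertised address is reachable from the host.

The following is a minimal standard-library sketch of such a check. It only tests TCP reachability, not the Kafka protocol. The class and method names are hypothetical, and parseListener handles a single listener entry only.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks that an advertised Kafka listener accepts TCP connections
// from the machine where the client runs. It does not speak the Kafka protocol.
final class ListenerCheck {

    // Turns "PLAINTEXT://host:port" (or plain "host:port") into an unresolved address.
    // Handles a single listener entry only.
    static InetSocketAddress parseListener(String listener) {
        int schemeEnd = listener.indexOf("://");
        String hostPort = schemeEnd >= 0 ? listener.substring(schemeEnd + 3) : listener;
        int colon = hostPort.lastIndexOf(':');
        String host = hostPort.substring(0, colon);
        int port = Integer.parseInt(hostPort.substring(colon + 1));
        return InetSocketAddress.createUnresolved(host, port);
    }

    // True if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Replace with the KAFKA_ADVERTISED_LISTENERS value you passed to docker run.
        InetSocketAddress addr = parseListener("PLAINTEXT://172.17.47.44:9092");
        boolean ok = isReachable(addr.getHostString(), addr.getPort(), 3000);
        System.out.println(addr.getHostString() + ":" + addr.getPort()
                + (ok ? " is reachable" : " is NOT reachable"));
    }
}
```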

    • Installing the MySQL service

      • Run the command

        docker run --name mysql -e MYSQL_ROOT_PASSWORD=123456 -d -p 3306:3306 mysql:5.6

        Here MYSQL_ROOT_PASSWORD=123456 sets the root password to 123456; change it to whatever you like.

      • Connect to MySQL with Navicat

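        The host and port are your IP and the published port 3306. The user is root, and the password is the MYSQL_ROOT_PASSWORD from the previous step. Click Test Connection: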

        The connection succeeds! Now connect to MySQL and run select CURRENT_TIMESTAMP() from dual. It returns 2020-04-28 07:38:48, so MySQL looks fine as well.
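The same check can be run without Navicat. Below is a minimal JDBC sketch that executes the query above. It assumes MySQL Connector/J is on the runtime classpath; the JDBC API itself ships with the JDK, but the MySQL driver does not. The host, port, user and password mirror the docker run command, and the class name MysqlCheck is hypothetical.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

// Hypothetical helper: runs the Navicat sanity query through plain JDBC.
// Assumes MySQL Connector/J is on the runtime classpath.
final class MysqlCheck {

    // Builds a JDBC URL for the container without selecting a default database.
    static String jdbcUrl(String host, int port) {
        return "jdbc:mysql://" + host + ":" + port + "/";
    }

    public static void main(String[] args) throws SQLException {
        String url = jdbcUrl("172.17.47.44", 3306); // replace with your own host IP
        // "123456" is the MYSQL_ROOT_PASSWORD passed to docker run.
        try (Connection conn = DriverManager.getConnection(url, "root", "123456");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("select CURRENT_TIMESTAMP() from dual")) {
            if (rs.next()) {
                System.out.println("MySQL time: " + rs.getString(1));
            }
        }
    }
}
```

try-with-resources closes the result set, statement and connection in reverse order, even if the query fails.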

    • Finally, the dataset download link: UserBehavior.csv
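The tutorial does not describe the file's layout. The sketch below assumes each line of UserBehavior.csv has the form user_id,item_id,category_id,behavior,timestamp, with the timestamp in Unix epoch seconds. Check this against the file you actually download.

Under that assumption, this Java sketch converts a line into the same JSON shape that was typed into the console producer earlier. The class name is hypothetical. The sample line in main is also hypothetical: it was built from the JSON record shown earlier, not copied from the dataset.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical converter: one UserBehavior.csv line -> the JSON record format used above.
// ASSUMPTION: line layout is user_id,item_id,category_id,behavior,epoch_seconds.
final class UserBehaviorCsv {

    // Formats an instant in UTC as e.g. 2017-11-26T01:00:00Z.
    private static final DateTimeFormatter ISO_UTC =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'").withZone(ZoneOffset.UTC);

    static String toJson(String csvLine) {
        String[] f = csvLine.trim().split(",");
        if (f.length != 5) {
            throw new IllegalArgumentException("expected 5 fields, got " + f.length + ": " + csvLine);
        }
        String ts = ISO_UTC.format(Instant.ofEpochSecond(Long.parseLong(f[4])));
        return String.format(
                "{\"user_id\": \"%s\", \"item_id\": \"%s\", \"category_id\": \"%s\", \"behavior\": \"%s\", \"ts\": \"%s\"}",
                f[0], f[1], f[2], f[3], ts);
    }

    public static void main(String[] args) {
        // Hypothetical sample line matching the JSON record used in the Kafka test.
        System.out.println(toJson("543462,1715,1464116,pv,1511658000"));
    }
}
```

Each output line could be piped into kafka-console-producer.sh, which sends every input line as one message.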

    • That completes the environment setup for the first lesson of this Flink SQL tutorial series. Next, we officially begin learning Flink SQL.

     
    Author: 大码王


  • Original article: https://www.cnblogs.com/huanghanyu/p/13913209.html