zoukankan      html  css  js  c++  java
  • 【Flink系列零】构建实时计算平台——FlinkSQL 作业菜鸟笔记

    前言

    因为最近的需求是做FlinkSQL平台,需要在实时计算平台上集成FlinkSQL功能,但目前刚刚有了研究成果,所以有了这篇笔记。

    第一步:编写一个流

    这里使用python编写的一个流,比Java简洁。

    需要注意的是 pip install kakfa-python,不能是 pip install kafka。

    这里生产的集群是SCRAM加密的,所以配置会多一些。

    有一个单词本,words.txt就是一些英文单词,一行一个。

    这个Producer每5秒产生一个记录,以JSON形式发布到流。

    from kafka import KafkaProducer
    import json
    import random
    import time
    import sys
    
    if __name__ == '__main__':
        producer = KafkaProducer(
            bootstrap_servers="kafka1211.slannka.com:9194",
            key_serializer=lambda v: str.encode if v is not None else None,
            value_serializer=lambda v: v.encode('utf-8') if v is not None else None,
            security_protocol="SASL_PLAINTEXT",
            sasl_mechanism="SCRAM-SHA-256",
            sasl_plain_username="slankkaCopyrightReserved",
            sasl_plain_password="passwordOfUsername",
            api_version=(2, 2, 1)
        )
    
        count = 0
        thefile = open("data/words.txt", "rb")
        while True:
            buffer = thefile.read(1024 * 8192)
            if not buffer:
                break
            count += buffer.count('
    '.encode())
    
        thefile.close()
    
        textfile = open("data/words.txt", "r")
        lines = textfile.readlines()  # 读取全部内容 ,并以列表方式返回
    
        while True:
            # initial values in each loop
            offset = 0
            word = None
            # get a random value represents a word
            randint = random.randint(0, count)
            print("total: ", count, ", randInt: ", randint)
    
            for line in lines:
                if offset == randint:
                    word = line.strip()
                    break
                offset += 1
            val = {
                "word": word,
                "len": len(word)
            }
            value = json.dumps(val)
            print("sending:", value)
            producer.send("test_enc_putong", value)
            print("send finished..(wait 5s.)")
            time.sleep(5.0)
    
    
        producer.close(3000)
        textfile.close()
    

    第二步:编写FLINKSQL

    create table WordCountTab (
        `word` STRING,
        `len` INT,
      # `ts` TIMESTAMP(3) METADATA FROM 'timestamp' #这一行不支持则可以去掉
    ) with (
        'connector' = 'kafka',
        'topic' = 'test_slankka',
        'properties.bootstrap.servers' = 'xxxxx.xxxxx.xxxxxx.com:9194',
        'properties.group.id' = 'test_flinksql_consumer',
        'format' = 'json',
        'scan.startup.mode' = 'earliest-offset',
        'properties.sasl.jaas.config'= 'org.apache.kafka.common.security.scram.ScramLoginModule required username="slankkaCopyrightReserved" password="passwordOfUsername";',
        'properties.sasl.mechanism' = 'SCRAM-SHA-256',
        'properties.security.protocol' = 'SASL_PLAINTEXT'
    );
    
    create table WordCountSink (
       word STRING,
       len INT
    ) WITH (
       'connector' = 'jdbc',
       'url' = 'jdbc:mysql://mysql1211.slankkaCorps.com:3306/rtc',
       'table-name' = 'flink_sink_test',
       'username' = 'root',
       'password' = 'root'
    );
    
    INSERT INTO WordCountSink
    SELECT word, len FROM WordCountTab;
    

    执行即可,生成一个Flink JOB,这个任务会不断得写<word,len>到Mysql中。

  • 相关阅读:
    ffmpeg rtmp推流 视频转码
    java日志发展史 log4j slf4j log4j2 jul jcl 日志和各种桥接包的关系
    nginx stream 流转发,可以转发rtmp、mysql访问流,转发rtmp、jdbc请求
    java web http 转https 通过nginx代理访问
    linux 服务器磁盘挂载
    novnc 通过websockify代理 配置多点访问
    linux 文件服务 minio 安装部署配置
    AOP实现原理,手写aop
    java 泛型
    JAVA反射getGenericSuperclass()用法
  • 原文地址:https://www.cnblogs.com/slankka/p/14121664.html
Copyright © 2011-2022 走看看