zoukankan      html  css  js  c++  java
  • PYFLINK 基础 (一):运行相关(一)PYFLINK安装与本地运行(WINDOWS10)(TABLE demo)

    来源:https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/python/table_api_tutorial/

    一 安装环境与安装

    您需要一台具有以下功能的计算机:

    • Java 8 or 11
    • Python 3.6, 3.7 or 3.8

    使用Python Table API需要安装PyFlink,它已经被发布到 PyPi,您可以通过如下方式安装PyFlink:

    $ python -m pip install apache-flink
    

    安装PyFlink后,您便可以编写Python Table API作业了。

    二 编写一个Flink Python Table API程序 

    编写Flink Python Table API程序的第一步是创建TableEnvironment。这是Python Table API作业的入口类。

    exec_env = ExecutionEnvironment.get_execution_environment()
    exec_env.set_parallelism(1)
    t_config = TableConfig()
    t_env = BatchTableEnvironment.create(exec_env, t_config)

    接下来,我们将介绍如何创建源表和结果表。

    
    
    t_env.connect(FileSystem().path('C:\Users\DELL\Desktop\PYFLINK\input.csv')) 
    .with_format(OldCsv()
    .field('word', DataTypes.STRING()))
    .with_schema(Schema()
    .field('word', DataTypes.STRING()))
    .create_temporary_table('mySource')

    t_env.connect(FileSystem().path('C:\Users\DELL\Desktop\PYFLINK\ouput.csv'))
    .with_format(OldCsv()
    .field_delimiter(' ')
    .field('word', DataTypes.STRING())
    .field('count', DataTypes.BIGINT()))
    .with_schema(Schema()
    .field('word', DataTypes.STRING())
    .field('count', DataTypes.BIGINT()))
    .create_temporary_table('mySink')
     

    上面的程序展示了如何创建及在ExecutionEnvironment中注册表名分别为mySourcemySink的表。 其中,源表mySource有一列: word,该表代表了从输入文件input.csv中读取的单词; 结果表mySink有两列: word和count,该表会将计算结果输出到文件output.csv中,字段之间使用 作为分隔符。

    接下来,我们介绍如何创建一个作业:该作业读取表mySource中的数据,进行一些变换,然后将结果写入表mySink

    最后,需要做的就是启动Flink Python Table API作业。上面所有的操作,比如创建源表 进行变换以及写入结果表的操作都只是构建作业逻辑图,只有当execute_insert(sink_name)被调用的时候, 作业才会被真正提交到集群或者本地进行执行。

    from pyflink.table.expressions import lit
    tab = t_env.from_path('mySource')
    tab.group_by(tab.word)
    .select(tab.word, lit(1).count)
    .execute_insert('mySink').wait()

    该教程的完整代码如下:

    from pyflink.dataset import ExecutionEnvironment
    from pyflink.table.descriptors import Schema, OldCsv, FileSystem
    from pyflink.table.expressions import lit
    from pyflink.table import (
        TableConfig,
        DataTypes,
        BatchTableEnvironment
    )
    
    
    exec_env = ExecutionEnvironment.get_execution_environment()
    exec_env.set_parallelism(1)
    t_config = TableConfig()
    t_env = BatchTableEnvironment.create(exec_env, t_config)
    
    t_env.connect(FileSystem().path('C:\Users\DELL\Desktop\PYFLINK\input.csv')) 
        .with_format(OldCsv()
                     .field('word', DataTypes.STRING())) 
        .with_schema(Schema()
                     .field('word', DataTypes.STRING())) 
        .create_temporary_table('mySource')
    
    t_env.connect(FileSystem().path('C:\Users\DELL\Desktop\PYFLINK\ouput.csv')) 
        .with_format(OldCsv()
                     .field_delimiter('	')
                     .field('word', DataTypes.STRING())
                     .field('count', DataTypes.BIGINT())) 
        .with_schema(Schema()
                     .field('word', DataTypes.STRING())
                     .field('count', DataTypes.BIGINT())) 
        .create_temporary_table('mySink')
    
    tab = t_env.from_path('mySource')
    tab.group_by(tab.word) 
       .select(tab.word, lit(1).count) 
       .execute_insert('mySink').wait()
    from pyflink.table import EnvironmentSettings, TableEnvironment,BatchTableEnvironment
    
    environment_settings = EnvironmentSettings.new_instance().use_blink_planner().in_batch_mode().build()
    t_env = BatchTableEnvironment.create(environment_settings=environment_settings)
    t_env.get_config().get_configuration().set_string('parallelism.default', '1')
    
    t_env.execute_sql("""
             CREATE TABLE mySource (
               word STRING
             ) WITH (
               'connector' = 'filesystem',
               'format' = 'csv',
               'path' = 'C:\Users\DELL\Desktop\PYFLINK\input.csv'
             )
         """
    
    )
    
    
    
    t_env.execute_sql("""
             CREATE TABLE mySink (
               word STRING,
               `count` BIGINT
             ) WITH (
               'connector' = 'filesystem',
               'format' = 'csv',
               'path' = 'C:\Users\DELL\Desktop\PYFLINK\word_count_output.csv'
             )
         """)
    #
    #t_env.from_path('mySource') 
    #    .group_by('word') 
    #    .select('word, count(1)') 
    #    .execute_insert('mySink').wait()
    #
    
    from pyflink.table.expressions import lit
    tab = t_env.from_path('mySource')
    tab.group_by(tab.word) 
       .select(tab.word, lit(1).count) 
       .execute_insert('mySink').wait()

    三 执行一个Flink Python Table API程序

    首先,你需要在文件 “input.csv” 中准备好输入数据。你可以选择通过如下命令准备输入数据:

    input.csv

    flink
    pyflink
    flink

    接下来,可以在命令行中运行作业(假设作业名为WordCount.py)(注意:如果输出结果文件“output.csv”已经存在,你需要先删除文件,否则程序将无法正确运行起来):

    $ python WordCount.py

    上述命令会构建Python Table API程序,并在本地mini cluster中运行。如果想将作业提交到远端集群执行, 可以参考作业提交示例

    最后,你可以通过如下命令查看你的运行结果:

    ouput.csv

    flink    2
    pyflink    1
  • 相关阅读:
    jeecg多页签的选择切换
    设计模式:工厂三姐妹一网打尽
    设计模式:工厂三姐妹一网打尽
    设计模式:工厂三姐妹一网打尽
    设计模式:工厂三姐妹一网打尽
    三、原子操作
    三、原子操作
    三、原子操作
    三、原子操作
    WebClient HttpWebRequest从网页中获取请求数据
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/14869332.html
Copyright © 2011-2022 走看看