  • PyFlink Basics (5): Runtime Topics (5): PyFlink (Stream Demo)

    DataStream API Tutorial

    Apache Flink offers a DataStream API for building robust, stateful streaming applications. It provides fine-grained control over state and time, which allows for the implementation of advanced event-driven systems. In this step-by-step guide, you’ll learn how to build a simple streaming application with PyFlink and the DataStream API.

    What Will You Be Building?

    In this tutorial, you will learn how to write a simple Python DataStream pipeline. The pipeline will read data from a non-empty collection and write the results to the local file system.

    Prerequisites

    This walkthrough assumes that you have some familiarity with Python, but you should be able to follow along even if you come from a different programming language.

    Help, I’m Stuck!

    If you get stuck, check out the community support resources. In particular, Apache Flink’s user mailing list consistently ranks as one of the most active of any Apache project and is a great way to get help quickly.

    How To Follow Along

    If you want to follow along, you will require a computer with:

    • Java 8 or 11
    • Python 3.6, 3.7 or 3.8
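
    Before installing PyFlink, it can help to confirm that the local machine meets the two requirements listed above. The following is a minimal sketch and not part of the original tutorial: the helper names (`python_version_supported`, `java_available`, `describe_environment`) are hypothetical, and the supported range simply mirrors the versions this walkthrough names, so adjust it if your PyFlink release documents a different range.

```python
import shutil
import sys

# Python minor versions this walkthrough lists as supported (an assumption
# copied from the prerequisites above; other PyFlink releases may differ).
SUPPORTED_PYTHON = {(3, 6), (3, 7), (3, 8)}


def python_version_supported(version_info=sys.version_info):
    """Return True if the (major, minor) pair is one the tutorial lists."""
    return (version_info[0], version_info[1]) in SUPPORTED_PYTHON


def java_available():
    """Return True if a `java` executable can be found on the PATH."""
    return shutil.which("java") is not None


def describe_environment():
    """Build a short human-readable summary of both checks."""
    python_ok = python_version_supported()
    return (
        f"Python {sys.version_info[0]}.{sys.version_info[1]}: "
        f"{'ok' if python_ok else 'not in the list above'}; "
        f"java on PATH: {'yes' if java_available() else 'no'}"
    )
```

    Calling `print(describe_environment())` prints a one-line summary. The check only detects whether `java` is on the PATH, not which Java version it is.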

    Using the Python DataStream API requires installing PyFlink, which is available on PyPI and can be installed with pip.

    $ python -m pip install apache-flink
    

    Once PyFlink is installed, you can move on to write a Python DataStream job.

    DataStream API applications begin by declaring an execution environment (StreamExecutionEnvironment), the context in which a streaming program is executed. This is what you will use to set the properties of your job (e.g. default parallelism, restart strategy), create your sources and finally trigger the execution of the job.

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    Once a StreamExecutionEnvironment is created, you can use it to declare your source. Sources ingest data from external systems, such as Apache Kafka, RabbitMQ, or Apache Pulsar, into Flink jobs.

    To keep things simple, this walkthrough uses a source that is backed by a collection of elements.

    ds = env.from_collection(
        collection=[(1, 'aaa'), (2, 'bbb')],
        type_info=Types.ROW([Types.INT(), Types.STRING()]))

    This creates a data stream from the given collection, with the same type as that of the elements in it (here, a ROW type with an INT field and a STRING field).

    You can now perform transformations on this data stream, or just write the data to an external system using a sink. This walkthrough uses the StreamingFileSink sink connector to write the data into a file in the /tmp/output directory.

    ds.add_sink(StreamingFileSink
        .for_row_format('/tmp/output', Encoder.simple_string_encoder())
        .build())
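
    The paragraph before the sink snippet notes that transformations can be applied to the stream before it is written out. As a hedged illustration that is not part of the original walkthrough, the sketch below upper-cases the string field with `map`. The names `shout` and `build_upper_case_stream` are hypothetical; the PyFlink imports sit inside the function only so that the pure helper can be exercised without a Flink installation.

```python
def shout(name):
    """Pure helper: the per-record logic applied to the STRING field."""
    return name.upper()


def build_upper_case_stream(env):
    """Return a stream of (INT, STRING) rows with the STRING upper-cased.

    `env` is expected to be a StreamExecutionEnvironment, created as shown
    earlier in this walkthrough.
    """
    # Imported lazily so `shout` stays usable without PyFlink installed.
    from pyflink.common import Row
    from pyflink.common.typeinfo import Types

    row_type = Types.ROW([Types.INT(), Types.STRING()])
    ds = env.from_collection(
        collection=[(1, 'aaa'), (2, 'bbb')],
        type_info=row_type)
    # map() is applied per element; output_type declares the result type.
    return ds.map(lambda row: Row(row[0], shout(row[1])), output_type=row_type)
```

    The returned stream can be passed to `add_sink` in the same way as `ds` in the snippet above; with the sample collection, the records would then carry `AAA` and `BBB` in the second field.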

    The last step is to execute the actual PyFlink DataStream API job. PyFlink applications are built lazily and shipped to the cluster for execution only once fully formed. To execute an application, you simply call env.execute(job_name).

    env.execute("tutorial_job")
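
    The idea that a job is described first and only run when `execute` is called can be illustrated without Flink at all. The toy class below is purely an illustration of lazy pipeline construction in plain Python; `LazyPipeline`, `upper_name`, and the other names are hypothetical and do not reflect how PyFlink is implemented internally.

```python
class LazyPipeline:
    """Toy model of a lazily built pipeline (not PyFlink's implementation)."""

    def __init__(self, source):
        self._source = list(source)
        self._steps = []      # transformations recorded, not yet applied
        self._sink = None

    def map(self, func):
        # Only record the step; nothing is computed here.
        self._steps.append(func)
        return self

    def add_sink(self, sink):
        self._sink = sink
        return self

    def execute(self, job_name):
        """Run every recorded step on each element, then hand it to the sink."""
        if self._sink is None:
            raise ValueError(f"job {job_name!r} has no sink")
        for element in self._source:
            for step in self._steps:
                element = step(element)
            self._sink(element)


calls = []      # records each time the map function actually runs
results = []    # plays the role of the output file


def upper_name(record):
    calls.append(record)
    return (record[0], record[1].upper())


pipeline = (LazyPipeline([(1, 'aaa'), (2, 'bbb')])
            .map(upper_name)
            .add_sink(results.append))

assert calls == [] and results == []   # nothing has run yet
pipeline.execute("toy_job")
assert results == [(1, 'AAA'), (2, 'BBB')]
```

    Declaring the pipeline only records the steps; the map function is not called until `execute` runs, which is the behaviour the paragraph above describes for PyFlink jobs.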

    The complete code so far:

    from pyflink.common.serialization import Encoder
    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors import StreamingFileSink
    
    
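    def tutorial():
        # Create the execution environment with a default parallelism of 1.
        env = StreamExecutionEnvironment.get_execution_environment()
        env.set_parallelism(1)
        # Source: a fixed collection of (INT, STRING) rows.
        ds = env.from_collection(
            collection=[(1, 'aaa'), (2, 'bbb')],
            type_info=Types.ROW([Types.INT(), Types.STRING()]))
        # Sink: write each row as text under /tmp/output.
        ds.add_sink(StreamingFileSink
                    .for_row_format('/tmp/output', Encoder.simple_string_encoder())
                    .build())
        # Submit the job; nothing runs before this call.
        env.execute("tutorial_job")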
    
    
    if __name__ == '__main__':
        tutorial()

    Now that you have defined your PyFlink program, you can run it! First, make sure that the output directory doesn’t exist:

    rm -rf /tmp/output
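
    The same clean-up step can also be done from Python, for instance at the top of the job script. This is a small sketch using only the standard library; the function name `reset_output_dir` is hypothetical.

```python
import shutil
from pathlib import Path


def reset_output_dir(path):
    """Delete the directory `path` and its contents; do nothing if absent."""
    target = Path(path)
    if target.exists():
        shutil.rmtree(target)
    return target
```

    Calling `reset_output_dir('/tmp/output')` mirrors the shell command above for a directory.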
    

    Next, you can run the example you just created on the command line:

    $ python datastream_tutorial.py
    

    The command builds and runs your PyFlink program in a local mini cluster. You can alternatively submit it to a remote cluster using the instructions detailed in Job Submission Examples.

    Finally, you can see the execution result on the command line:

    $ find /tmp/output -type f -exec cat {} \;
    1,aaa
    2,bbb
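
    For readers who prefer to inspect the result from Python, the shell command above can be approximated with the standard library. The sketch below walks the output directory recursively and collects the lines of every regular file; `read_output_lines` is a hypothetical helper name, and sorting the lines is an assumption made here to keep the result deterministic.

```python
from pathlib import Path


def read_output_lines(directory):
    """Return all lines from every regular file under `directory`, sorted."""
    lines = []
    # rglob('*') descends into sub-directories and also matches hidden files.
    for path in Path(directory).rglob('*'):
        if path.is_file():
            lines.extend(path.read_text().splitlines())
    return sorted(lines)
```

    For example, `read_output_lines('/tmp/output')` returns the written records as a list of strings.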
    

    This walkthrough gives you the foundations to get started writing your own PyFlink DataStream API programs. To learn more about the Python DataStream API, refer to the Flink Python API Docs.

  • Original article: https://www.cnblogs.com/qiu-hua/p/14897920.html