zoukankan      html  css  js  c++  java
  • Flink异步Async I/O

    Async I/O的原理和基本用法

    • 简单的来说,使用 Async I/O 对应到 Flink 的 API 就是 RichAsyncFunction 这个抽象类实现里面的3个方法
      1. open(初始化)
      2. asyncInvoke(数据异步调用)
      3. close(停止的一些操作)

      使用Async I/O的模板方法:

    /**
     * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
     */
    classAsyncDatabaseRequestextendsRichAsyncFunction<String, Tuple2<String, String>> {
    
        /** The database specific client that can issue concurrent requests with callbacks */
        private transient DatabaseClient client;
    
        @Override
        publicvoidopen(Configuration parameters) throws Exception {
            client = new DatabaseClient(host, post, credentials);
        }
    
        @Override
        publicvoidclose() throws Exception {
            client.close();
        }
    
        @Override
        publicvoidasyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
    
            // issue the asynchronous request, receive a future for result
            final Future<String> result = client.query(key);
    
            // set the callback to be executed once the request by the client is complete
            // the callback simply forwards the result to the result future
            CompletableFuture.supplyAsync(new Supplier<String>() {
    
                @Override
                public String get() {
                    try {
                        return result.get();
                    } catch (InterruptedException | ExecutionException e) {
                        // Normally handled explicitly.
                        return null;
                    }
                }
            }).thenAccept( (String dbResult) -> {
                resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
            });
        }
    }
    
    // create the original stream
    DataStream<String> stream = ...;
    
    // apply the async I/O transformation
    DataStream<Tuple2<String, String>> resultStream =
        AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);

    假设一个场景是需要进行异步请求其他数据库,那么要实现一个通过异步I/O来操作数据库还需要三个步骤:

      1、实现用来分发请求的AsyncFunction

      2、获取操作结果的callback,并将它提交到AsyncCollector中

      3、将异步I/O操作转换成DataStream 

    其中的两个重要的参数:
    Timeouttimeout 定义了异步操作过了多长时间后会被丢弃,这个参数是防止了死的或者失败的请求 
    Capacity 这个参数定义了可以同时处理多少个异步请求。虽然异步I/O方法会带来更好的吞吐量,但是算子仍然会成为流应用的瓶颈。超过限制的并发请求数量会产生背压
       还有几个需要注意的点:
    • 使用Async I/O,需要外部存储有支持异步请求的客户端。

    • 使用Async I/O,继承RichAsyncFunction(接口AsyncFunction<in, out="">的抽象类),重写或实现open(建立连接)、close(关闭连接)、asyncInvoke(异步调用)3个方法即可。

    • 使用Async I/O, 最好结合缓存一起使用,可减少请求外部存储的次数,提高效率。

    • Async I/O 提供了Timeout参数来控制请求最长等待时间。默认,异步I/O请求超时时,会引发异常并重启或停止作业。如果要处理超时,可以重写AsyncFunction#timeout方法。

    • Async I/O 提供了Capacity参数控制请求并发数,一旦Capacity被耗尽,会触发反压机制来抑制上游数据的摄入。

    • Async I/O 输出提供乱序和顺序两种模式。

      • 乱序
        • 用AsyncDataStream.unorderedWait(...) API,每个并行的输出顺序和输入顺序可能不一致。
      • 顺序
        • 用AsyncDataStream.orderedWait(...) API,每个并行的输出顺序和输入顺序一致。为保证顺序,需要在输出的Buffer中排序,该方式效率会低一些
    • 使用Async十分需要注意的几个点:

        外部数据源必须是异步客户端:  
           如果是线程安全的,你可以不加 transient 关键字,初始化一次。
           否则,你需要加上 transient,不对其进行初始化,而在 open 方法中,为每个 Task 实例初始化一个。
        当异步访问完成时,方法中多了一个 CompletableFuture,,需要调用其方法进行处理。比如

    redisFuture.thenAccept(new Consumer<String>() {
                @Override
                public void accept(String value) {
                    future.complete(Collections.singletonList(Row.of(key, value)));
                }
            });
    • 例子如下:
    public classAsyncIOFunctionTest{
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.setParallelism(1);
    
            Properties p = new Properties();
            p.setProperty("bootstrap.servers", "localhost:9092");
    
            DataStreamSource<String> ds = env.addSource(new FlinkKafkaConsumer010<String>("order", new SimpleStringSchema(), p));
            ds.print();
    
            SingleOutputStreamOperator<Order> order = ds
                    .map(new MapFunction<String, Order>() {
                        @Override
                        public Order map(String value) throws Exception {
                            return new Gson().fromJson(value, Order.class);
                        }
                    })
                    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Order>() {
                        @Override
                        public long extractAscendingTimestamp(Order element) {
                            try {
                                return element.getOrderTime();
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                            return 0;
                        }
                    })
                    .keyBy(new KeySelector<Order, String>() {
                        @Override
                        public String getKey(Order value) throws Exception {
                            return value.getUserId();
                        }
                    })
                    .window(TumblingEventTimeWindows.of(Time.minutes(10)))
                    .maxBy("orderTime");
    
            SingleOutputStreamOperator<Tuple7<String, String, Integer, String, String, String, Long>> operator = AsyncDataStream
                    .unorderedWait(order, new RichAsyncFunction<Order, Tuple7<String, String, Integer, String, String, String, Long>>() {
    
                        private Connection connection;
    
                        @Override
                        public void open(Configuration parameters) throws Exception {
                            super.open(parameters);
                            Class.forName("com.mysql.jdbc.Driver");
                            connection = DriverManager.getConnection("url", "user", "pwd");
                            connection.setAutoCommit(false);
                        }
    
                        @Override
                        public void asyncInvoke(Order input, ResultFuture<Tuple7<String, String, Integer, String, String, String, Long>> resultFuture) throws Exception {
                            List<Tuple7<String, String, Integer, String, String, String, Long>> list = new ArrayList<>();
                            // 在 asyncInvoke 方法中异步查询数据库
                            String userId = input.getUserId();
                            Statement statement = connection.createStatement();
                            ResultSet resultSet = statement.executeQuery("select name,age,sex from user where userid=" + userId);
                            if (resultSet != null && resultSet.next()) {
                                String name = resultSet.getString("name");
                                int age = resultSet.getInt("age");
                                String sex = resultSet.getString("sex");
                                Tuple7<String, String, Integer, String, String, String, Long> res = Tuple7.of(userId, name, age, sex, input.getOrderId(), input.getPrice(), input.getOrderTime());
                                list.add(res);
                            }
    
                            // 将数据搜集
                            resultFuture.complete(list);
                        }
    
                        @Override
                        public void close() throws Exception {
                            super.close();
                            if (connection != null) {
                                connection.close();
                            }
                        }
                    }, 5000, TimeUnit.MILLISECONDS,100);
    
            operator.print();
    
    
            env.execute("AsyncIOFunctionTest");
        }
    }
    • 例子中在的异步请求
      • open()中创建连接对象  
      • 在RichAsyncFunction的asyncInvoke()方法中,直接查询数据库操作,并将数据返回出去。  
      • 在close()方法中关闭连接  

      

  • 相关阅读:
    [转]Design Time Serialization
    啥都不说了,不枉熬油点灯了
    Eclips汉化
    FreeBSD 上用上苹果黑体,效果很棒
    使用Portupgrade
    fvwm2rc
    make.conf 配置优化
    基于FreeBSD5.4全能服务器安装(dns,ftp,apache,qmail)
    portssupfile
    关于FreeBSD 5优化的补充
  • 原文地址:https://www.cnblogs.com/YuanWeiBlogger/p/14674892.html
Copyright © 2011-2022 走看看