Flink Async I/O in Practice

    I. What is Async I/O?

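    Stream processing systems often need to interact with external systems, for example querying an external database to enrich each record with additional user information. Flink's Async I/O API lets you call external storage from inside a data stream using an asynchronous request client. The API handles integration with the data stream and takes care of the tedious parts: message order, event time, and consistency (fault tolerance). You only need to focus on the business logic.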
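    To make the ordering point concrete, here is a plain-JDK toy model (not Flink's implementation; all names are hypothetical). Lookups may finish in any order. An "ordered" emitter holds finished results until every earlier request has also finished, while an "unordered" emitter forwards each result as soon as it completes. These roughly correspond to the two output modes Flink offers through AsyncDataStream.orderedWait and AsyncDataStream.unorderedWait.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class OrderDemo {
    // Toy "ordered" emitter (single-threaded, not thread-safe): results leave
    // in the order the requests were issued.
    static final class OrderedEmitter {
        private final Deque<CompletableFuture<String>> pending = new ArrayDeque<>();
        final List<String> emitted = new ArrayList<>();

        void add(CompletableFuture<String> request) {
            pending.addLast(request);
            request.thenRun(this::drain); // re-check the head whenever a request finishes
        }

        private void drain() {
            // Emit from the head only, stopping at the first unfinished request.
            while (!pending.isEmpty() && pending.peekFirst().isDone()) {
                emitted.add(pending.pollFirst().join());
            }
        }
    }

    // Issues three requests, completes them in reverse order,
    // and returns {unordered emissions, ordered emissions}.
    static List<List<String>> run() {
        List<CompletableFuture<String>> requests = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            requests.add(new CompletableFuture<>());
        }
        OrderedEmitter ordered = new OrderedEmitter();
        List<String> unordered = new ArrayList<>();
        for (CompletableFuture<String> r : requests) {
            ordered.add(r);
            r.thenAccept(unordered::add); // toy "unordered": emit as soon as it completes
        }
        // Simulate responses arriving in reverse order.
        requests.get(2).complete("r2");
        requests.get(1).complete("r1");
        requests.get(0).complete("r0");
        return Arrays.asList(unordered, ordered.emitted);
    }

    public static void main(String[] args) {
        List<List<String>> out = run();
        System.out.println("unordered: " + out.get(0)); // unordered: [r2, r1, r0]
        System.out.println("ordered:   " + out.get(1)); // ordered:   [r0, r1, r2]
    }
}
```

    The trade-off: ordered output can be held back by one slow request at the head, while unordered output keeps latency low but gives up input order.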

    II. Implementation: querying an external system in real time for the user's location (province/city, area code, etc.)

    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
    import org.apache.http.HttpResponse;
    import org.apache.http.client.config.RequestConfig;
    import org.apache.http.client.methods.HttpGet;
    import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
    import org.apache.http.impl.nio.client.HttpAsyncClients;
    import org.apache.http.util.EntityUtils;
    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Future;
    
    public class AsyncFunction extends RichAsyncFunction<String, ActivityBean> {
    
        // transient: the client is not serialized with the function; each parallel instance creates its own in open()
        private transient CloseableHttpAsyncClient httpClient = null;
    
        @Override
        public void open(Configuration parameters) throws Exception {
            // Initialize the asynchronous HTTP client
            RequestConfig requestConfig = RequestConfig.custom()
                    .setSocketTimeout(3000)  // socket (read) timeout, ms
                    .setConnectTimeout(3000) // connect timeout, ms
                    .build();
    
            httpClient = HttpAsyncClients.custom()
                    .setMaxConnTotal(20) // maximum total number of pooled connections
                    .setDefaultRequestConfig(requestConfig)
                    .build();
    
            httpClient.start();
        }
    
        @Override
        public void asyncInvoke(String input, ResultFuture<ActivityBean> resultFuture) throws Exception {
            // Input line: uid,aid,time,eventType,longitude,latitude
            String[] fields = input.split(",");
            String uid = fields[0];
            String aid = fields[1];
            String time = fields[2];
            int eventType = Integer.parseInt(fields[3]);
            String longitude = fields[4];
            String latitude = fields[5];
    
            // Placeholder: the real request URL (built from longitude and latitude) is omitted in the original
            String url = "mytest";
            HttpGet httpGet = new HttpGet(url);
            Future<HttpResponse> future = httpClient.execute(httpGet, null);
    
            // future.get() blocks, so it runs on a CompletableFuture default-executor thread, not the task thread
            CompletableFuture.supplyAsync(() -> {
                try {
                    HttpResponse httpResponse = future.get();
                    String province = null;
                    if (httpResponse.getStatusLine().getStatusCode() == 200) {
                        // Read the response body as a JSON string
                        String result = EntityUtils.toString(httpResponse.getEntity());
                        // Parse it into a JSON object
                        JSONObject jsonObject = JSON.parseObject(result);
                        // Extract the location information
                        JSONObject regeocode = jsonObject.getJSONObject("regeocode");
                        if (regeocode != null && !regeocode.isEmpty()) {
                            JSONObject address = regeocode.getJSONObject("addressComponent");
                            // Province/city/district live here; only the province is read
                            if (address != null) {
                                province = address.getString("province");
                            }
                        }
                    }
                    return province;
                } catch (Exception e) {
                    // On any failure, emit the record without a province instead of failing the job
                    return null;
                }
            }).thenAccept(province ->
                    resultFuture.complete(Collections.singleton(ActivityBean.of(uid, aid, null, time, eventType, province))));
        }
    
        @Override
        public void close() throws Exception {
            if (httpClient != null) {
                httpClient.close();
            }
        }
    }
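    In the function above, supplyAsync plus future.get() keeps one pool thread parked for the whole duration of every HTTP call. HttpAsyncClient's execute(request, callback) also accepts a FutureCallback (the original passes null there), which lets the I/O thread complete a CompletableFuture directly. The plain-JDK sketch below shows that bridge. Callback is a hypothetical interface mirroring the completed/failed/cancelled methods of org.apache.http.concurrent.FutureCallback, and call is a hypothetical helper standing in for the client call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class CallbackBridge {
    // Hypothetical interface with the same three methods as HttpCore's
    // org.apache.http.concurrent.FutureCallback, so this sketch needs only the JDK.
    interface Callback<T> {
        void completed(T result);
        void failed(Exception ex);
        void cancelled();
    }

    // Adapts a CompletableFuture to the callback style: whichever thread delivers
    // the response completes the future directly, and no thread waits in get().
    static <T> Callback<T> completing(CompletableFuture<T> target) {
        return new Callback<T>() {
            @Override public void completed(T result) { target.complete(result); }
            @Override public void failed(Exception ex) { target.completeExceptionally(ex); }
            @Override public void cancelled() { target.cancel(false); }
        };
    }

    // Hands a callback to a (simulated) client and returns a future of the response;
    // like the original code, any failure is mapped to null.
    static CompletableFuture<String> call(Consumer<Callback<String>> client) {
        CompletableFuture<String> response = new CompletableFuture<>();
        client.accept(completing(response));
        return response.exceptionally(ex -> null);
    }

    public static void main(String[] args) {
        System.out.println(call(cb -> cb.completed("200 OK")).join());            // 200 OK
        System.out.println(call(cb -> cb.failed(new RuntimeException())).join()); // null
    }
}
```

    With the real client, the same idea would mean passing a FutureCallback<HttpResponse> to httpClient.execute(httpGet, callback) and chaining the JSON parsing and resultFuture.complete(...) onto the resulting CompletableFuture.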
    

    III. Fault tolerance and best practices

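    The Async I/O operator provides full exactly-once fault-tolerance guarantees: it stores the records of in-flight asynchronous requests in checkpoints and restores/re-triggers those requests when recovering from a failure.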
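    A toy model of that mechanism (plain JDK, not Flink's code; every name is hypothetical): the operator tracks which inputs still have a request outstanding, a checkpoint copies that set, and recovery re-issues every request in the copy. One consequence is visible in the output: a request already sent before the failure can be sent a second time, so the external lookup should be safe to repeat.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

public class InFlightReplayDemo {
    // Inputs whose request has been sent but whose response has not arrived yet.
    private final Set<String> inFlight = new LinkedHashSet<>();
    private final Consumer<String> sendRequest; // hypothetical external call

    InFlightReplayDemo(Consumer<String> sendRequest) {
        this.sendRequest = sendRequest;
    }

    void invoke(String input) {
        inFlight.add(input);
        sendRequest.accept(input);
    }

    void onResponse(String input) {
        inFlight.remove(input);
    }

    // Checkpoint: store a copy of the in-flight inputs.
    List<String> snapshot() {
        return new ArrayList<>(inFlight);
    }

    // Recovery: a fresh operator re-triggers every request recorded in the checkpoint.
    static InFlightReplayDemo restore(List<String> checkpoint, Consumer<String> sendRequest) {
        InFlightReplayDemo op = new InFlightReplayDemo(sendRequest);
        checkpoint.forEach(op::invoke);
        return op;
    }

    // Runs one failure scenario and returns every request that reached the external system.
    static List<String> simulate() {
        List<String> sent = new ArrayList<>();
        InFlightReplayDemo op = new InFlightReplayDemo(sent::add);
        op.invoke("a");
        op.invoke("b");
        op.onResponse("a");                      // "a" is finished; "b" is still pending
        List<String> checkpoint = op.snapshot(); // [b]
        // ... the job fails here and is restored from the checkpoint ...
        restore(checkpoint, sent::add);
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(simulate()); // [a, b, b]
    }
}
```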

    Best practices

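    1. When an Executor is needed to run a Future's callbacks, prefer a DirectExecutor, which avoids the overhead of switching threads. Either of the following provides one: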

    org.apache.flink.runtime.concurrent.Executors.directExecutor()

    com.google.common.util.concurrent.MoreExecutors.directExecutor()
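    In plain JDK terms, a direct executor is one whose execute(Runnable) simply runs the task on the calling thread, so `Runnable::run` behaves like the two factories above (a minimal sketch, not either library's source). The demo registers a callback through thenApplyAsync with such an executor and shows that the callback runs on the thread that completed the future (here a hypothetical "io-thread"), with no hand-off to another pool.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DirectExecutorDemo {
    // Runs each task immediately on whichever thread calls execute().
    static final Executor DIRECT = Runnable::run;

    // Returns the name of the thread on which the callback ran.
    static String callbackThread() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        try {
            CompletableFuture<String> lookup = new CompletableFuture<>();
            // Register the callback before completion, using the direct executor.
            CompletableFuture<String> ranOn =
                    lookup.thenApplyAsync(v -> Thread.currentThread().getName(), DIRECT);
            // Complete the lookup from the simulated I/O thread and wait for that step.
            CompletableFuture.runAsync(() -> lookup.complete("Zhejiang"), io).join();
            return ranOn.join();
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callbackThread()); // io-thread
    }
}
```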

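    2. AsyncFunction#asyncInvoke is not called by Flink from multiple threads: each parallel instance calls it sequentially, once per record. Do not perform blocking operations directly inside it; otherwise every record waits for the previous record's I/O, which defeats the purpose of asynchronous I/O.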
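    The difference is easiest to see with a stand-in for asyncInvoke. In this plain-JDK sketch, ResultSink is a hypothetical substitute for Flink's ResultFuture and lookup is a hypothetical asynchronous client. asyncInvoke only attaches a callback and returns, so both records are accepted before either response arrives. Calling lookup.apply(key).get() inside asyncInvoke instead would hold up every following record until that response came back.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class NonBlockingInvokeDemo {
    // Hypothetical stand-in for Flink's ResultFuture.
    interface ResultSink<T> { void complete(Collection<T> results); }

    private final Function<String, CompletableFuture<String>> lookup; // hypothetical async client

    NonBlockingInvokeDemo(Function<String, CompletableFuture<String>> lookup) {
        this.lookup = lookup;
    }

    // Shaped like asyncInvoke: attaches a callback and returns without waiting.
    void asyncInvoke(String key, ResultSink<String> sink) {
        lookup.apply(key).whenComplete((value, error) ->
                sink.complete(Collections.singletonList(key + "=" + (error == null ? value : "unknown"))));
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<String>> pending = new HashMap<>();
        NonBlockingInvokeDemo fn = new NonBlockingInvokeDemo(
                key -> pending.computeIfAbsent(key, k -> new CompletableFuture<>()));
        List<String> out = new ArrayList<>();

        fn.asyncInvoke("u1", out::addAll);
        fn.asyncInvoke("u2", out::addAll);
        System.out.println(out); // [] (both calls returned before any response)

        pending.get("u2").complete("Beijing");
        pending.get("u1").completeExceptionally(new RuntimeException("timeout"));
        System.out.println(out); // [u2=Beijing, u1=unknown]
    }
}
```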

    Original article: https://www.cnblogs.com/zfwwdz/p/13099465.html