zoukankan      html  css  js  c++  java
  • Flink如何做维表关联?

    使用

    RichAsyncFunction 加 CacheBuilder

    
    CacheBuilder.newBuilder()
            //最多存储10000条
            .maximumSize(10000)
            //过期时间为1分钟
            .expireAfterWrite(60, TimeUnit.SECONDS)
            .build();
    
    public class LRU extends RichAsyncFunction<String,Order> {
        private static final Logger LOGGER = LoggerFactory.getLogger(LRU.class);
        String table = "info";
        Cache<String, String> cache = null;
        private HBaseClient client = null;
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            //创建hbase客户端
            client = new HBaseClient("127.0.0.1","7071");
            cache = CacheBuilder.newBuilder()
                    //最多存储10000条
                    .maximumSize(10000)
                    //过期时间为1分钟
                    .expireAfterWrite(60, TimeUnit.SECONDS)
                    .build();
        }
        @Override
        public void asyncInvoke(String input, ResultFuture<Order> resultFuture) throws Exception {
            JSONObject jsonObject = JSONObject.parseObject(input);
            Integer cityId = jsonObject.getInteger("city_id");
            String userName = jsonObject.getString("user_name");
            String items = jsonObject.getString("items");
            //读缓存
            String cacheCityName = cache.getIfPresent(cityId);
            //如果缓存获取失败再从hbase获取维度数据
            if(cacheCityName != null){
                Order order = new Order();
                order.setCityId(cityId);
                order.setItems(items);
                order.setUserName(userName);
                order.setCityName(cacheCityName);
                resultFuture.complete(Collections.singleton(order));
            }else {
                client.get(new GetRequest(table,String.valueOf(cityId))).addCallback((Callback<String, ArrayList<KeyValue>>) arg -> {
                    for (KeyValue kv : arg) {
                        String value = new String(kv.value());
                        Order order = new Order();
                        order.setCityId(cityId);
                        order.setItems(items);
                        order.setUserName(userName);
                        order.setCityName(value);
                        resultFuture.complete(Collections.singleton(order));
                        cache.put(String.valueOf(cityId), value);
                    }
                    return null;
                });
            }
        }
    }
    
    
  • 相关阅读:
    2018——测试与信仰
    面试必备----测试用例笔试题分享
    软件测试人员必备网络知识(一):什么是cookie?
    Postman和Selenium IDE开局自带红蓝BUFF属性,就问你要还是不要
    【Loadrunner】LR参数化:利用mysql数据库里面的数据进行参数化
    因果图法设计测试用例
    场景法设计测试用例
    Linux Centos7下安装Python
    Vmware安装与VMware下Linux系统安装
    Python运算符与表达式
  • 原文地址:https://www.cnblogs.com/weijiqian/p/14034479.html
Copyright © 2011-2022 走看看