zoukankan      html  css  js  c++  java
  • Flink如何做维表关联?

    使用

    RichAsyncFunction 加 CacheBuilder

    
    CacheBuilder.newBuilder()
            //最多存储10000条
            .maximumSize(10000)
            //过期时间为1分钟
            .expireAfterWrite(60, TimeUnit.SECONDS)
            .build();
    
    public class LRU extends RichAsyncFunction<String,Order> {
        private static final Logger LOGGER = LoggerFactory.getLogger(LRU.class);
        String table = "info";
        Cache<String, String> cache = null;
        private HBaseClient client = null;
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            //创建hbase客户端
            client = new HBaseClient("127.0.0.1","7071");
            cache = CacheBuilder.newBuilder()
                    //最多存储10000条
                    .maximumSize(10000)
                    //过期时间为1分钟
                    .expireAfterWrite(60, TimeUnit.SECONDS)
                    .build();
        }
        @Override
        public void asyncInvoke(String input, ResultFuture<Order> resultFuture) throws Exception {
            JSONObject jsonObject = JSONObject.parseObject(input);
            Integer cityId = jsonObject.getInteger("city_id");
            String userName = jsonObject.getString("user_name");
            String items = jsonObject.getString("items");
            //读缓存
            String cacheCityName = cache.getIfPresent(cityId);
            //如果缓存获取失败再从hbase获取维度数据
            if(cacheCityName != null){
                Order order = new Order();
                order.setCityId(cityId);
                order.setItems(items);
                order.setUserName(userName);
                order.setCityName(cacheCityName);
                resultFuture.complete(Collections.singleton(order));
            }else {
                client.get(new GetRequest(table,String.valueOf(cityId))).addCallback((Callback<String, ArrayList<KeyValue>>) arg -> {
                    for (KeyValue kv : arg) {
                        String value = new String(kv.value());
                        Order order = new Order();
                        order.setCityId(cityId);
                        order.setItems(items);
                        order.setUserName(userName);
                        order.setCityName(value);
                        resultFuture.complete(Collections.singleton(order));
                        cache.put(String.valueOf(cityId), value);
                    }
                    return null;
                });
            }
        }
    }
    
    
  • 相关阅读:
    编辑文章
    POJ_1195 Mobile phones 【二维树状数组】
    WCF探索之旅(三)——IIS公布WCF服务
    doT.js具体使用介绍
    数据结构:最小生成树--Kruskal算法
    关于打开sdk下载不了的最优秀解决方式
    JS 之 数据类型转换
    MongoDB学习笔记&lt;六&gt;
    Spring、Hibernate 数据不能插入到数据库问题解决
    Cocos2d-x 3.0final 终结者系列教程16-《微信飞机大战》实现
  • 原文地址:https://www.cnblogs.com/weijiqian/p/14034479.html
Copyright © 2011-2022 走看看