  • MapReduce atop Apache Phoenix (a first look at ScanPlan)

    When MapReduce or Hive queries data stored in Phoenix, how is the input divided into partitions?
    The source of PhoenixInputFormat answers the question directly:

        public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
            Configuration configuration = context.getConfiguration();
            QueryPlan queryPlan = this.getQueryPlan(context, configuration);
            List<KeyRange> allSplits = queryPlan.getSplits();
            List<InputSplit> splits = this.generateSplits(queryPlan, allSplits);
            return splits;
        }
    

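    A query plan (a QueryPlan, concretely a ScanPlan) is created from the SELECT statement. The getQueryPlan function performs one special step:
    queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());
    If the HBase table has multiple regions, the single Scan is divided into several Scans, and each region corresponds to one split. The logic resembles MapReduce on HBase; the difference is in the implementation, which goes through Phoenix's QueryPlan rather than the HBase API.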
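    The "one split per region" rule can be sketched in plain Java. This is only an illustration under stated assumptions: RegionScanRanges and rangesFor are hypothetical names that do not exist in Phoenix or HBase, and row keys are modeled as ASCII strings instead of byte arrays. The split points CS, EU and NA are the ones used in the example table of this post.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Phoenix: derives one [start, stop) range per region.
class RegionScanRanges {

    // Returns {startRow, stopRow} pairs. An empty string stands for an open boundary,
    // the same convention HBase uses for the first and the last region.
    static List<String[]> rangesFor(List<String> splitPoints) {
        List<String[]> ranges = new ArrayList<>();
        String start = "";
        for (String split : splitPoints) {
            ranges.add(new String[] {start, split});
            start = split;
        }
        // The last region runs from the final split point to the end of the table.
        ranges.add(new String[] {start, ""});
        return ranges;
    }

    public static void main(String[] args) {
        for (String[] r : rangesFor(Arrays.asList("CS", "EU", "NA"))) {
            System.out.println("startRow=\"" + r[0] + "\" stopRow=\"" + r[1] + "\"");
        }
    }
}
```

    Three split points yield four ranges, so a full-table query over such a table would be expected to produce four scans.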

    The following example makes this process more concrete.

    Creating the Phoenix table

    Pre-split the table into 4 regions: [-∞, CS), [CS, EU), [EU, NA), [NA, +∞)

    CREATE TABLE TEST (HOST VARCHAR NOT NULL PRIMARY KEY, DESCRIPTION VARCHAR) SPLIT ON ('CS','EU','NA');
    upsert into test(host, description) values ('CS11', 'cccccccc');
    upsert into test(host, description) values ('EU11', 'eeeddddddddd');
    upsert into test(host, description) values ('NA11', 'nnnnneeeddddddddd');
    
    0: jdbc:phoenix:localhost> select * from test;
    +-------+--------------------+
    | HOST  |    DESCRIPTION     |
    +-------+--------------------+
    | CS11  | cccccccc           |
    | EU11  | eeeddddddddd       |
    | NA11  | nnnnneeeddddddddd  |
    +-------+--------------------+
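    To check which region each of the three rows falls into, the lookup can be sketched as follows. RegionLookup and regionIndex are hypothetical names used only for this illustration. The sketch compares keys with String.compareTo, which matches HBase's byte-wise ordering only for ASCII keys such as the ones in this table.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Phoenix or HBase.
class RegionLookup {

    // Region 0 is [-inf, splits[0]); region i is [splits[i-1], splits[i]);
    // the last region is [splits[n-1], +inf). Split points must be sorted.
    static int regionIndex(List<String> splitPoints, String rowKey) {
        int index = 0;
        for (String split : splitPoints) {
            if (rowKey.compareTo(split) >= 0) {
                index++;   // the key is at or beyond this split point
            } else {
                break;     // sorted split points: no later one can match
            }
        }
        return index;
    }

    public static void main(String[] args) {
        List<String> splits = Arrays.asList("CS", "EU", "NA");
        for (String key : Arrays.asList("CS11", "EU11", "NA11")) {
            System.out.println(key + " -> region " + regionIndex(splits, key));
        }
        // prints:
        // CS11 -> region 1
        // EU11 -> region 2
        // NA11 -> region 3
    }
}
```

    Each row lands in a different region, and the first region, [-∞, CS), holds no rows in this example.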
    
    

    Peeking into ScanPlan

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.log4j.BasicConfigurator;
    import org.apache.phoenix.compile.QueryPlan;
    import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
    import org.apache.phoenix.jdbc.PhoenixStatement;

    import java.io.IOException;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.List;


    public class LocalPhoenix {
        public static void main(String[] args) throws SQLException, IOException {
            BasicConfigurator.configure();

            // try-with-resources closes the statement and the connection even if a call fails
            try (Connection con = DriverManager.getConnection("jdbc:phoenix:localhost:2181:/hbase");
                 Statement stmt = con.createStatement()) {
                // Compile and optimize the query without executing it
                PhoenixStatement pstmt = (PhoenixStatement) stmt;
                QueryPlan queryPlan = pstmt.optimizeQuery("select * from TEST");
                // Same call PhoenixInputFormat makes; building the iterator computes the per-region scans
                queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());

                List<List<Scan>> scans = queryPlan.getScans();

                // Print every scan, one output line per group of scans
                for (List<Scan> sl : scans) {
                    System.out.println();
                    for (Scan s : sl) {
                        System.out.print(s);
                    }
                }
            }
        }
    }
    
    

    The resulting 4 scans are:

    {"loadColumnFamiliesOnDemand":null,"startRow":"","stopRow":"CS","batch":-1,"cacheBlocks":true,"totalColumns":1,"maxResultSize":-1,"families":{"0":["ALL"]},"caching":100,"maxVersions":1,"timeRange":[0,1523338217847]}
    {"loadColumnFamiliesOnDemand":null,"startRow":"CS","stopRow":"EU","batch":-1,"cacheBlocks":true,"totalColumns":1,"maxResultSize":-1,"families":{"0":["ALL"]},"caching":100,"maxVersions":1,"timeRange":[0,1523338217847]}
    {"loadColumnFamiliesOnDemand":null,"startRow":"EU","stopRow":"NA","batch":-1,"cacheBlocks":true,"totalColumns":1,"maxResultSize":-1,"families":{"0":["ALL"]},"caching":100,"maxVersions":1,"timeRange":[0,1523338217847]}
    {"loadColumnFamiliesOnDemand":null,"startRow":"NA","stopRow":"","batch":-1,"cacheBlocks":true,"totalColumns":1,"maxResultSize":-1,"families":{"0":["ALL"]},"caching":100,"maxVersions":1,"timeRange":[0,1523338217847]}
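    The last step, turning grouped scans into input splits, can be sketched as below. This rests on an assumption about PhoenixInputFormat.generateSplits that the post does not show: that it creates one split per inner list returned by getScans(). SplitGrouping, SketchSplit and toSplits are hypothetical stand-ins, and each scan is reduced to a string describing its key range.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for Phoenix types, for illustration only.
class SplitGrouping {

    // Stand-in for an input split: it carries the scans that one map task would run.
    static final class SketchSplit {
        final List<String> scans;

        SketchSplit(List<String> scans) {
            this.scans = scans;
        }
    }

    // One split per inner list of scans (assumed behaviour, see the note above).
    static List<SketchSplit> toSplits(List<List<String>> groupedScans) {
        List<SketchSplit> splits = new ArrayList<>();
        for (List<String> group : groupedScans) {
            splits.add(new SketchSplit(group));
        }
        return splits;
    }

    public static void main(String[] args) {
        // The four key ranges printed by the program in this post, one scan per group.
        List<List<String>> grouped = Arrays.asList(
                Arrays.asList("[, CS)"),
                Arrays.asList("[CS, EU)"),
                Arrays.asList("[EU, NA)"),
                Arrays.asList("[NA, )"));
        System.out.println(toSplits(grouped).size() + " splits");
    }
}
```

    Under that assumption, the four scans above would become four splits, and a MapReduce job over TEST would run four map tasks.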
    
    
  • Original article: https://www.cnblogs.com/luweiseu/p/8783253.html