zoukankan      html  css  js  c++  java
  • Storm HBase 集成

    原文链接:http://storm.apache.org/releases/1.1.0/storm-hbase.html

    Storm/Trident 和 Apache HBase 的集成

    用法

    和HBase集成的重要API是org.apache.storm.hbase.bolt.mapper.HBaseMapper接口

    public interface HBaseMapper extends Serializable {
        byte[] rowKey(Tuple tuple);
    
        ColumnList columns(Tuple tuple);
    }

    rowKey()方法是简单明了的:输入一个Storm tuple,返回代表rowkey的字节数组。

    columns()方法定义了要写入HBase行的内容。ColumnList类允许你添加 standard HBase columns和HBase counter columns(这两种列是什么???)

    要添加一个standard column,使用addColumn()方法:

    ColumnList cols = new ColumnList();
    cols.addColumn(this.columnFamily, field.getBytes(), toBytes(tuple.getValueByField(field)));

    要添加一个counter column,使用addCounter()方法:

    ColumnList cols = new ColumnList();
    cols.addCounter(this.columnFamily, field.getBytes(), toLong(tuple.getValueByField(field)));

    当远程HBase启用了安全认证,一个Kerberos keytab和相应的principle名需要被提供给storm-hbase连接器。

    特别地,传递给Topology的Config对象应该包含{(“storm.keytab.file”, “$keytab”), ("storm.kerberos.principal", “$principal”)}。例如:

    Config config = new Config();
    ...
    config.put("storm.keytab.file", "$keytab");
    config.put("storm.kerberos.principal", "$principle");
    StormSubmitter.submitTopology("$topologyName", config, builder.createTopology());

    使用授权的token与启用安全认证的 HBase 集成

    如果你的Topology要和启用安全认证的HBase交互,你的bolts/states需要被HBase认证。解决办法是需要所有潜在的worker主机拥有"sotrm.keytab.file"。

    如果你在一个集群上有多个Topology,每一个都使用不同的hbase用户,你必须创建多个keytab并把它分发到所有worker主机。

    以上做法的替代方法是:

    管理员可以配置nimbus能自动地代表提交Topology的用户获取授权token。这样,nimbus需要以以下配置启动:

    nimbus.autocredential.plugins.classes : ["org.apache.storm.hbase.security.AutoHBase"] 
    nimbus.credential.renewers.classes : ["org.apache.storm.hbase.security.AutoHBase"]
    hbase.keytab.file: "/path/to/keytab/on/nimbus" (This is the keytab of hbase super user that can impersonate other users.) 
    hbase.kerberos.principal: "superuser@EXAMPLE.com" 
    nimbus.credential.renewers.freq.secs : 518400 (6 days, hbase tokens by default expire every 7 days and can not be renewed, 
    if you have custom settings for hbase.auth.token.max.lifetime in hbase-site.xml than you should ensure this value is atleast 1 hour less then that.)

    你的Topology配置应该包括:topology.auto-credentials :["org.apache.storm.hbase.security.AutoHBase"]

    如果nimbus没有以上配置,你需要添加以上配置然后重启HBase。

    确保hbase配置文件(core-site.xml,hdfs-site.xml 和 hbase-site.xml)和包含storm-hbase jar的依赖文件都在nimbus的classpath里。

    Nimbus会使用在配置里指定的keytab和principal进行HBase认证。在每一个Topology提交时,nimbus都要模拟提交的Topology用户,并且代表提交的Topology用户获取授权token。

    如果Topology以 topology.auto-credentials :["org.apache.storm.hbase.security.AutoHBase"] 这样的配置启动,nimbus会为了你的Topology把授权token推给所有的worker,

    连接hbase的 bolt/state 会以这些token进行认证。

    当nimbus模拟提交的Topology用户的用户时,你需要确保在 storm.kerberos.principal 中指定的user拥有代表其他user获取token的权限。

    要达到这个要求,你需要按照列在这个link中的配置方向进行配置 http://hbase.apache.org/book/security.html#security.rest.gateway

    你可以阅读关于建立安全认证的HBase的内容 http://hbase.apache.org/book/security.html

    SimpleHBaseMapper

    storm-hbase有一个含有通用目的HBaseMapper的实现——SimpleHBaseMapper,它可以把Storm tuple映射到标准HBase column和counter column

    要使用SimpleHBaseMapper,你需要告诉它哪些field要被映射成什么类型的column。

    以下代码创建了一个SimpleHBaseMapper实例:

    1. 使用 word tuple值作为一个 row key

    2. 为tuple域word 添加一个标准的HBase列

    3. 为tuple域count 添加一个HBase counter列

    4. 把值写到 cf 列族中

    SimpleHBaseMapper mapper = new SimpleHBaseMapper() 
            .withRowKeyField("word")
            .withColumnFields(new Fields("word"))
            .withCounterFields(new Fields("count"))
            .withColumnFamily("cf");

    HBaseBolt

    要使用HBaseBolt,需要提供 输出的表的名字 和 一个HBaseMapper的实现 进行构造

    HBaseBolt hbase = new HBaseBolt("WordCount", mapper);

    HBaseBolt会授权 mapper实例 明确如何把tuple数据持久化到HBase中。

    HBaseValueMapper

    这个类运行你把 HBase的查询结果转化成storm Values实例,这个Values实例将会被HBaseLookupBolt发射出去。

    public interface HBaseValueMapper extends Serializable {
        public List<Values> toTuples(Result result) throws Exception;
        void declareOutputFields(OutputFieldsDeclarer declarer);
    }

    toTuples方法接收一个 HBase Result,输出一个包含Values的List。每一个由这个方法返回的值都会被HBaseLookupBolt发射。

    declareOutputFields应该被用来声明HBaseLookupBolt的输出域 outputFields。

    在 src/test/java 路径下有example

    HBaseProjectionCriteria

    这个类允许你指定HBase Get方法的投影条件。

    这是lookupBolt可选的参数,如果你不指定这个实例的话,所有的列都会由HBaseLookupBolt返回。

    public class HBaseProjectionCriteria implements Serializable {
        public HBaseProjectionCriteria addColumnFamily(String columnFamily);
        public HBaseProjectionCriteria addColumn(ColumnMetaData column);
    }

    addColumnFamily 接收列族 columnFamily。指定这个参数意味着这个列族的所有列都将被包含在投影里

    addColumn 接收一个 ColumnMetaData实例。指定这个参数意味着只有这个列族的这个列会成为投影的一部分。

    以下代码会创建一个HBaseProjectionCriteria实例用来指定投影的条件:

    1. 包含列族 cf 的count column

    2. 包含列族 cf2 的所有列

    HBaseProjectionCriteria projectionCriteria = new HBaseProjectionCriteria()
        .addColumn(new HBaseProjectionCriteria.ColumnMetaData("cf", "count"))
        .addColumnFamily("cf2");

    HBaseLookupBolt

    要使用HBaseLookupBolt,通过 要输出到的表名、一个HBaseMapper的实例、一个HBaseRowToStormValueMapper的实例去构建。

    你可以指定或不指定HBaseProjectionCriteria。

    HBaseLookupBolt会使用HBaseMapper的实例mapper获取用来查询的rowKey,

    会使用HBaseProjectionCriteria的实例指定哪些列会被包含在结果中,

    会使用HBaseRowToStormValueMapper的实例获取要被bolt发送的value。

    你可以参照 在 src/test/java下的example:topology LookupWordCount.java

    示例: 持久化 Word Count

    在src/test/java下有一个可以运行的例子

    Setup

    以下步骤假定你正在本地运行HBase,或在classpath中有hbase-site.xml指出你的HBase集群

    使用 hbase shell命令创建一个schema:

    > create 'WordCount', 'cf'

    执行

    运行org.apache.storm.hbase.topology.PersistenWordCount类(它会运行这个Topology 10秒,然后退出)

    在这个Topology运行过程中或运行之后,运行org.apache.storm.hbase.topology.WordCountClient类去查看存储在HBase的计数值。你会看到以下类似的内容:

    Word: 'apple', Count: 6867
    Word: 'orange', Count: 6645
    Word: 'pineapple', Count: 6954
    Word: 'banana', Count: 6787
    Word: 'watermelon', Count: 6806

    为了引用,列出这个Topology示例

    public class PersistentWordCount {
        private static final String WORD_SPOUT = "WORD_SPOUT";
        private static final String COUNT_BOLT = "COUNT_BOLT";
        private static final String HBASE_BOLT = "HBASE_BOLT";
    
    
        public static void main(String[] args) throws Exception {
            Config config = new Config();
    
            WordSpout spout = new WordSpout();
            WordCounter bolt = new WordCounter();
    
            SimpleHBaseMapper mapper = new SimpleHBaseMapper()
                    .withRowKeyField("word")
                    .withColumnFields(new Fields("word"))
                    .withCounterFields(new Fields("count"))
                    .withColumnFamily("cf");
    
            HBaseBolt hbase = new HBaseBolt("WordCount", mapper);
    
    
            // wordSpout ==> countBolt ==> HBaseBolt
            TopologyBuilder builder = new TopologyBuilder();
    
            builder.setSpout(WORD_SPOUT, spout, 1);
            builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
            builder.setBolt(HBASE_BOLT, hbase, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));
    
    
            if (args.length == 0) {
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("test", config, builder.createTopology());
                Thread.sleep(10000);
                cluster.killTopology("test");
                cluster.shutdown();
                System.exit(0);
            } else {
                config.setNumWorkers(3);
                StormSubmitter.submitTopology(args[0], config, builder.createTopology());
            }
        }
    }
  • 相关阅读:
    ssh免密登录
    jdk安装
    jq选择器
    使用<button></button>标签
    mysql连接字符串
    如何把maven项目转成web项目
    pl/sql连接远程oracle
    Oracle 存储过程
    SQL Server存储过程
    MySQL存储过程
  • 原文地址:https://www.cnblogs.com/sunspeedzy/p/7474029.html
Copyright © 2011-2022 走看看