  • Accessing HBase from Java

    1. Kerberos authentication

    a. Download the JCE (Java Cryptography Extension) policy files matching your JDK version, unzip them, and copy local_policy.jar and US_export_policy.jar to $JAVA_HOME/jre/lib/security
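
    A quick way to verify the policy files took effect is to check the maximum allowed AES key length: 128 under the default restricted policy, Integer.MAX_VALUE once the unlimited JCE policy is installed. A minimal sketch (the class name is just for illustration):

    import javax.crypto.Cipher;

    public class JcePolicyCheck {
        public static void main(String[] args) throws Exception {
            // Prints 128 under the default policy, 2147483647 with unlimited JCE installed
            System.out.println("Max AES key length: " + Cipher.getMaxAllowedKeyLength("AES"));
        }
    }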

    b. Add hbase-site.xml under the resources directory, i.e. on the classpath

    <configuration>
        <!-- Name of the HBase cluster to access -->
        <property>
            <name>hbase.cluster.name</name>
            <value>${hbase.cluster.name}</value>
        </property>

        <!-- RPC call timeout, in ms -->
        <property>
            <name>hbase.rpc.timeout</name>
            <value>200</value>
        </property>

        <!-- Client API operation timeout -->
        <property>
            <name>hbase.client.operation.timeout</name>
            <value>200</value>
            <description>in ms</description>
        </property>

        <!-- Number of retries on failure -->
        <property>
            <name>hbase.client.retries.number</name>
            <value>2</value>
        </property>

        <!-- Number of client threads concurrently calling the HBase client API -->
        <property>
            <name>hbase.client.tablepool.maxsize</name>
            <value>30</value>
        </property>
    </configuration>
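
    Since HBaseConfiguration.create() loads hbase-default.xml and then hbase-site.xml from the classpath, the settings above can be sanity-checked at runtime. A minimal sketch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class ConfCheck {
        public static void main(String[] args) {
            // create() merges hbase-default.xml and hbase-site.xml found on the classpath
            Configuration conf = HBaseConfiguration.create();
            System.out.println(conf.get("hbase.rpc.timeout"));           // expect "200"
            System.out.println(conf.get("hbase.client.retries.number")); // expect "2"
        }
    }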

    c. Add JVM startup flags to set the authentication parameters

    -Dhadoop.property.hadoop.security.authentication=kerberos
    -Djava.security.krb5.conf=${conf_path}/krb5-hadoop.conf
    -Dhadoop.property.hadoop.client.keytab.file=${conf_path}/${kerberos_principal}.keytab
    -Dhadoop.property.hadoop.client.kerberos.principal=${kerberos_principal}@XIAOMI.HADOOP
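
    Alternatively, the same login can be done programmatically with the stock Hadoop API instead of JVM flags. A sketch; the principal and keytab path below are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.security.UserGroupInformation;

    public class KerberosLogin {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hadoop.security.authentication", "kerberos");
            UserGroupInformation.setConfiguration(conf);
            // Placeholder principal and keytab path; substitute your own
            UserGroupInformation.loginUserFromKeytab(
                    "user@XIAOMI.HADOOP", "/path/to/user.keytab");
        }
    }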
    

    The concrete read/write code is not listed here since there are plenty of examples online; a minimal sketch follows for reference.
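
    A single-row write-then-read sketch using the classic HTable API; the table, family, and qualifier names are placeholders:

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;

    public class SimpleReadWrite {
        public static void main(String[] args) throws Exception {
            HTable table = new HTable(HBaseConfiguration.create(), "namespace:tablename");
            try {
                // Write one cell
                Put put = new Put(Bytes.toBytes("rowkey1"));
                put.add(Bytes.toBytes("W"), Bytes.toBytes("i"), Bytes.toBytes("value"));
                table.put(put);

                // Read it back
                Result result = table.get(new Get(Bytes.toBytes("rowkey1")));
                System.out.println(Bytes.toString(
                        result.getValue(Bytes.toBytes("W"), Bytes.toBytes("i"))));
            } finally {
                table.close();
            }
        }
    }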

    2. Bulk-writing data to HBase from MapReduce

    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    
    import java.io.IOException;
    
    public class WriteToHbase {
        private static final String INPUT = "";
        // Specify the HBase cluster address and table name
        private static final String TABLE = "hbase://hytst-staging/namespace:tablename";
    
        // Read the source file
        public static class SourceMapper
                extends Mapper<LongWritable, Text, Text, Text>{
            //something
        }
    
        // Write to HBase
        public static class WriteReducer
                extends TableReducer<Text, Text, ImmutableBytesWritable> {
            private byte[] family = Bytes.toBytes("W");// column family
            private byte[] qualifier = Bytes.toBytes("i");// column qualifier
            private int rowDone;
            private long startTime;
            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
                rowDone = 0;
                startTime = System.currentTimeMillis();
                super.setup(context);
            }
    
            public void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                // Text.getBytes() returns the padded backing array, so copy the row key instead
                byte[] rowkey = Bytes.toBytes(key.toString());
                Put put = new Put(rowkey);
                put.add(family, qualifier, Bytes.toBytes(StringUtils.join(values.iterator(), ",")));
                context.write(new ImmutableBytesWritable(rowkey), put);
                // Or, equivalently, write directly through an HTable:
                /*HTable table = new HTable(context.getConfiguration(), TABLE);
                table.put(put);
                table.close();*/
                ++rowDone;
                // Throttle writes to 800 QPS
                TableMapReduceUtil.limitScanRate(800, rowDone, System.currentTimeMillis() - startTime);
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            Job job = Job.getInstance(conf, "HdfsToHbase");
            job.setJarByClass(WriteToHbase.class);
            // Turn off speculative execution to avoid writing to HBase more than once
            job.setSpeculativeExecution(false);
    
            job.setMapperClass(SourceMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(INPUT));
            // Initialize the table reducer job
            TableMapReduceUtil.initTableReducerJob(TABLE, WriteReducer.class, job);
            job.setNumReduceTasks(2);
    
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
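
    The SourceMapper above is left as a stub; a minimal sketch, assuming the input files contain one tab-separated "rowkey<TAB>value" pair per line (the input format is an assumption):

    // Assumed input format: one "rowkey<TAB>value" pair per line
    public static class SourceMapper
            extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t", 2);
            if (fields.length == 2) {
                context.write(new Text(fields[0]), new Text(fields[1]));
            }
        }
    }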
    

      
