  • Deleting a single column's data in HBase

      We ran into the following situation: while loading data into HBase we had backfilled almost a year of data, and then discovered that one column's data was wrong for a few months and could not be used, so those months of data for that column all had to be removed. Looking through the HBase commands, there is no command that deletes a single column for a range of rowkeys, so it had to be written by hand. This can be done either with client-side code or with HBase on MR; I chose HBase on MR. With client-side code you would have to scan every rowkey, check whether it falls into the range to delete, and delete it if it does; with this much data that would likely be far too slow, and it might also overload the client. A distributed MR job has neither problem. (A minimal sketch of the client-side approach is shown right below for comparison.)
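
      For reference, here is a minimal sketch of that client-side scan-and-delete approach against the HBase 0.98 client API. The table name, the column f:cl, the 20180406 cutoff and the ZooKeeper settings mirror the MR example further below; it is only an illustration of the rejected approach, not code that was actually run:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;
    
    public class HbaseDelColClient {
    
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "zk_1,zk_2,zk_3,zk_4,zk_5");
            conf.set("hbase.zookeeper.property.clientPort", "2181");
    
            HTable table = new HTable(conf, "dt_list_detail_test");
            try {
                // scan only the column we care about
                Scan scan = new Scan();
                scan.addColumn(Bytes.toBytes("f"), Bytes.toBytes("cl"));
                scan.setCaching(500);
                scan.setCacheBlocks(false);
    
                ResultScanner scanner = table.getScanner(scan);
                for (Result result : scanner) {
                    String rowkey = Bytes.toString(result.getRow());
                    // the rowkey ends in yyyyMMdd; delete the column for rows dated before 20180406
                    String dateStr = rowkey.substring(rowkey.length() - 8);
                    if (Integer.parseInt(dateStr) < 20180406) {
                        Delete delete = new Delete(result.getRow());
                        delete.deleteColumn(Bytes.toBytes("f"), Bytes.toBytes("cl"));
                        table.delete(delete);
                    }
                }
                scanner.close();
            } finally {
                table.close();
            }
        }
    }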

        Notes:

        1. The HBase version is HBase 0.98.9.

        2. The rowkey has the form userid + yyyyMMdd, e.g. 0000120181103. Every row dated before 20180406 needs one column ( f:cl ) removed. The code is as follows:

      

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    import java.io.IOException;
    
    public class HbaseDelColMr {
    
        static class DelColMapper extends TableMapper<Text, NullWritable> {
    
            private Text delKey = new Text();
    
            // key: the rowkey of the current row
            // result: the data of that row
            @Override
            public void map(ImmutableBytesWritable key, Result result, Context context) throws IOException, InterruptedException {
    
                // get the rowkey
                String rowkey = Bytes.toString(key.get());
    
                // decide whether this rowkey should be deleted; a rowkey looks like 12556565620180405
                String dateStr = rowkey.substring(rowkey.length() - 8);
    
                // everything dated before 20180406 has to be deleted
                if (Integer.parseInt(dateStr) < 20180406) {
                    delKey.set(rowkey);
                    context.write(delKey, NullWritable.get());
                }
            }
        }
    
        static class DelColReducer extends TableReducer<Text, NullWritable, ImmutableBytesWritable> {
            @Override
            public void reduce(Text delKey, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
                // delKey is the rowkey whose column should be deleted
                Delete delete = new Delete(Bytes.toBytes(delKey.toString()));
    
                // mark the column to delete; note that deleteColumn removes only the latest
                // version of the cell, use deleteColumns to remove all versions
                delete.deleteColumn(Bytes.toBytes("f"), Bytes.toBytes("cl"));
                context.write(new ImmutableBytesWritable(Bytes.toBytes(delKey.toString())), delete);
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration configuration = HBaseConfiguration.create();
            configuration.set("hbase.zookeeper.quorum", "zk_1,zk_2,zk_3,zk_4,zk_5");
            configuration.set("hbase.zookeeper.property.clientPort", "2181");
            //configuration.set("hbase.local.dir", "/tmp/hbase-local-dir_test");
    
            String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
    
            for (String ar:otherArgs) {
                System.out.println(ar+" ======================================");
            }
    
            Job job = Job.getInstance(configuration);
            job.setJobName("HbaseDelColMr");
            job.setJarByClass(HbaseDelColMr.class);
    
            Scan scan = new Scan();
            scan.addColumn(Bytes.toBytes("f"), Bytes.toBytes("cl"));
            scan.setCaching(500);
            scan.setCacheBlocks(false);
    
            TableMapReduceUtil.initTableMapperJob(
                    otherArgs[0], // input table, e.g. "dt_list_detail_test"
                    scan, // the Scan instance
                    DelColMapper.class,
                    Text.class, // mapper output key type
                    NullWritable.class, // mapper output value type
                    job
            );
    
            TableMapReduceUtil.initTableReducerJob(
                    otherArgs[0], // output table, e.g. "dt_list_detail_test"
                    DelColReducer.class,
                    job);
    
            job.setNumReduceTasks(10);
    
            boolean b = job.waitForCompletion(true);
    
            if (!b) {
                throw new IOException("任务出错.....");
            }
        }
    }
    

      There is also a more efficient and simpler way: drop the reduce phase entirely, as follows:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    import java.io.IOException;
    
    public class HbaseDelColMr2 {
    
        static class DelColMapper extends TableMapper<ImmutableBytesWritable, Delete> {
            @Override
            public void map(ImmutableBytesWritable key, Result result, Context context) throws IOException, InterruptedException {
    
                String rowkey = Bytes.toString(key.get()); // get the rowkey
    
                // decide whether this rowkey should be deleted; a rowkey looks like 12556565620180405
                String dateStr = rowkey.substring(rowkey.length() - 8);
    
                // everything dated before 20180406 has to be deleted
                if (Integer.parseInt(dateStr) < 20180406) {
                    // mark the column to delete
                    Delete delete = new Delete(Bytes.toBytes(rowkey));
                    delete.deleteColumn(Bytes.toBytes("f"), Bytes.toBytes("cl"));
                    // with no reduce phase the Delete is written straight to HBase (tested: it works)
                    context.write(key, delete);
                }
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration configuration = HBaseConfiguration.create();
            configuration.set("hbase.zookeeper.quorum", "zk_1,zk_2,zk_3,zk_4,zk_5");
            configuration.set("hbase.zookeeper.property.clientPort", "2181");
            //configuration.set("hbase.local.dir", "/tmp/hbase-local-dir_test");
    
            String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
    
            for (String ar:otherArgs) {
                System.out.println(ar+" ======================================");
            }
    
            Job job = Job.getInstance(configuration);
            job.setJobName("HbaseDelColMr2");
            job.setJarByClass(HbaseDelColMr2.class);
    
            Scan scan = new Scan();
            scan.addColumn(Bytes.toBytes("f"), Bytes.toBytes("cl"));
            scan.setCaching(500);
            scan.setCacheBlocks(false);
    
            TableMapReduceUtil.initTableMapperJob(
                    otherArgs[0], // input table, e.g. "dt_list_detail_test"
                    scan, // the Scan instance
                    DelColMapper.class,
                    null, // no mapper output key class; the Deletes go straight to HBase
                    null, // no mapper output value class; the Deletes go straight to HBase
                    job
            );
    
            TableMapReduceUtil.initTableReducerJob(
                    otherArgs[0], // output table, e.g. "dt_list_detail_test"
                    null, // no reducer; this call just configures the HBase table output
                    job);
    
            job.setNumReduceTasks(0);
    
            boolean b = job.waitForCompletion(true);
    
            if (!b) {
                throw new IOException("job failed");
            }
        }
    }
    

      Package the job and run it:

    export HADOOP_CLASSPATH=`hbase classpath`
    
    yarn jar ./hbaseDeltest.jar xxx.HbaseDelColMr -D mapreduce.job.queuename=xxx dt_list_detail_test

     
      That's it. Either of the two approaches above will do the job.
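
      As a quick sanity check after the job finishes, reading back one row that fell inside the deleted range should show that f:cl is gone. A minimal sketch with the 0.98 client API; the rowkey 0000120180405 is just a made-up example:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;
    
    public class HbaseDelColCheck {
    
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "zk_1,zk_2,zk_3,zk_4,zk_5");
            conf.set("hbase.zookeeper.property.clientPort", "2181");
    
            HTable table = new HTable(conf, "dt_list_detail_test");
            try {
                // fetch a row dated before 20180406 (hypothetical rowkey used for illustration)
                Get get = new Get(Bytes.toBytes("0000120180405"));
                get.addColumn(Bytes.toBytes("f"), Bytes.toBytes("cl"));
                Result result = table.get(get);
    
                // the column should no longer be present after the delete job has run
                boolean stillThere = result.containsColumn(Bytes.toBytes("f"), Bytes.toBytes("cl"));
                System.out.println("f:cl still present: " + stillThere);
            } finally {
                table.close();
            }
        }
    }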
  • Original post: https://www.cnblogs.com/wuxilc/p/9904572.html