zoukankan      html  css  js  c++  java
  • Mapreduce读取Hbase表,写数据到一个Hbase表中

    public class LabelJob
    {
        
        public static void main(String[] args)
            throws Exception
        {
            Job job = Job.getInstance(new Configuration());
            job.setJarByClass(LabelJob.class);
            job.setJobName("Hbase.LabelJob");
            
            Configuration conf = job.getConfiguration();
            conf.set("tablename", "product_tags");
            
            Scan scan = new Scan();
            scan.setCaching(500);
            scan.setCacheBlocks(false);
            //输入表
            TableMapReduceUtil.initTableMapperJob("tb_user", scan, LabelMapper.class, Text.class, Text.class, job);
            
            job.setReducerClass(LabelReducer.class);
            //输出表
            TableMapReduceUtil.initTableReducerJob("usertags", LabelReducer.class, job);
            job.waitForCompletion(true);
            
        }
        
    }
    

      

    public class LabelMapper extends TableMapper<Text, Text>
    {
          protected void setup(Context context)
            throws IOException, InterruptedException
        {
            super.setup(context);
            String tablename = context.getConfiguration().get("tablename");
             .................
        } 
     protected void map(ImmutableBytesWritable rowKey, Result result, Context context)
            throws IOException, InterruptedException
        {
              String userid = Bytes.toString(rowKey.get()); // 读取HBase用户表rowkey
            
              String strlabel = fhb.getStringValue(result, "labels", "label");
             String[] userLabels = strlabel.split(",");
    ....................
    }
    }
    

      

    public class LabelReducer extends TableReducer<Text, Text, ImmutableBytesWritable>
    {
         @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException
        {
    String rowKey = key.toString();// 读取Map输出
     for (Text v : values)
                {
                    String tag = v.toString();
                    Long count = tagMap.get(tag);
                    tagMap.put(tag, (count == null) ? 1 : (count + 1));// 计数
                }
     Put put = new Put(productId.getBytes());
      put.add("prodtags".getBytes(), "prodtags".getBytes(),outputlabel.toString().getBytes());
    
    context.write(new ImmutableBytesWritable(productId.getBytes()), put);
    
    }
    
    }
    

      

  • 相关阅读:
    浅析堆与垃圾回收
    再探JVM内存模型
    索引使用的基本原则
    常见的索引模型浅析
    初识InnoDB体系架构和逻辑存储结构
    一条update SQL语句是如何执行的
    MySQL一条查询语句是如何执行的
    堆与优先队列
    ibatis BindingException Parameter 'status' not found. Available parameters are [arg1, arg0, param1, param2] 解决方法
    Mysql通过MHA实现高可用
  • 原文地址:https://www.cnblogs.com/tmeily/p/4513973.html
Copyright © 2011-2022 走看看