zoukankan      html  css  js  c++  java
  • Mapreduce读取Hbase表,写数据到一个Hbase表中

    public class LabelJob
    {
        
        public static void main(String[] args)
            throws Exception
        {
            Job job = Job.getInstance(new Configuration());
            job.setJarByClass(LabelJob.class);
            job.setJobName("Hbase.LabelJob");
            
            Configuration conf = job.getConfiguration();
            conf.set("tablename", "product_tags");
            
            Scan scan = new Scan();
            scan.setCaching(500);
            scan.setCacheBlocks(false);
            //输入表
            TableMapReduceUtil.initTableMapperJob("tb_user", scan, LabelMapper.class, Text.class, Text.class, job);
            
            job.setReducerClass(LabelReducer.class);
            //输出表
            TableMapReduceUtil.initTableReducerJob("usertags", LabelReducer.class, job);
            job.waitForCompletion(true);
            
        }
        
    }
    

      

    public class LabelMapper extends TableMapper<Text, Text>
    {
          protected void setup(Context context)
            throws IOException, InterruptedException
        {
            super.setup(context);
            String tablename = context.getConfiguration().get("tablename");
             .................
        } 
     protected void map(ImmutableBytesWritable rowKey, Result result, Context context)
            throws IOException, InterruptedException
        {
              String userid = Bytes.toString(rowKey.get()); // 读取HBase用户表rowkey
            
              String strlabel = fhb.getStringValue(result, "labels", "label");
             String[] userLabels = strlabel.split(",");
    ....................
    }
    }
    

      

    public class LabelReducer extends TableReducer<Text, Text, ImmutableBytesWritable>
    {
         @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException
        {
    String rowKey = key.toString();// 读取Map输出
     for (Text v : values)
                {
                    String tag = v.toString();
                    Long count = tagMap.get(tag);
                    tagMap.put(tag, (count == null) ? 1 : (count + 1));// 计数
                }
     Put put = new Put(productId.getBytes());
      put.add("prodtags".getBytes(), "prodtags".getBytes(),outputlabel.toString().getBytes());
    
    context.write(new ImmutableBytesWritable(productId.getBytes()), put);
    
    }
    
    }
    

      

  • 相关阅读:
    1.5寻找倒数第k个元素
    MySQL基础之分组函数
    MySQL基础之单行函数
    MySQL基础查询(一)
    gem install redis Fetching: redis-4.1.3.gem (100%) ERROR: Error installing redis: redis requires Ruby version >= 2.3.0.
    SQL语句
    使用kill无法杀死mysql进程
    Ansible学习笔记
    rsync报错:rsync: chgrp ".hejian.txt.D1juHb" (in backup) failed: Operation not permitted (1)
    Linux磁盘管理
  • 原文地址:https://www.cnblogs.com/tmeily/p/4513973.html
Copyright © 2011-2022 走看看