public class LabelJob { public static void main(String[] args) throws Exception { Job job = Job.getInstance(new Configuration()); job.setJarByClass(LabelJob.class); job.setJobName("Hbase.LabelJob"); Configuration conf = job.getConfiguration(); conf.set("tablename", "product_tags"); Scan scan = new Scan(); scan.setCaching(500); scan.setCacheBlocks(false); //输入表 TableMapReduceUtil.initTableMapperJob("tb_user", scan, LabelMapper.class, Text.class, Text.class, job); job.setReducerClass(LabelReducer.class); //输出表 TableMapReduceUtil.initTableReducerJob("usertags", LabelReducer.class, job); job.waitForCompletion(true); } }
/**
 * Table mapper over the HBase input rows; its declared output types are {@code Text}/{@code Text}.
 *
 * <p>NOTE(review): parts of this class are elided ("....") in this source, and {@code fhb} is not
 * declared anywhere visible. Presumably it is a helper field initialised in {@link #setup} —
 * confirm.
 */
public class LabelMapper extends TableMapper<Text, Text> {

  /**
   * Per-task initialisation: reads the {@code "tablename"} value from the job configuration.
   *
   * @param context task context supplying the job configuration
   */
  protected void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);
    String tablename = context.getConfiguration().get("tablename");
    ................. }

  /**
   * Processes one input row: decodes the row key as the user id and splits the
   * {@code labels:label} cell value on commas.
   *
   * @param rowKey row key of the input row
   * @param result cells of the input row
   * @param context task context used to emit map output
   */
  protected void map(ImmutableBytesWritable rowKey, Result result, Context context) throws IOException, InterruptedException {
    String userid = Bytes.toString(rowKey.get()); // Read the HBase user-table row key.
    // NOTE(review): get() returns the whole backing array; if the writable ever wraps a slice,
    // Bytes.toString(rowKey.get(), rowKey.getOffset(), rowKey.getLength()) would be safer.
    String strlabel = fhb.getStringValue(result, "labels", "label");
    // NOTE(review): if a row has no labels:label cell, strlabel may be null and split() would
    // throw — confirm the contract of fhb.getStringValue.
    String[] userLabels = strlabel.split(",");
    .................... } }
public class LabelReducer extends TableReducer<Text, Text, ImmutableBytesWritable> { @Override public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String rowKey = key.toString();// 读取Map输出 for (Text v : values) { String tag = v.toString(); Long count = tagMap.get(tag); tagMap.put(tag, (count == null) ? 1 : (count + 1));// 计数 } Put put = new Put(productId.getBytes()); put.add("prodtags".getBytes(), "prodtags".getBytes(),outputlabel.toString().getBytes()); context.write(new ImmutableBytesWritable(productId.getBytes()), put); } }