只使用 Mapper 而不使用 Reducer,可以大大减少 MapReduce 程序的运行时间。
有时候程序需要往多张 HBase 表写数据。
所以有如题的需求。
下面给出的代码并不能直接运行,只用来展示:driver 中必须进行的设置、mapper 类需要继承的基类、map 函数需要的参数,以及函数内部的处理方式。
实现过程比较曲折,只贴代码:
class Qos2HbaseDriver extends Configured implements Tool { private static Logger logger = LoggerFactory .getLogger(Qos2HbaseDriver.class); private static final int DEFAULT_NUM_REDUCE = 0; /** * args[0]输入hdfs文件路径,args[1]输出表 */ @Override public int run(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); conf.set("output", args[1]);//输出表1 conf.set("output2", args[2]);//输出表2 Job job = Job.getInstance(conf); job.setJobName("iplane_Qos2Hbase"); job.setMapperClass(Qos2HbaseMapper.class); FileInputFormat.setInputPaths(job, args[0]); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Put.class); job.setOutputFormatClass(MultiTableOutputFormat.class); TableMapReduceUtil.addDependencyJars(job); TableMapReduceUtil.addDependencyJars(job.getConfiguration()); job.setJarByClass(Qos2Hbase.class); // 设置reduce个数,可调节 int numberReduceTasks = 0; job.setNumReduceTasks(numberReduceTasks); boolean b = job.waitForCompletion(true); if (!b) { logger.error("工作错误!"); return -1; } return 0; } } /** * @ClassName: Qos2HbaseMapper * @Description: 将结果入Hbase库的mapper类 * @author xxx * @date 2014-9-16 下午1:18:49 * */ class Qos2HbaseMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> { private static Logger logger = LoggerFactory .getLogger(Qos2HbaseMapper.class); @Override public void map(LongWritable key, Text line, Context context) throws IOException, InterruptedException { String output = context.getConfiguration().get("output"); String output2 = context.getConfiguration().get("output2"); // 组装rowkey:ip_ip StringBuffer rowkeySb = "aaaa"; Put put = null; String family = "d"; String qualifier = ""; // 直接将结果存入hbase long ts = System.currentTimeMillis(); put = new Put(Bytes.toBytes(rowkeySb.toString())); qualifier = "del"; put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), ts, Bytes.toBytes(values[6]));// 组装一条数据 if (!put.isEmpty()) { ImmutableBytesWritable ib = new ImmutableBytesWritable(); 
ib.set(Bytes.toBytes(output)); context.write(ib, put);// 将结果存入hbase表 } // 存历史表 rowkeySb.append(rowkeySeparator).append(myDate); put = new Put(Bytes.toBytes(rowkeySb.toString())); qualifier = "del"; put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), ts, Bytes.toBytes(values[6]));// 组装一条数据 if (!put.isEmpty()) { ImmutableBytesWritable ib = new ImmutableBytesWritable(); ib.set(Bytes.toBytes(output2)); context.write(ib, put);// 将结果存入hbase表 } } }