package test091201;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

/**
 * MapReduce job that reads space-separated text records from HDFS and writes
 * them into the HBase table {@code waln2} through {@link TableOutputFormat}.
 *
 * <p>The mapper keys every record by {@code <field 1>:<field 0 formatted as
 * yyyyMMddHHmmss>}; the reducer turns each key into one HBase {@link Put}
 * with columns in the {@code info} family.
 */
public class Demo3 {

    /**
     * Configures and submits the job, then exits with status 0 when the job
     * succeeded and 1 when it failed.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Name of the HBase table the job writes to.
        conf.set(TableOutputFormat.OUTPUT_TABLE, "waln2");
        // Raise this value so that HBase does not quit on a timeout.
        conf.set("dfs.socket.timeout", "180000");
        conf.set("hbase.rootdir", "hdfs://ncst:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "ncst");

        Job job = Job.getInstance(conf);
        job.setJarByClass(Demo3.class);
        job.setMapperClass(Demo3Mapper.class);
        job.setReducerClass(Demo3Reduce.class);
        // Only the map output types are declared; the reduce output types are not set.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        // No output path is configured; the output format class is set instead.
        job.setOutputFormatClass(TableOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path("hdfs://10.16.17.182:9000/test/wal_log"));

        // Propagate the job outcome to the caller instead of always exiting with 0.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }

    /**
     * Mapper: emits {@code <field 1>:<formatted field 0>} as the key and the
     * untouched input line as the value.
     */
    public static class Demo3Mapper extends Mapper<LongWritable, Text, Text, Text> {

        /** The reducer reads field indices up to 9, so a usable record has at least 10 fields. */
        private static final int MIN_FIELDS = 10;

        /** Counter group under which skipped records are reported. */
        private static final String COUNTER_GROUP = "Demo3";

        /**
         * Formatter for the date part of the row key. Kept per mapper instance
         * (not static) because {@link SimpleDateFormat} is not thread-safe.
         */
        private final SimpleDateFormat rowKeyDateFormat = new SimpleDateFormat("yyyyMMddHHmmss");

        /**
         * Splits one input line on single spaces and emits it keyed by
         * {@code <field 1>:<field 0 as yyyyMMddHHmmss>}. Lines with fewer than
         * {@value #MIN_FIELDS} fields, or whose first field is not a valid
         * {@code long}, are skipped and counted instead of failing the task.
         *
         * @param key     key supplied by the input format (not used)
         * @param value   one line of input text
         * @param context task context used to emit output and update counters
         * @throws IOException          if writing the output record fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split(" ");
            if (fields.length < MIN_FIELDS) {
                // Blank or truncated line: it would fail here or later in the reducer.
                context.getCounter(COUNTER_GROUP, "SHORT_LINES").increment(1);
                return;
            }

            long epochMillis;
            try {
                // Field 0 is handed to Date(long), i.e. interpreted as milliseconds since the epoch.
                epochMillis = Long.parseLong(fields[0].trim());
            } catch (NumberFormatException e) {
                context.getCounter(COUNTER_GROUP, "BAD_TIMESTAMP_LINES").increment(1);
                return;
            }

            String formattedDate = rowKeyDateFormat.format(new Date(epochMillis));
            // Key => TelNum:Date, Value => the whole input line.
            context.write(new Text(fields[1] + ":" + formattedDate), value);
        }
    }

    /**
     * Reducer: builds one HBase {@link Put} per key from the grouped input lines.
     */
    public static class Demo3Reduce extends TableReducer<Text, Text, NullWritable> {

        private static final byte[] FAMILY = "info".getBytes(StandardCharsets.UTF_8);
        private static final byte[] COL_DATE = "date".getBytes(StandardCharsets.UTF_8);
        private static final byte[] COL_TEL = "tel".getBytes(StandardCharsets.UTF_8);
        private static final byte[] COL_UP_PACK = "upPack".getBytes(StandardCharsets.UTF_8);
        private static final byte[] COL_DOWN_PACK = "downPack".getBytes(StandardCharsets.UTF_8);
        private static final byte[] COL_UP_PAY = "upPay".getBytes(StandardCharsets.UTF_8);
        private static final byte[] COL_DOWN_PAY = "downPay".getBytes(StandardCharsets.UTF_8);

        /**
         * Writes a single row whose row key is the reduce key and whose
         * {@code info} columns are taken from fields 0, 1, 6, 7, 8 and 9 of
         * each grouped line.
         *
         * <p>NOTE(review): every line in the group is added to the same
         * {@code Put} under the same column names, so when a key has several
         * lines they all target the same cells and presumably only one
         * survives - confirm this is intended.
         *
         * @param key     row key produced by the mapper
         * @param v2s     input lines that share {@code key}; each is expected
         *                to have at least 10 space-separated fields
         * @param context task context used to emit the {@code Put}
         * @throws IOException          if writing the {@code Put} fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void reduce(Text key, Iterable<Text> v2s, Context context)
                throws IOException, InterruptedException {
            // Explicit UTF-8 keeps row keys and cell values identical on every JVM.
            Put put = new Put(key.toString().getBytes(StandardCharsets.UTF_8));
            for (Text line : v2s) {
                String[] fields = line.toString().split(" ");
                put.add(FAMILY, COL_DATE, fields[0].getBytes(StandardCharsets.UTF_8));
                put.add(FAMILY, COL_TEL, fields[1].getBytes(StandardCharsets.UTF_8));
                put.add(FAMILY, COL_UP_PACK, fields[6].getBytes(StandardCharsets.UTF_8));
                put.add(FAMILY, COL_DOWN_PACK, fields[7].getBytes(StandardCharsets.UTF_8));
                put.add(FAMILY, COL_UP_PAY, fields[8].getBytes(StandardCharsets.UTF_8));
                put.add(FAMILY, COL_DOWN_PAY, fields[9].getBytes(StandardCharsets.UTF_8));
            }
            context.write(NullWritable.get(), put);
        }
    }
}