zoukankan      html  css  js  c++  java
  • MapReduce-读取文件写入HBase

    MapReduce直接写入HBase

    代码如下

    package com.hbase.mapreduce;
    
    import java.io.IOException;
    
    import org.apache.commons.cli.CommandLine;
    import org.apache.commons.cli.CommandLineParser;
    import org.apache.commons.cli.HelpFormatter;
    import org.apache.commons.cli.Option;
    import org.apache.commons.cli.Options;
    import org.apache.commons.cli.ParseException;
    import org.apache.commons.cli.PosixParser;
    import org.apache.commons.codec.digest.DigestUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
    * @author:FengZhen
    * @create:2018年9月14日
    */
    public class ImportFromFile extends Configured implements Tool{
    
    	private static String addr="HDP233,HDP232,HDP231";
    	private static String port="2181";
    	public static final String NAME = "ImportFromFile";
    	public enum Counters { LINES }
    	
    	static class ImportMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    		
    		private byte[] family = null;
    		private byte[] qualifier = null;
    		
    		@Override
    		protected void setup(Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context)
    				throws IOException, InterruptedException {
    			String column = context.getConfiguration().get("conf.column");
    			byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column));
    			family = colkey[0];
    			if (colkey.length > 1) {
    				qualifier = colkey[1];
    			}
    		}
    		
    		@Override
    		protected void map(LongWritable key, Text value,
    				Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context)
    				throws IOException, InterruptedException {
    			try {
    				String lineString = value.toString();
    				//行键是经过MD5散列之后随机生成的键值
    				byte[] rowkey = DigestUtils.md5(lineString);
    				Put put = new Put(rowkey);
    				//存储原始数据到给定的表中的一列
    				put.addColumn(family, qualifier, Bytes.toBytes(lineString));
    				context.write(new ImmutableBytesWritable(rowkey), put);
    				context.getCounter(Counters.LINES).increment(1L);
    			} catch (Exception e) {
    				e.printStackTrace();
    			}
    		}
    	}
    	
    	/**
    	 * 使用Apache Commons CLI类解析命令行参数。
    	 * @param args
    	 * @return
    	 */
    	private static CommandLine parseArgs(String[] args) {
    		Options options = new Options();
    		Option option = new Option("t", "table", true, "table to import into -must exist");
    		option.setArgName("table-name");
    		option.setRequired(true);
    		options.addOption(option);
    		
    		option = new Option("c", "column", true, "column to store row data into -must exit");
    		option.setArgName("family:qualifier");
    		option.setRequired(true);
    		options.addOption(option);
    		
    		option = new Option("i", "input", true, "the directory or file to read from");
    		option.setArgName("path-in-HDFS");
    		option.setRequired(true);
    		options.addOption(option);
    		
    		options.addOption("d", "debug", false, "switch on DEBUG log level");
    		
    		CommandLineParser parser = new PosixParser();
    		CommandLine cmd = null;
    		try {
    			cmd = parser.parse(options, args);
    		} catch (ParseException e) {
    			e.printStackTrace();
    			System.err.println("ERROR: " + e.getMessage() + "
    ");
    			HelpFormatter formatter = new HelpFormatter();
    			formatter.printHelp(NAME + " ", options, true);
    			System.exit(1);
    		} 
    		return cmd;
    	}
    	
    	public int run(String[] arg0) throws Exception {
    		Configuration configuration = HBaseConfiguration.create();
    		configuration.set("hbase.zookeeper.quorum",addr);
    		configuration.set("hbase.zookeeper.property.clientPort", port);
    		//String[] otherArgs = new GenericOptionsParser(configuration, arg0).getRemainingArgs();
    		//CommandLine commandLine = parseArgs(arg0);
    		
    //		String table = commandLine.getOptionValue("t");
    //		String input = commandLine.getOptionValue("i");
    //		String column = commandLine.getOptionValue("c");
    		
    		String table = arg0[0];
    		String input = arg0[1];
    		String column = arg0[2];
    		configuration.set("conf.column", column);
    		
    		Job job = Job.getInstance(configuration);
    		job.setJobName("ImportFromFile");
    		job.setJarByClass(ImportFromFile.class);
    		job.setMapperClass(ImportMapper.class);
    		job.setOutputFormatClass(TableOutputFormat.class);
    		job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
    		job.setOutputKeyClass(ImmutableBytesWritable.class);
    		job.setOutputValueClass(Writable.class);
    		//这是一个只包含map阶段的作业,框架会直接跳过reduce阶段
    		job.setNumReduceTasks(0);
    		
    		FileInputFormat.addInputPath(job, new Path(input));
    		return job.waitForCompletion(true) ? 0 : 1;
    	}
    	
    	public static void main(String[] args) throws Exception {
    		String[] params = new String[] {"test_table_mr", "hdfs://fz/data/fz/input/hbase", "data:info"};
    		int exitCode = ToolRunner.run(new ImportFromFile(), params);
    		System.exit(exitCode);
    	}
    }
  • 相关阅读:
    time模块
    collection模块
    re模块
    HTML
    Java数据结构之快速排序
    Java数据结构之循环链表(与单链表比较)
    Java数据结构之单链表
    Java数据结构之队列
    Java数据结构之栈
    java数据结构之数组
  • 原文地址:https://www.cnblogs.com/EnzoDin/p/9661686.html
Copyright © 2011-2022 走看看