zoukankan      html  css  js  c++  java
  • HBase概念学习(七)HBase与Mapreduce集成

    这篇文章是看了HBase权威指南之后,依据上面的解说搬下来的样例,可是略微有些不一样。

    HBase与mapreduce的集成无非就是mapreduce作业以HBase表作为输入,或者作为输出,也或者作为mapreduce作业之间共享数据的介质。

    这篇文章将解说两个样例:

    1、读取存储在hdfs上的txt文本数据,简单地以json字符串的形式存储到HBase表中。

    2、将第一步存储的HBase表中的json字符串读取出来,解析存储到新的HBase表中,能够进行查询。

    本文具体给出了源代码以及怎样执行,旨在加深HBase与mapreduce集成的学习。

    假设你还不知道怎么搭建基于HDFS的HBase单机环境,以及怎样执行mapreduce任务,那么请先參考我这两篇文章:

    (1) HBase环境搭建(一)Ubuntu下基于Hadoop文件系统的单机模式

    (2) Hadoop基础学习(一)分析、编写并执行WordCount词频统计程序


    1、读取存储在hdfs上的txt文本数据,简单地以json字符串的形式存储到HBase表中。

    源代码:

    /**
     * @author 季义钦
     * @date 2014-6
     * @reference HBase权威指南 chapter7
     * 
     */
    
    import java.io.IOException;
    import org.apache.commons.cli.CommandLine;
    import org.apache.commons.cli.CommandLineParser;
    import org.apache.commons.cli.HelpFormatter;
    import org.apache.commons.cli.Option;
    import org.apache.commons.cli.Options;
    import org.apache.commons.cli.ParseException;
    import org.apache.commons.cli.PosixParser;
    import org.apache.commons.codec.digest.DigestUtils;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class HdfsToHBase 
    {
    	private static final Log LOG = LogFactory.getLog(HdfsToHBase.class);
    	public static final String NAME = "ImportFromFile";
    	public enum Counters { LINES }
    	  
    	/**
    	 * Map类
    	 *
    	 */
    	static class ImportMapper 
    	extends Mapper<LongWritable, Text, ImmutableBytesWritable, Writable>
    	{
    		private byte[] family = null;
    		private byte[] qualifier = null;
    		
    		@Override
    		protected void setup(Context context) throws IOException, InterruptedException
    		{
    			//获取通过Configuration传过来的列名
    			String columns = context.getConfiguration().get("conf.column");
    			
    			//解析出列族和列的名称
    			byte[][] columnsBytes = KeyValue.parseColumn(Bytes.toBytes(columns));
    			family = columnsBytes[0];
    			qualifier = columnsBytes[1];
    			
    			LOG.info("family:"+family.toString()+"qualifiers:"+qualifier);
    		}
    		
    		@Override
    		public void map(LongWritable offset, Text line, Context context) throws IOException
    		{
    			try
    			{
    				String lineStr = line.toString();
    				byte[] rowkey = DigestUtils.md5(lineStr);
    				
    				//构造Put对象
    				Put put = new Put(rowkey);
    				put.add(family, qualifier, Bytes.toBytes(lineStr));
    				
    				//发射Put对象
    				context.write(new ImmutableBytesWritable(rowkey), put);
    				context.getCounter(Counters.LINES).increment(1);
    				
    			}catch(Exception e)
    			{
    				e.printStackTrace();
    			}
    		}
    		
    	}
    	
    	/**
    	 * 将命令行參数解析为HBase的CommandLine对象
    	 * @param args
    	 * @return
    	 * @throws ParseException
    	 */
    	private static CommandLine parseArgs(String[] args) throws ParseException
    	{
    		Options options = new Options();
    	    Option o = new Option("t", "table", true, "table to import into (must exist)");
    	    o.setArgName("table-name");
    	    o.setRequired(true);
    	    options.addOption(o);
    	    
    	    o = new Option("c", "column", true, "column to store row data into (must exist)");
    	    o.setArgName("family:qualifier");
    	    o.setRequired(true);
    	    options.addOption(o);
    	    
    	    o = new Option("i", "input", true, "the directory or file to read from");
    	    o.setArgName("path-in-HDFS");
    	    o.setRequired(true);
    	    options.addOption(o);
    	    
    	    CommandLineParser parser = new PosixParser();
    	    CommandLine cmd = null;
    	    
    	    try 
    	    {
    	        cmd = parser.parse(options, args);
    	    } catch (Exception e) {
    	        System.err.println("ERROR: " + e.getMessage() + "
    ");
    	        HelpFormatter formatter = new HelpFormatter();
    	        formatter.printHelp(NAME + " ", options, true);
    	        System.exit(-1);
    	    }
    	    
    	    return cmd;
    	}
    	
    	/**
    	 * 主函数
    	 * @param args
    	 * @throws Exception
    	 */
    	public static void main(String[] args) throws Exception
    	{
    		//将输入參数解析为CommandLine对象
    		Configuration conf = HBaseConfiguration.create();
    	    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    	    CommandLine cmd = parseArgs(otherArgs);
    	    
    	    //取出各项參数
    	    String tableName = cmd.getOptionValue("t");
    	    String inputFileName = cmd.getOptionValue("i");
    	    String columnName = cmd.getOptionValue("c");
    	    conf.set("conf.column", columnName);
    	    
    	    Job job = new Job(conf, "Import from file " + inputFileName + " into table " + tableName);
    	    job.setJarByClass(HdfsToHBase.class);
    	    
    	    //设置map和reduce类
    	    job.setMapperClass(ImportMapper.class);
    	    job.setNumReduceTasks(0);
    	    
    	    //设置map阶段输出的键值对类型
    	    job.setOutputKeyClass(ImmutableBytesWritable.class);
    	    job.setOutputValueClass(Writable.class);
    	    
    	    //设置job输入输出格式
    	    job.setOutputFormatClass(TableOutputFormat.class);
    	    job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, tableName);	    
    	    
    	    //设置输入输出路径
    	    FileInputFormat.addInputPath(job, new Path(inputFileName));
    	    
    	    System.exit(job.waitForCompletion(true) ? 0 : 1);
    	}
    }

    引入的jar文件包含:


    这是在eclispe中开发的,放在默认的包以下,导出为普通的jar文件。

    然后利用命令start-all.sh和start-hbase.sh分别启动hadoop和HBase。


    (1)首先登陆HBase shell,创建一个仅仅包括一个列族的表



    (2)然后将txt数据上传到HDFS上面(数据在HBase权威指南随书的源代码包中有)。



    (3)然后运行job:


    当中指定了main函数所在的类名,然后就各自是habse 表名,hdfs文件名称,hbase表的列名。

    作业运行完毕之后能够到:http://localhost:50030/jobtracker.jsp 查看作业运行状态。

    然后能够登陆hbase shell查看article表中有多少行数据,也能够用scan所有打印出来看。


    2、将第一步存储的HBase表中的json字符串读取出来,解析存储到新的HBase表中,能够进行查询。

    源代码:

    /**
     * @author 季义钦
     * @date 2014-6
     * @reference HBase权威指南 chapter7
     * 
     */
    import java.io.IOException;
    import org.apache.commons.cli.CommandLine;
    import org.apache.commons.cli.CommandLineParser;
    import org.apache.commons.cli.HelpFormatter;
    import org.apache.commons.cli.Option;
    import org.apache.commons.cli.Options;
    import org.apache.commons.cli.ParseException;
    import org.apache.commons.cli.PosixParser;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.json.simple.JSONObject;
    import org.json.simple.parser.JSONParser;
    
    public class HBaseToHBase 
    {
    	private static final Log LOG = LogFactory.getLog(HBaseToHBase.class);
    	public static final String NAME = "HBaseToHBase";
    	public enum Counters { ROWS, COLS, ERROR, VALID }
    	
    	/**
    	 * Map类
    	 * 以HBase表作为输入,所以继承自TableMapper
    	 *
    	 */
    	static class ParseMapper
    	extends TableMapper<ImmutableBytesWritable, Writable>
    	{
    		private JSONParser parser = new JSONParser();
    		private byte[] family = null;
    		
    		@Override
    		protected void setup(Context context) throws IOException, InterruptedException
    		{
    			family = Bytes.toBytes(context.getConfiguration().get("conf.family"));
    		}
    		
    		@Override 
    		public void map(ImmutableBytesWritable rowKey, Result columns, Context context) throws IOException
    		{
    			String value = null;		
    			
    			try
    			{
    				String author = "null";
    				Put put = new Put(rowKey.get());
    				
    				//循环取得每一列(这里实际上仅仅有一列存储json字符串)
    				for(KeyValue kv:columns.list())
    				{
    					context.getCounter(Counters.COLS).increment(1);
    					value = Bytes.toStringBinary(kv.getValue());
    					
    					//解析获取的json字符串
    					JSONObject json = (JSONObject)parser.parse(value);
    					for(Object key : json.keySet())
    					{						
    						Object val = json.get(key);
    						if(key.equals("author"))
    						{
    							author = val.toString();
    						}
    						
    						put.add(family, Bytes.toBytes(key.toString()), Bytes.toBytes(val.toString()));						
    					}					
    				}
    				
    				//以解析到的author作为行键发射出去
    				context.write(new ImmutableBytesWritable(Bytes.toBytes(author)), put);
    				context.getCounter(Counters.VALID).increment(1);
    				LOG.info("存储作者 "+author+"的数据完毕!");
    			}catch(Exception e)
    			{
    				e.printStackTrace();
    				System.err.println("Error: " + e.getMessage() + ", Row: " +
    				          Bytes.toStringBinary(rowKey.get()) + ", JSON: " + value);
    				        context.getCounter(Counters.ERROR).increment(1);
    			}
    		}
    	}
    	
    	/**
    	 * 解析命令行參数
    	 * @param args
    	 * @return
    	 * @throws ParseException
    	 */
    	private static CommandLine parseArgs(String[] args) throws ParseException 
    	{
    		Options options = new Options();
    	    
    		Option o = new Option("i", "input", true, "table to read from (must exist)");
    		o.setArgName("input-table-name");
    		o.setRequired(true);
    		options.addOption(o);
    	    
    	    o = new Option("ic", "column", true, "column to read data from (must exist)");
    	    o.setArgName("family:qualifier");
    	    o.setRequired(true);
    	    options.addOption(o);
    	    
    	  	o = new Option("o", "output", true, "table to write to (must exist)");
    	  	o.setArgName("output-table-name");
    	  	o.setRequired(true);
    	  	options.addOption(o);
    	    
    	    o = new Option("oc", "family", true, "cf to write data to (must exist)");
    	    o.setArgName("family");
    	    o.setRequired(true);
    	    options.addOption(o);
    	    
    	    CommandLineParser parser = new PosixParser();
    	    CommandLine cmd = null;
    	    try 
    	    {
    	      cmd = parser.parse(options, args);
    	    } 
    	    catch (Exception e) 
    	    {
    	      System.err.println("ERROR: " + e.getMessage() + "
    ");
    	      HelpFormatter formatter = new HelpFormatter();
    	      formatter.printHelp(NAME + " ", options, true);
    	      System.exit(-1);
    	    }
    	    return cmd;
    	  }
    	
    	/**
    	 * 主函数
    	 * @param args
    	 */
    	public static void main(String[] args) throws Exception
    	{
    	    Configuration conf = HBaseConfiguration.create();
    	    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    	    CommandLine cmd = parseArgs(otherArgs);
    	    
    	    String inputTable = cmd.getOptionValue("i");	//HBase源表
    	    String outputTable = cmd.getOptionValue("o");	//HBase目标表
    	    String inputColumn = cmd.getOptionValue("ic");	//HBase源表的列名
    	    String outputColumnFamily = cmd.getOptionValue("oc");	//HBase目标表的列族名
    	    conf.set("conf.family", outputColumnFamily);    
    	    
    	    //提供Scan实例指定要扫描的列
    	    Scan scan = new Scan();
    	    byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(inputColumn));	    	
    	    scan.addColumn(colkey[0], colkey[1]);	        
    	    
    	    Job job = new Job(conf, "Parse data in " + inputTable + ", write to " + outputTable);
    	    job.setJarByClass(HBaseToHBase.class);  
    	    
    	    //高速配置作业以HBase作为输入源和输出源
    	    TableMapReduceUtil.initTableMapperJob(inputTable, scan, ParseMapper.class, ImmutableBytesWritable.class, Put.class, job);
    	    TableMapReduceUtil.initTableReducerJob(outputTable, IdentityTableReducer.class, job);
    
    	    System.exit(job.waitForCompletion(true) ? 0 : 1);
    	}
    
    }
    

    注意:

    (1)以HBase表作为mapreduce作业的输入时,一方面要继承字TableMapper类,一方面须要提供一个scan实例,指定要扫描来作为输入的记录。

    (2)当中配置的Reduce是IdentityTableReducer,其作用和IdentityTableMapper一样,仅仅是简单地将键值对传递到下一个阶段而已,没有什么实质性作用,它对于数据存储到HBase表中不是必须的,全然能够用另外一句话替代,即: setNumReduceTasks(0).

    实际上作业运行的时候你应该也能够看到reduce一直是0%。


    引入的jar文件包含:



    (1)创建HBase表:



    (2)导出jar包:

    注意:里面引入了一个第三方的jar包,即simple json的jar包,用于解析json字符串。

    simple json jar文件在这里下载:http://www.java2s.com/Code/Jar/j/Downloadjsonsimple111jar.htm

    之前在一个站点下了一个山寨的,结果没有parse(string)这个接口,仅仅有parse(Reader)这个接口,将String转换成StringReader传进去结果作业老是报错,坑死了。

    引入第三方jar包运行Mapreduce作业的时候会报出classnotFound的异常,解决方法有下面几种:

    1.把要依赖的包部署到每台tasktracker上面

    这种方法最简单,可是要部署到每台tasktracker,并且可能引起包污染的问题。比方应用A和应用B都用到同一个libray,可是版本号不同,就会出现冲突的问题。

    2.把依赖的包和直接合并到mapreducejob的包

    这种方法的问题是合并后的包可能很大,也不利于的包的升级

    3.使用DistributedCache

    这种方法就是先把这些包上传到HDFS,能够在程序启动的时候做一次。然后在submitjob的时候把hdfspath加到classpath里面。
    演示样例:

    $bin/hadoop fs -copyFromLocal ib/protobuf-java-2.0.3.jar/myapp/protobuf-java-2.0.3.jar //Setup the application's JobConf:JobConf job = new JobConf(); DistributedCache.addFileToClassPath(newPath("/myapp/protobuf-java-2.0.3.jar"), job);

    4,另一种情况是扩展包特别多的情况下用3就不爽了,參考一下:

    Hadoop权威指南》中也有关于jar打包的处理措施,查找之

    【不论什么非独立的JAR文件都必须打包到JAR文件的lib文件夹中。(这与Javawebapplication archiveWAR文件类似,不同的是,后者的JAR文件放在WEB-INF/lib子文件夹下的WAR文件里)】

    我採用的是第四种方法,在project以下创建一个lib目录,将json-simple-1.1.1.jar放进去:


    然后export:



    (3)运行job:


    OK了,以下就能够用hbase shell登陆,并用scan ‘authorTable’查看解析进去的数据了。



  • 相关阅读:
    Tomcat与Spring中的事件机制详解
    Kafka消息系统基础知识索引
    配置SpringBoot-从日志系统配置说起
    支付宝手机网页支付和微信公众号支付接入
    centos下搭建YII环境
    为什么需要 Stream
    基于Django的Rest Framework框架的序列化组件
    基于Django的Rest Framework框架的RESTful规范研究
    web中状态码301和302的区别
    Django初见
  • 原文地址:https://www.cnblogs.com/mengfanrong/p/3880045.html
Copyright © 2011-2022 走看看