zoukankan      html  css  js  c++  java
  • hadoop执行hdfs文件到hbase表插入操作(xjl456852原创)

    本例中需要将hdfs上的文本文件,解析后插入到hbase的表中.
    本例用到的hadoop版本2.7.2 hbase版本1.2.2

    hbase的表如下:
    1. create 'ns2:user', 'info'
    hdfs上的文本文件如下[data/hbase_input/hbase.txt]
    1. 1,xiejl,20
    2. 2,haha,30
    3. 3,liudehua,40
    4. 4,daoming,41
    可以通过命令查看hadoop的classpath现在包含哪些jar包:
    1. [hadoop@master ~]$ hdfs classpath

    java的主方法:
    1. package com.xjl456852.mapreduce;
    2. import org.apache.hadoop.fs.Path;
    3. import org.apache.hadoop.hbase.client.Put;
    4. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    5. import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    6. import org.apache.hadoop.io.Text;
    7. import org.apache.hadoop.mapreduce.Job;
    8. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    9. import java.io.IOException;
    10. /**
    11. * 将hdfs中的文本文件写入到hbase的表中
    12. * 程序的运行需要加入hadoop的配置文件和hbase的配置文件到jar包中
    13. * 对应的hbase的表
    14. * create 'ns2:user','info'
    15. *
    16. * Created by xiejl on 2016/8/10.
    17. */
    18. public class HBaseApp {
    19. public static void main(String [] args) {
    20. try {
    21. Job job = Job.getInstance();
    22. job.setJobName("text into hbase table");
    23. job.setJarByClass(HBaseApp.class);
    24. FileInputFormat.addInputPath(job, new Path(args[0]));
    25. //设置表名
    26. job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, args[1]);
    27. //设置输出格式为table
    28. job.setOutputFormatClass(TableOutputFormat.class);
    29. //设置输出的key类型为ImmutableBytesWritable
    30. job.setOutputKeyClass(ImmutableBytesWritable.class);
    31. //设置输出的value类型为Put
    32. job.setOutputValueClass(Put.class);
    33. //因为map输出key和reduce输出的key类型不一致,所以需要再设置map的key输出类型为Text
    34. job.setMapOutputKeyClass(Text.class);
    35. //因为map输出value和reduce输出的value类型不一致,所以需要再设置map的value输出类型为Text
    36. job.setMapOutputValueClass(Text.class);
    37. //Mapper
    38. job.setMapperClass(MyMapper.class);
    39. //Reducer
    40. job.setReducerClass(MyReducer.class);
    41. System.exit(job.waitForCompletion(true) ? 0 : 1);
    42. } catch (InterruptedException e) {
    43. e.printStackTrace();
    44. } catch (IOException e) {
    45. e.printStackTrace();
    46. } catch (ClassNotFoundException e) {
    47. e.printStackTrace();
    48. }
    49. }
    50. }
    Mapper类:
    1. package com.xjl456852.mapreduce;
    2. import org.apache.hadoop.io.LongWritable;
    3. import org.apache.hadoop.io.Text;
    4. import org.apache.hadoop.mapreduce.Mapper;
    5. import java.io.IOException;
    6. /**
    7. * Created by xiejl on 2016/8/10.
    8. */
    9. public class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
    10. @Override
    11. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    12. String line = value.toString();
    13. int index = line.indexOf(",");
    14. String rowKey = line.substring(0, index);
    15. //跳过逗号
    16. String valueLine = line.substring(index+1);
    17. context.write(new Text(rowKey), new Text(valueLine));
    18. }
    19. }
    Reducer类:
    1. package com.xjl456852.mapreduce;
    2. import org.apache.hadoop.hbase.client.Put;
    3. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    4. import org.apache.hadoop.hbase.util.Bytes;
    5. import org.apache.hadoop.io.Text;
    6. import org.apache.hadoop.mapreduce.Reducer;
    7. import java.io.IOException;
    8. /**
    9. * Created by xiejl on 2016/8/11.
    10. */
    11. public class MyReducer extends Reducer<Text, Text, ImmutableBytesWritable, Put> {
    12. @Override
    13. protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    14. byte[] rowKey = Bytes.toBytes(key.toString());
    15. for(Text text : values) {
    16. //设置put对象的行键
    17. Put put = new Put(rowKey);
    18. String line = text.toString();
    19. int index = line.indexOf(",");
    20. String name = line.substring(0, index);
    21. String age = line.substring(index+1);
    22. //列族的是建表时固定的,列和值是插入时添加的.
    23. put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"),Bytes.toBytes(name));
    24. put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"),Bytes.toBytes(age));
    25. context.write(new ImmutableBytesWritable(Bytes.toBytes(key.toString())), put);
    26. }
    27. }
    28. }
    将程序打为jar包,jar包中需要加入hadoop的配置文件和hbase的配置 (有人将程序打成胖包,就是将依赖的jar,依赖的四个类库hbase-client,hbase-server,hbase-common,hbsae-protocol放入lib目录中,我试验不行,会出现map和reduce任务都执行到100%时卡住不动,等十分钟又出现 FAILED AttemptID:attempt_xxx Timed out after 600 secs,然后又重新执行mapreduce任务,然后又卡住,得结束掉mapreduce进程才能终止)

    需要修改集群的配置文件,以满足hadoop执行hbase表插入操作时,能找到相关的类库.

    将HBase的类jar包加到hadoop的classpath下, 修改${HADOOP_HOME}/etc/hadoop/hadoop-env.sh。配置好这个文件,分发到各个节点,改这个配置不用重启集群.

    1. TEMP=`ls /opt/modules/hbase/lib/*.jar`
    2. HBASE_JARS=`echo $TEMP | sed 's/ /:/g'`
    3. HADOOP_CLASSPATH=$HBASE_JARS

    如果现在运行程序还是会出错,详情可以看我的另一篇文章.hadoop执行hbase插入表操作,出错:Stack trace: ExitCodeException exitCode=1:(xjl456852原创)

    还需要在${HADOOP_HOME}/etc/hadoop/yarn-site.xml中加入mapreduce运行时需要的类库,需要设置yarn.application.classpath:
    所以我在yarn-site.xml中加入了如下配置,并加入了hbase的lib目录,配置好这个文件,分发到各个节点,这个配置需要重启集群
    1. <property>
    2. <name>yarn.application.classpath</name>
    3. <value>
    4. /opt/modules/hadoop/etc/*,
    5. /opt/modules/hadoop/etc/hadoop/*,
    6. /opt/modules/hadoop/lib/*,
    7. /opt/modules/hadoop/share/hadoop/common/*,
    8. /opt/modules/hadoop/share/hadoop/common/lib/*,
    9. /opt/modules/hadoop/share/hadoop/mapreduce/*,
    10. /opt/modules/hadoop/share/hadoop/mapreduce/lib/*,
    11. /opt/modules/hadoop/share/hadoop/hdfs/*,
    12. /opt/modules/hadoop/share/hadoop/hdfs/lib/*,
    13. /opt/modules/hadoop/share/hadoop/yarn/*,
    14. /opt/modules/hadoop/share/hadoop/yarn/lib/*,
    15. /opt/modules/hbase/lib/*
    16. </value>
    17. </property>

    然后执行:
    1. hadoop jar hbase.jar com.xjl456852.mapreduce.HBaseApp data/hbase_input ns2:user
    查看hbase的表内容:
    1. hbase(main):001:0> scan 'ns2:user'
    2. ROW COLUMN+CELL
    3. 1 column=info:age, timestamp=1470966325326, value=20
    4. 1 column=info:name, timestamp=1470966325326, value=xiejl
    5. 2 column=info:age, timestamp=1470966325326, value=30
    6. 2 column=info:name, timestamp=1470966325326, value=haha
    7. 3 column=info:age, timestamp=1470966325326, value=40
    8. 3 column=info:name, timestamp=1470966325326, value=liudehua
    9. 4 column=info:age, timestamp=1470966325326, value=41
    10. 4 column=info:name, timestamp=1470966325326, value=daoming
    11. 4 row(s) in 0.3100 seconds
    可以看到数据已经插入到hbase表中.


    还可以将Reducer类写成继承TableReducer方式,代码如下,执行后会有同样的结果:
    1. package com.xjl456852.mapreduce;
    2. import org.apache.hadoop.hbase.client.Put;
    3. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    4. import org.apache.hadoop.hbase.mapreduce.TableReducer;
    5. import org.apache.hadoop.hbase.util.Bytes;
    6. import org.apache.hadoop.io.Text;
    7. import java.io.IOException;
    8. /**
    9. * 如果继承TableReducer,从源码中可以看到,输出的value是Mutation类型,也就是输出的值可以是Put,Delete之类的类型
    10. * Created by xiejl on 2016/8/11.
    11. */
    12. public class MyReducer2 extends TableReducer<Text, Text, ImmutableBytesWritable> {
    13. @Override
    14. protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    15. byte[] rowKey = Bytes.toBytes(key.toString());
    16. for(Text text : values) {
    17. //设置put对象的行键
    18. Put put = new Put(rowKey);
    19. String line = text.toString();
    20. int index = line.indexOf(",");
    21. String name = line.substring(0, index);
    22. String age = line.substring(index+1);
    23. //列族的是建表时固定的,列和值是插入时添加的.
    24. put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"),Bytes.toBytes(name));
    25. put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"),Bytes.toBytes(age));
    26. context.write(new ImmutableBytesWritable(Bytes.toBytes(key.toString())), put);
    27. }
    28. context.getCounter("reduce", "over").increment(1);
    29. }
    30. }


  • 相关阅读:
    如何保持mysql和redis中数据的一致性?
    秒杀系统设计&测试
    缓存穿透、缓存击穿、缓存雪崩区别和解决方案
    数据库关联子查询和非关联子查询
    mysql中 = 与in区别_浅析mysql中 exists 与 in 的区别,空判断
    mysql关键字执行顺序
    python中字典删除元素
    Python list根据下标插入/删除元素
    nginx504网关超时解决方法
    CDN加速
  • 原文地址:https://www.cnblogs.com/xjl456852/p/5766205.html
Copyright © 2011-2022 走看看