zoukankan      html  css  js  c++  java
  • Hbase调用JavaAPI实现批量导入操作

    将手机上网日志文件批量导入到Hbase中。操作步骤:

    1、将日志文件(请下载附件)上传到HDFS中,利用hadoop的操作命令上传:hadoop  fs -put input  /


     

    2、创建Hbase表,通过Java操作

    Java代码  收藏代码
    1. package com.jiewen.hbase;  
    2.   
    3. import java.io.IOException;  
    4.   
    5. import org.apache.hadoop.conf.Configuration;  
    6. import org.apache.hadoop.hbase.HBaseConfiguration;  
    7. import org.apache.hadoop.hbase.HColumnDescriptor;  
    8. import org.apache.hadoop.hbase.HTableDescriptor;  
    9. import org.apache.hadoop.hbase.client.Get;  
    10. import org.apache.hadoop.hbase.client.HBaseAdmin;  
    11. import org.apache.hadoop.hbase.client.HTable;  
    12. import org.apache.hadoop.hbase.client.Put;  
    13. import org.apache.hadoop.hbase.client.Result;  
    14. import org.apache.hadoop.hbase.client.ResultScanner;  
    15. import org.apache.hadoop.hbase.client.Scan;  
    16. import org.apache.hadoop.hbase.util.Bytes;  
    17.   
    18. public class HbaseDemo {  
    19.   
    20.     public static void main(String[] args) throws IOException {  
    21.         String tableName = "wlan_log";  
    22.         String columnFamily = "cf";  
    23.   
    24.         HbaseDemo.create(tableName, columnFamily);  
    25.   
    26.         // HbaseDemo.put(tableName, "row1", columnFamily, "cl1", "data");  
    27.         // HbaseDemo.get(tableName, "row1");  
    28.         // HbaseDemo.scan(tableName);  
    29.         // HbaseDemo.delete(tableName);  
    30.     }  
    31.   
    32.     // hbase操作必备  
    33.     private static Configuration getConfiguration() {  
    34.         Configuration conf = HBaseConfiguration.create();  
    35.         conf.set("hbase.rootdir""hdfs://hadoop1:9000/hbase");  
    36.         // 使用eclipse时必须加入这个,否则无法定位  
    37.         conf.set("hbase.zookeeper.quorum""hadoop1");  
    38.         return conf;  
    39.     }  
    40.   
    41.     // 创建一张表  
    42.     public static void create(String tableName, String columnFamily)  
    43.             throws IOException {  
    44.         HBaseAdmin admin = new HBaseAdmin(getConfiguration());  
    45.         if (admin.tableExists(tableName)) {  
    46.             System.out.println("table exists!");  
    47.         } else {  
    48.             HTableDescriptor tableDesc = new HTableDescriptor(tableName);  
    49.             tableDesc.addFamily(new HColumnDescriptor(columnFamily));  
    50.             admin.createTable(tableDesc);  
    51.             System.out.println("create table success!");  
    52.         }  
    53.     }  
    54.   
    55.     // 加入一条记录  
    56.     public static void put(String tableName, String row, String columnFamily,  
    57.             String column, String data) throws IOException {  
    58.         HTable table = new HTable(getConfiguration(), tableName);  
    59.         Put p1 = new Put(Bytes.toBytes(row));  
    60.         p1.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes  
    61.                 .toBytes(data));  
    62.         table.put(p1);  
    63.         System.out.println("put'" + row + "'," + columnFamily + ":" + column  
    64.                 + "','" + data + "'");  
    65.     }  
    66.   
    67.     // 读取一条记录  
    68.     public static void get(String tableName, String row) throws IOException {  
    69.         HTable table = new HTable(getConfiguration(), tableName);  
    70.         Get get = new Get(Bytes.toBytes(row));  
    71.         Result result = table.get(get);  
    72.         System.out.println("Get: " + result);  
    73.     }  
    74.   
    75.     // 显示全部数据  
    76.     public static void scan(String tableName) throws IOException {  
    77.         HTable table = new HTable(getConfiguration(), tableName);  
    78.         Scan scan = new Scan();  
    79.         ResultScanner scanner = table.getScanner(scan);  
    80.         for (Result result : scanner) {  
    81.             System.out.println("Scan: " + result);  
    82.         }  
    83.     }  
    84.   
    85.     // 删除表  
    86.     public static void delete(String tableName) throws IOException {  
    87.         HBaseAdmin admin = new HBaseAdmin(getConfiguration());  
    88.         if (admin.tableExists(tableName)) {  
    89.             try {  
    90.                 admin.disableTable(tableName);  
    91.                 admin.deleteTable(tableName);  
    92.             } catch (IOException e) {  
    93.                 e.printStackTrace();  
    94.                 System.out.println("Delete " + tableName + " 失败");  
    95.             }  
    96.         }  
    97.         System.out.println("Delete " + tableName + " 成功");  
    98.     }  
    99.   
    100. }  

     

    3、将日志文件导入Hbase表wlan_log中:

    Java代码  收藏代码
    1. import java.text.SimpleDateFormat;  
    2. import java.util.Date;  
    3.   
    4. import org.apache.hadoop.conf.Configuration;  
    5. import org.apache.hadoop.hbase.client.Put;  
    6. import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;  
    7. import org.apache.hadoop.hbase.mapreduce.TableReducer;  
    8. import org.apache.hadoop.hbase.util.Bytes;  
    9. import org.apache.hadoop.io.LongWritable;  
    10. import org.apache.hadoop.io.NullWritable;  
    11. import org.apache.hadoop.io.Text;  
    12. import org.apache.hadoop.mapreduce.Counter;  
    13. import org.apache.hadoop.mapreduce.Job;  
    14. import org.apache.hadoop.mapreduce.Mapper;  
    15. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
    16. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
    17.   
    18. public class HbaseBatchImport {  
    19.   
    20.     public static void main(String[] args) throws Exception {  
    21.         final Configuration configuration = new Configuration();  
    22.         // 设置zookeeper  
    23.         configuration.set("hbase.zookeeper.quorum""hadoop1");  
    24.   
    25.         // 设置hbase表名称  
    26.         configuration.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log");  
    27.   
    28.         // 将该值改大,防止hbase超时退出  
    29.         configuration.set("dfs.socket.timeout""180000");  
    30.   
    31.         final Job job = new Job(configuration, "HBaseBatchImport");  
    32.   
    33.         job.setMapperClass(BatchImportMapper.class);  
    34.         job.setReducerClass(BatchImportReducer.class);  
    35.         // 设置map的输出,不设置reduce的输出类型  
    36.         job.setMapOutputKeyClass(LongWritable.class);  
    37.         job.setMapOutputValueClass(Text.class);  
    38.   
    39.         job.setInputFormatClass(TextInputFormat.class);  
    40.         // 不再设置输出路径。而是设置输出格式类型  
    41.         job.setOutputFormatClass(TableOutputFormat.class);  
    42.   
    43.         FileInputFormat.setInputPaths(job, "hdfs://hadoop1:9000/input");  
    44.   
    45.         job.waitForCompletion(true);  
    46.     }  
    47.   
    48.     static class BatchImportMapper extends  
    49.             Mapper<LongWritable, Text, LongWritable, Text> {  
    50.         SimpleDateFormat dateformat1 = new SimpleDateFormat("yyyyMMddHHmmss");  
    51.         Text v2 = new Text();  
    52.   
    53.         protected void map(LongWritable key, Text value, Context context)  
    54.                 throws java.io.IOException, InterruptedException {  
    55.             final String[] splited = value.toString().split(" ");  
    56.             try {  
    57.                 final Date date = new Date(Long.parseLong(splited[0].trim()));  
    58.                 final String dateFormat = dateformat1.format(date);  
    59.                 String rowKey = splited[1] + ":" + dateFormat;  
    60.                 v2.set(rowKey + " " + value.toString());  
    61.                 context.write(key, v2);  
    62.             } catch (NumberFormatException e) {  
    63.                 final Counter counter = context.getCounter("BatchImport",  
    64.                         "ErrorFormat");  
    65.                 counter.increment(1L);  
    66.                 System.out.println("出错了" + splited[0] + " " + e.getMessage());  
    67.             }  
    68.         };  
    69.     }  
    70.   
    71.     static class BatchImportReducer extends  
    72.             TableReducer<LongWritable, Text, NullWritable> {  
    73.         protected void reduce(LongWritable key,  
    74.                 java.lang.Iterable<Text> values, Context context)  
    75.                 throws java.io.IOException, InterruptedException {  
    76.             for (Text text : values) {  
    77.                 final String[] splited = text.toString().split(" ");  
    78.   
    79.                 final Put put = new Put(Bytes.toBytes(splited[0]));  
    80.                 put.add(Bytes.toBytes("cf"), Bytes.toBytes("date"), Bytes  
    81.                         .toBytes(splited[1]));  
    82.                 // 省略其它字段,调用put.add(....)就可以  
    83.                 context.write(NullWritable.get(), put);  
    84.             }  
    85.         };  
    86.     }  
    87.   
    88. }  

     4、查看导入结果:



     

  • 相关阅读:
    linux添加到普通用户sudo才干
    跳跃Java一些周期,双跳FOR周期
    Android虚拟机器学习总结Dalvik虚拟机创建进程和线程分析
    Notes系统安全日志
    android Intent.createChooser 应用选择
    创业这么难,去哪儿?
    视频和音频播放的演示最简单的例子6:OpenGL广播YUV420P(T经exture,采用Shader)
    名单(两)——基本操作单向链表(创、删、印、节点统计数)
    JS列
    第38周三
  • 原文地址:https://www.cnblogs.com/zhchoutai/p/8610046.html
Copyright © 2011-2022 走看看