一,实现思路
1,先通过MapReduce的mapper读取数据,并把数据传递给reducer。
2,写好HBase工具类:连接HBase、创建表、向表中插入数据。
3,在reducer中调用写好的HBase工具类。
4,main类提交。
二,代码书写
1,mapper
package com; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; //传递数据 public class mapper extends Mapper<LongWritable, Text, Text, User>{ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, User>.Context context) throws IOException, InterruptedException { String data = value.toString(); String[] s = data.split(","); System.out.println(data); context.write(new Text("1"), new User(s[0],s[1],s[2],s[3],s[4])); } }
2,hbase工具类
package com; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; public class HbaseUtils { public static final String c="info"; //reducer调用的方法 public static void insertinfo(String ip,String port,String tableName,List<User> list) throws Exception{ Connection con=getConnection(ip,port); HBaseAdmin admin = (HBaseAdmin)con.getAdmin(); Table table = con.getTable(TableName.valueOf(tableName)); boolean b = admin.tableExists(TableName.valueOf(tableName)); if(!b){ createTable(admin,tableName); } insertList(table,list); } //插入数据的方法 private static void insertList(Table table, List<User> list) throws Exception { for (User user : list) { Put put = new Put(user.getId().getBytes()); put.addColumn(c.getBytes(), "name".getBytes(), user.getName().getBytes()); put.addColumn(c.getBytes(), "Age".getBytes(), user.getAge().getBytes()); put.addColumn(c.getBytes(), "Sex".getBytes(), user.getSex().getBytes()); put.addColumn(c.getBytes(), "Part".getBytes(), user.getPart().getBytes()); table.put(put); } } //创建表的方法 private static void createTable(Admin admin, String tableName) throws Exception { HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName)); HColumnDescriptor descriptor2 = new HColumnDescriptor(c); descriptor.addFamily(descriptor2); admin.createTable(descriptor); } //获得与hbase的连接 private static Connection getConnection(String ip, String port) throws Exception { Configuration configuration = HBaseConfiguration.create(); configuration.set("hbase.zookeeper.quorum", ip); 
configuration.set("hbase.zookeeper.property.clientPort", port); Connection connection = ConnectionFactory.createConnection(configuration); return connection; } }
3,reducer
package com; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.List; import org.apache.commons.beanutils.BeanUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class reducer extends Reducer<Text, User, Text, Text>{ @Override protected void reduce(Text keyin, Iterable<User> value, Reducer<Text, User, Text, Text>.Context conetxt) throws IOException, InterruptedException { ArrayList<User> list=new ArrayList<User>(); //克隆迭代器中的数据 for(User user:value) { User user1=new User(); System.out.println(user); try { BeanUtils.copyProperties(user1, user); list.add(user1); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } System.out.println("list+++++++++++++++"+list); //调用hbase工具的方法 try { HbaseUtils.insertinfo("192.168.184.131", "2181", "sw", list); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } conetxt.write(new Text("status"), new Text(":success")); } }
4,main
package com; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class main { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.set("mapreduce.framework.name", "local"); conf.set("fs.defaultFS", "file:///"); Job wordCountJob = Job.getInstance(conf); //重要:指定本job所在的jar包 wordCountJob.setJarByClass(main.class); //设置wordCountJob所用的mapper逻辑类为哪个类 wordCountJob.setMapperClass(mapper.class); //设置wordCountJob所用的reducer逻辑类为哪个类 wordCountJob.setReducerClass(reducer.class); //设置map阶段输出的kv数据类型 wordCountJob.setMapOutputKeyClass(Text.class); wordCountJob.setMapOutputValueClass(User.class); //设置最终输出的kv数据类型 wordCountJob.setOutputKeyClass(Text.class); wordCountJob.setOutputValueClass(Text.class); //设置要处理的文本数据所存放的路径 FileInputFormat.setInputPaths(wordCountJob, "C:\test\in6\data.txt"); FileOutputFormat.setOutputPath(wordCountJob, new Path("C:\test\out6")); //提交job给hadoop集群 wordCountJob.waitForCompletion(true); } }