1. First, create the target table
# Create a table named baidu under the li namespace, specifying the column family info
create "li:baidu","info"
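If the li namespace does not exist yet, the create above will fail. A minimal HBase shell session for a fresh setup might look like the following (create_namespace, create and describe are standard shell commands):

# create the namespace first if it is not already there
create_namespace "li"
# create the table with a single column family named info
create "li:baidu","info"
# confirm the table exists and inspect the column family settings
describe "li:baidu"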
The MapReduce job below reads the sogou.500w.utf8 file and writes each record into li:baidu, using userId-timestamp as the row key:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

public class ReadFromFileIntoHbase {

    static class ReadFromFileMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

        private ImmutableBytesWritable ibw = new ImmutableBytesWritable();
        private byte[] FAMILY = Bytes.toBytes("info");
        private byte[] SEARCH = Bytes.toBytes("search");
        private byte[] RANK = Bytes.toBytes("rank");
        private byte[] CLICK = Bytes.toBytes("click");
        private byte[] URL = Bytes.toBytes("url");

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // split the line on the delimiter used in the source file
            String[] words = value.toString().split(" ");
            // only complete records (6 fields) are written; anything else is skipped
            if (words.length != 6) {
                return;
            }
            // use userId + "-" + timestamp as the row key
            String rk = words[1] + "-" + words[0];
            ibw.set(Bytes.toBytes(rk));
            Put put = new Put(Bytes.toBytes(rk));
            put.addColumn(FAMILY, SEARCH, Bytes.toBytes(words[2]));
            put.addColumn(FAMILY, RANK, Bytes.toBytes(words[3]));
            put.addColumn(FAMILY, CLICK, Bytes.toBytes(words[4]));
            put.addColumn(FAMILY, URL, Bytes.toBytes(words[5]));
            context.write(ibw, put);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", "server3:2181");
        config.set("zookeeper.znode.parent", "/hbase-unsecure");

        Job job = Job.getInstance(config, "ExampleRead");
        job.setJarByClass(ReadFromFileIntoHbase.class);
        job.setMapperClass(ReadFromFileMapper.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);

        // the results go into an HBase table, so wire up TableOutputFormat for that table
        TableMapReduceUtil.initTableReducerJob("li:baidu", null, job);
        // map-only job: the mapper writes the Puts straight to HBase, no reducers needed
        job.setNumReduceTasks(0);

        FileInputFormat.addInputPath(job, new Path("D:\\sogou.500w.utf8"));

        boolean b = job.waitForCompletion(true);
        if (!b) {
            throw new IOException("error with job!");
        }
    }
}
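Once the job completes, the load can also be checked from Java with the HBase client API before going back to the shell. Below is a minimal sketch (the class name VerifyLoad is just a placeholder) that scans the first ten rows of li:baidu, assuming the same quorum and znode.parent values used in the job above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class VerifyLoad {
    public static void main(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", "server3:2181");
        config.set("zookeeper.znode.parent", "/hbase-unsecure");

        try (Connection conn = ConnectionFactory.createConnection(config);
             Table table = conn.getTable(TableName.valueOf("li:baidu"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            int printed = 0;
            for (Result r : scanner) {
                String rowKey = Bytes.toString(r.getRow());
                String url = Bytes.toString(r.getValue(Bytes.toBytes("info"), Bytes.toBytes("url")));
                System.out.println(rowKey + " -> " + url);
                // only look at the first 10 rows
                if (++printed >= 10) {
                    break;
                }
            }
        }
    }
}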
2. Test it
# Look at the first 10 rows
scan "li:baidu",{LIMIT=>10}
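To check that the whole file was loaded rather than just the first few rows, the shell's count command can be used; it scans every row, so on a table of this size it takes a while. A sketch, where INTERVAL only controls how often progress is printed and CACHE controls scanner caching:

count "li:baidu", INTERVAL => 1000000, CACHE => 10000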