zoukankan      html  css  js  c++  java
  • 【HBase】HBase与MapReduce集成——从HDFS的文件读取数据到HBase


    需求

    将HDFS路径 /hbase/input/user.txt 文件的内容读取并写入到HBase 表myuser2
    首先在HDFS上准备些数据让我们用

    hdfs dfs -mkdir -p /hbase/input
    cd /export/servers/
    vim user.txt
    

    填写一下数据,注意是用 分隔的

    0007	zhangsan	18
    0008	lisi	25
    0009	wangwu	20
    

    保存后上传到HDFS上就行

    hdfs dfs -put user.txt /hbase/input
    

    步骤

    一、创建maven工程,导入jar包

    <repositories>
            <repository>
                <id>cloudera</id>
                <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
            </repository>
        </repositories>
    
        <dependencies>
    
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>2.6.0-mr1-cdh5.14.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
                <version>1.2.0-cdh5.14.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-server</artifactId>
                <version>1.2.0-cdh5.14.0</version>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.testng</groupId>
                <artifactId>testng</artifactId>
                <version>6.14.3</version>
                <scope>test</scope>
            </dependency>
    
    
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.0</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <encoding>UTF-8</encoding>
                        <!--    <verbal>true</verbal>-->
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.2</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <filters>
                                    <filter>
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*/RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    

    二、开发MapReduce程序

    定义一个Main方法类——HdfsReadHbaseWrite

    package cn.itcast.mr.demo2;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class HdfsReadHbaseWrite extends Configured implements Tool {
        @Override
        public int run(String[] args) throws Exception {
            //获取Job对象
            Job job = Job.getInstance(super.getConf(), "hdfs->hbase");
            //获取输入数据和路径
            job.setInputFormatClass(TextInputFormat.class);
            TextInputFormat.setInputPaths(job, new Path("hdfs://node01:8020/hbase/input"));
    
            //自定义Map逻辑
            job.setMapperClass(HDFSReadMapper.class);
            //获取k2,v2输出类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);
    
            //自定义Reduce逻辑
            TableMapReduceUtil.initTableReducerJob("myuser2", HbaseWriteReducer.class, job);
    
            //设置reduceTask个数
            job.setNumReduceTasks(1);
    
            //提交任务
            boolean b = job.waitForCompletion(true);
            return b ? 0 : 1;
        }
    
        public static void main(String[] args) throws Exception {
            Configuration configuration = HBaseConfiguration.create();
            configuration.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");
            int run = ToolRunner.run(configuration, new HdfsReadHbaseWrite(), args);
            System.exit(run);
        }
    }
    

    自定义Map逻辑,定义一个Mapper类——HDFSReadMapper

    package cn.itcast.mr.demo2;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class HDFSReadMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            /*
            0007	zhangsan	18
            0008	lisi	25
            0009	wangwu	20
            我们要读取的数据都直接封装到了value中,所以直接拿到以后输出就行
             */
            context.write(value, NullWritable.get());
        }
    }
    

    自定义Reduce逻辑,定义一个Reducer类——HbaseWriteReducer

    package cn.itcast.mr.demo2;
    
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    
    import java.io.IOException;
    
    public class HbaseWriteReducer extends TableReducer<Text, NullWritable, ImmutableBytesWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            /*
            0007	zhangsan	18
            0008	lisi	25
            0009	wangwu	20
             */
            //先把拿到的数据分割一下
            String[] split = key.toString().split("	");
            //拿到rowKey
            byte[] rowKey = split[0].getBytes();
            //拿到nameValue
            byte[] nameValue = split[1].getBytes();
            //拿到ageValue
            byte[] ageValue = split[2].getBytes();
            //创建put对象
            Put put = new Put(rowKey);
            //添加数据
            put.addColumn("f1".getBytes(), "name".getBytes(), nameValue);
            put.addColumn("f1".getBytes(), "age".getBytes(), ageValue);
    
            //构建ImmutableBytesWritable
            ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable();
            immutableBytesWritable.set(rowKey);
            //转换成k3,v3输出
            context.write(immutableBytesWritable, put);
        }
    }
    

    三、结果

    在这里插入图片描述

  • 相关阅读:
    centos7安装es6.4.0
    将mysql数据同步到ES6.4(全量+增量)
    c#基于supersocket的简单websocket服务端收发消息实现
    c#log4net简单好用的配置
    MongoDB安装配置教程
    IntelliJ IDEA 中创建maven项目
    VMware Workstation 的安装和使用
    Redis使用场景
    Redis 下载安装
    MySQL--启动和关闭MySQL服务
  • 原文地址:https://www.cnblogs.com/zzzsw0412/p/12772428.html
Copyright © 2011-2022 走看看