  • Importing Files into HBase via the API (Handling Small Files)

    In earlier posts we set up local Hadoop and HBase development environments (see: installing Hadoop in standalone mode on 64-bit Windows 7, and installing HBase in standalone mode on 64-bit Windows 7). MapReduce typically uses HDFS as its underlying storage. In HDFS, the NameNode holds file metadata while the DataNodes hold the actual file contents. This works well for large files, but its weakness with huge numbers of small files is well known: the metadata for all those small files consumes a great deal of NameNode memory and can severely degrade HDFS performance. One practical remedy is to store the many small files in HBase instead. This post and its follow-ups discuss how to import small and large files into HBase. Each small file is stored in its own cell; a large file is first uploaded to HDFS and then read into HBase with a MapReduce job.

    Scenario

    By "small" I assume here a file under 10 MB, so we do not need to split it across multiple cells. In HBase, each intersection of a row and a column is called a cell, and the default upper limit on a cell value is 10 MB. This limit can be adjusted in the configuration via the property "hbase.client.keyvalue.maxsize", whose default is 10485760 bytes. The file source can be local; this test case uses a local email file of only about 15 KB. We will create a local Java project that reads the file and then saves it to HBase through the API. Another possible scenario is to turn this local program into a RESTful API that an external system calls remotely to store data in HBase; such an API can tie two independent systems together.
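    For reference, the cell-size limit mentioned above can be raised in hbase-site.xml. The 20 MB value below is only an illustration, not something from the original setup:

```xml
<property>
    <name>hbase.client.keyvalue.maxsize</name>
    <!-- Raise the per-cell value limit from the 10 MB default to 20 MB (illustrative) -->
    <value>20971520</value>
</property>
```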

    Project steps:

    1: Create a Java Maven project in IntelliJ IDEA

    2: Edit pom.xml and add the hbase-client 1.2.6 dependency, since we will use the API to operate on HBase

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>ImportFile</groupId>
        <artifactId>ImportFile</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <repositories>
            <repository>
                <id>apache</id>
            <url>https://repo.maven.apache.org/maven2</url>
            </repository>
        </repositories>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
                <version>1.2.6</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <executions>
                        <!-- Run shade goal on package phase -->
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <filters>
                                    <filter>
                                        <!-- Do not copy the signatures in the META-INF folder.
                                        Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
    
                                <createDependencyReducedPom>false</createDependencyReducedPom>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
    
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
    
            </plugins>
        </build>
    </project>

    3: Add an HBaseUtility class that wraps the HBase operations: creating the table, and putting, deleting, and getting data

    package examples;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.*;
    
    import java.io.IOException;
    
    
    public class HBaseUtility {
        static String TABLE_NAME = "email";
        static String[] COLUMN_FAMILY = {"cf1"};
        static String[] COLUMNS = {"message"};
    
        public static void HBaseOperation(String fileContent) {
            //*******************************************************//
            Connection conn = null;
            Admin admin = null;
            try {
                Configuration conf = HBaseConfiguration.create();
                conn = ConnectionFactory.createConnection(conf);
                admin = conn.getAdmin();
    
                TableName tableName = TableName.valueOf(TABLE_NAME);
                String rowKey = "row1"; // single fixed row key for this demo
                CreateTable(admin, tableName);
                DeleteData(conn, tableName);
                PutData(conn, tableName, rowKey, fileContent);
                GetData(conn, tableName, rowKey);
            }
            catch (IOException e)
            {
                e.printStackTrace();
            }
            finally
            {
                try {
                    if (admin != null) {
                        admin.close();
                    }
    
                    if (conn != null) {
                        conn.close();
                    }
                }
                catch (IOException e)
                {
                    e.printStackTrace();
                }
            }
        }
    }

    Add the HBase operation methods to the class above:

        //Create the HBase table
        private static void CreateTable(Admin admin, TableName tableName)
        {
            try {
                if (admin.tableExists(tableName)) {
                    System.out.println(tableName + " table already exists!");
                } else {
                    HTableDescriptor tableDesc = new HTableDescriptor(tableName);
                    for (String column : COLUMN_FAMILY) {
                        tableDesc.addFamily(new HColumnDescriptor(column));
                    }
                    admin.createTable(tableDesc);
                    System.out.println(tableName + " is created successfully!");
                }
            }
            catch (IOException e)
            {
                e.printStackTrace();
            }
    
        }
    
        //Put (save) data
        private static void PutData(Connection conn, TableName tableName, String rowKey, String fileContent) {
            System.out.println("PUT value..............................................");
            try
            {
                Table table = conn.getTable(tableName);
                Put put = new Put (rowKey.getBytes());
                put.addColumn(COLUMN_FAMILY[0].getBytes(), COLUMNS[0].getBytes(), fileContent.getBytes());
                table.put(put);
                table.close();
            }
            catch (IOException e)
            {
                e.printStackTrace();
            }
        }
    
        //Get (query) data
        private static void GetData(Connection conn, TableName tableName, String rowKey) {
            System.out.println("GET value..............................................");
            try
            {
                Table table = conn.getTable(tableName);
                Get get = new Get(rowKey.getBytes());
                //get.addFamily(COLUMN_FAMILY_NAME.getBytes());
                //get.addColumn(COLUMN_FAMILY_NAME.getBytes(),COLUMNS[0].getBytes());
                Result result = table.get(get);
                // get column family
                result.getFamilyMap(COLUMN_FAMILY[0].getBytes()).forEach((k,v) -> System.out.println(new String(k) + ":" + new String(v)));
                table.close();
            }
            catch (IOException e)
            {
                e.printStackTrace();
            }
        }
    
        //Delete data
        private static void DeleteData(Connection conn, TableName tableName)
        {
            System.out.println("DELETE value..............................................");
            String rowKey = "row1";
            try
            {
                Table table = conn.getTable(tableName);
                Delete delete = new Delete(rowKey.getBytes());
                delete.addColumn(COLUMN_FAMILY[0].getBytes(),COLUMNS[0].getBytes());
                table.delete(delete);
                table.close();
            }
            catch (IOException e)
            {
                e.printStackTrace();
            }
        }

    4: Add the main entry point, which reads the local file and calls the HBaseUtility method

    package examples;
    
    import java.io.File;
    import java.io.FileReader;
    import java.io.FileNotFoundException;
    import java.io.IOException;
    
    public class ImportFile {
        public static void main (String[] args)
        {
        File file = new File("D:\\MyEmail\\Test email successful.eml");
            String fileContent = ReadFile(file);
            HBaseUtility.HBaseOperation(fileContent);
        }
    
        private static String ReadFile(File file) {
            FileReader fr = null;
            String fileContent = null;
            try {
                StringBuffer sb = new StringBuffer();
                fr = new FileReader(file);
                // read into a fixed-size character buffer
                char[] buf = new char[1024]; // buffer size
                int len = 0; // number of characters read per call
                while ((len = fr.read(buf)) != -1) {
                    System.out.println(new String(buf, 0, len));
                    sb.append(buf,0,len);
                }
                fileContent = new String(sb);
                System.out.println(fileContent);
    
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (fr != null) {
                    try {
                        fr.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
    
            return fileContent;
        }
    }
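    The ReadFile method above works, but FileReader also decodes with the platform-default charset. As an alternative sketch (not from the original post), java.nio can read the whole file in a few lines with an explicit charset; the temp file below stands in for the hard-coded D:\ path:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class ReadFileNio {
    // Read an entire file into a String, decoding explicitly as UTF-8.
    static String readFile(Path path) throws IOException {
        return new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Use a temp file so the sketch is self-contained.
        Path tmp = Files.createTempFile("demo", ".eml");
        Files.write(tmp, "hello hbase".getBytes(StandardCharsets.UTF_8));
        System.out.println(readFile(tmp));
        Files.deleteIfExists(tmp);
    }
}
```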

    5: Test

    Start Hadoop and HBase:

    cd D:\Application\hadoop-2.7.4\sbin
    start-all.cmd
    
    cd D:\Application\hbase-1.2.6\bin
    start-hbase.cmd
    
    hbase shell

    The HBase shell command window should then appear.
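    Once the shell is up, the result of a run can be inspected from it; the table, row key, and column names below match the ones used in HBaseUtility:

```
list
scan 'email'
get 'email', 'row1'
```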

    Now you can set breakpoints in IDEA and click the Debug button to start debugging.

     

  • Original post: https://www.cnblogs.com/benfly/p/8359825.html