zoukankan      html  css  js  c++  java
  • TXT 格式文件插入 HBase

    将 .txt 格式文件逐行插入 HBase 表中。文件每行包含以逗号分隔的 6 个字段（ip、time、day、traffic、type、id），依次写入列族 C1 下的同名列。代码如下：

    package addHbase;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;
    
    import java.io.*;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    /**
     * @author mooojl
     * @Date 2020-11-05-11:25
     */
    public class Add {
        public static void main(String[] args) throws IOException {
            String fileName = "D:\Softwares\Major\IDEA\Projects\HadoopProjects\hbase_test02\src\main\java\addHbase\result.txt";
            File file = new File(fileName);
            FileInputStream fis = null;
            fis = new FileInputStream(file);
            InputStreamReader input = new InputStreamReader(fis);
            BufferedReader br = new BufferedReader(input);
            String line = null;
            String[] info = null;
    
    
            //hbase
            Connection connection;
            TableName TABLE_NAME= TableName.valueOf("bigdatatest");
            Configuration configuration = HBaseConfiguration.create();
            connection = ConnectionFactory.createConnection(configuration);
            System.out.println(connection);
            Table table = connection.getTable(TABLE_NAME);
            String columnFamily="C1";
            String columnName1="ip";
            String columnName2="time";
            String columnName3="day";
            String columnName4="traffic";
            String columnName5="type";
            String columnName6="id";
            int i=1;
            while ((line = br.readLine())!=null){
                info = line.split(",");
                Put put = new Put(Bytes.toBytes(i+""));
                put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnName1 ),Bytes.toBytes(info[0]));
                put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnName2 ),Bytes.toBytes(info[1]));
                put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnName3 ),Bytes.toBytes(info[2]));
                put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnName4 ),Bytes.toBytes(info[3]));
                put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnName5 ),Bytes.toBytes(info[4]));
                put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnName6 ),Bytes.toBytes(info[5]));
                table.put(put);
                i++;
            }
            table.close();
            connection.close();
        }
    }
  • 相关阅读:
    [Python 多线程] Concurrent (十五)
    [Python 多线程] GIL全局解释器锁 (十三)
    [Python 多线程] Semaphore、BoundedSemaphore (十二)
    自定义HandlerMethodArgumentResolver参数解析器和源码分析
    Dubbo分布式日志追踪
    kafka的分区分配策略
    深入剖析kafka架构内部原理
    kafka partition(分区)与 group
    Kafka消费组(consumer group)
    kafka存储机制以及offset
  • 原文地址:https://www.cnblogs.com/MoooJL/p/13934544.html
Copyright © 2011-2022 走看看