zoukankan      html  css  js  c++  java
  • 在虚拟机用java实现hadoop的增删改查操作

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.*;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.util.Bytes;

    import java.io.BufferedReader;
    import java.io.BufferedWriter;
    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.FileReader;
    import java.io.IOException;
    import java.io.OutputStreamWriter;
    import java.io.Writer;
    public class cunchu {
        
        public static Configuration configuration;
        public static Connection connection;
        public static Admin admin;
        public static void init(){
            configuration  = HBaseConfiguration.create();
            configuration.set("hbase.rootdir","hdfs://localhost:9000/hbase");
            try{
                connection = ConnectionFactory.createConnection(configuration);
                admin = connection.getAdmin();
            }catch (IOException e){
                e.printStackTrace();
            }
        }

        public static void close(){
            try{
                if(admin != null){
                    admin.close();
                }
                if(null != connection){
                    connection.close();
                }
            }catch (IOException e){
                e.printStackTrace();
            }
        }
        public static void main(String[] args)throws IOException{
            init();
            
            
            
            
            
             FileReader read = new FileReader("result2.txt");
             BufferedReader br = new BufferedReader(read);
             Writer writer = null;
             
             String row;
             int hang=1;
            String rowKey;
            String[] data=new String[6];
            try {
             while((row = br.readLine())!=null){
                     data=change(row);
                     
                     
                     row=data[0]+","+data[1]+","+data[2]+","+data[3]+","+data[4]+","+data[5];
                     rowKey=data[5];
                     System.out.println(rowKey);
                     Table table = connection.getTable(TableName.valueOf("result"));
                     Put put = new Put(rowKey.getBytes());
                     put.addColumn("ip".getBytes(),null,data[0].getBytes());
                     System.out.println("ok");
                     put.addColumn("time".getBytes(),null,data[1].getBytes());
                     put.addColumn("day".getBytes(),null,data[2].getBytes());
                     put.addColumn("traffic".getBytes(),null,data[3].getBytes());
                     put.addColumn("type".getBytes(),null,data[4].getBytes());
                     table.put(put);
                     
                     //i++;
                    }
         } catch (IOException e) {
             // TODO Auto-generated catch block
             e.printStackTrace();
         }

           
           
            close();
        }
     
        private static String [] change(String row) {
            char [] str1=row.toCharArray();
            String [] data =new String [6];    
            int j=0;
            int k=0;
            for(int i=0;i<str1.length;i++) {
                if(str1[i]==',') {
                    data[k]=row.substring(j, i);
                    j=i+1;
                    k++;
                }    
            }
            data[k]=row.substring(j, str1.length);
            return data;
        }
     
        public static void createTable(String myTableName,String[] colFamily) throws IOException {
            TableName tableName = TableName.valueOf(myTableName);
            if(admin.tableExists(tableName)){
                System.out.println("talbe is exists!");
                
            }else {
                TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(tableName);
                for(String str:colFamily){
                    ColumnFamilyDescriptor family =
    ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(str)).build();
                    tableDescriptor.setColumnFamily(family);
                }
                admin.createTable(tableDescriptor.build());
            }
        }
        
        //添加
        public static void insertData(String tableName,String rowKey,String colFamily,String col,String val) throws IOException {
            Table table = connection.getTable(TableName.valueOf(tableName));
            Put put = new Put(rowKey.getBytes());
            put.addColumn(colFamily.getBytes(),col.getBytes(), val.getBytes());
            table.put(put);
            table.close();
        }
        
        //清空
        public static void disableTable(String tableName) throws IOException {
                try {
                    TableName table = TableName.valueOf(tableName);
                    admin.disableTable(table);
                } catch (MasterNotRunningException e) {
                    e.printStackTrace();
                } catch (ZooKeeperConnectionException e) {
                    e.printStackTrace();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            
        }
        
        //删除
        public static void deleteTable(String tableName) throws IOException {
            try {
                TableName table = TableName.valueOf(tableName);
     
                admin.deleteTable(table);
            } catch (MasterNotRunningException e) {
                e.printStackTrace();
            } catch (ZooKeeperConnectionException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        
        }
        //获值
        public static void getData(String tableName,String rowKey,String colFamily, String col)throws  IOException{
            Table table = connection.getTable(TableName.valueOf(tableName));
            Get get = new Get(rowKey.getBytes());
            get.addColumn(colFamily.getBytes(),col.getBytes());
            Result result = table.get(get);
            System.out.println(new String(result.getValue(colFamily.getBytes(),col==null?null:col.getBytes())));
            table.close();
        }
        
        //删行
        public static void deleteRow(String tableName,String rowKey) throws IOException {
            Table table = connection.getTable(TableName.valueOf(tableName));
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            table.delete(delete);
            table.close();
        }
        
        
        
        
    }

  • 相关阅读:
    idea设置全局ignore
    win 2012 安装mysql 5.7.20 及报错 This application requires Visual Studio 2013 Redistributable. Please ins
    win 2012 安装mysql 5.7.20 及报错 This application requires Visual Studio 2013 Redistr
    kafka 删除 topic
    java编译中出现了Exception in thread “main" java.lang.UnsupportedClassVersionError
    Centos中使用yum安装java时,没有jps的问题的解决
    Spring 整合Junit
    Spring纯注解配置
    Spring 基于注解的 IOC 配置
    打印java系统的信息
  • 原文地址:https://www.cnblogs.com/ljpljm/p/13929887.html
Copyright © 2011-2022 走看看