  • HDFS utility class

    Read file data from HDFS

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.StringWriter;
    import java.net.URI;
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.commons.io.IOUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    /**
     * @author (e-mail elided)
     * @version created 2016-03-08 09:37:49
     * Utility class for reading file data from HDFS.
     */
    public class ReadHDFSDatas {
    
        static Configuration conf = new Configuration();
        /**
         * Read every text file under the given location, decompressing any
         * file whose name matches a known compression codec.
         *
         * @param location HDFS file or directory path
         * @param conf Hadoop configuration
         * @return all lines read
         * @throws Exception if the filesystem cannot be read
         */
        public static List<String> readLines( Path location, Configuration conf )
            throws Exception {
            // StringBuffer sb = new StringBuffer();
            FileSystem fileSystem = FileSystem.get( location.toUri(), conf );
            CompressionCodecFactory factory = new CompressionCodecFactory( conf );
            FileStatus[] items = fileSystem.listStatus( location );
            if ( items == null )
                return new ArrayList<String>();
            List<String> results = new ArrayList<String>();
            for ( FileStatus item : items ) {
    
                // ignoring files like _SUCCESS
                if ( item.getPath().getName().startsWith( "_" ) ) {
                    continue;
                }
                CompressionCodec codec = factory.getCodec( item.getPath() );
                InputStream stream = null;
                
                if ( codec != null ) {
                    stream = codec.createInputStream( fileSystem.open( item.getPath() ) );
                }
                else {
                    stream = fileSystem.open( item.getPath() );
                }
    
                StringWriter writer = new StringWriter();
                IOUtils.copy( stream, writer, "UTF-8" );
                stream.close();
                String raw = writer.toString();
                // split the file contents into individual lines
                for ( String str : raw.split( "\n" ) ) {
                    results.add( str );
                    // System.out.println( "read..." + str + "....." );
                }
            }
            return results;
        }
    
       
    
        public String ReadFile( String hdfs )
            throws IOException {
            StringBuffer sb = new StringBuffer();
            FileSystem fs = FileSystem.get( URI.create( hdfs ), conf );
            FSDataInputStream hdfsInStream = fs.open( new Path( hdfs ) );
            try {
                byte[] b = new byte[10240];
                int numBytes;
                // read a chunk at a time and append it; the original version
                // called read() twice per iteration and discarded the data
                while ( ( numBytes = hdfsInStream.read( b ) ) > 0 ) {
                    sb.append( new String( b, 0, numBytes, "UTF-8" ) );
                }
            }
            catch ( IOException e ) {
                e.printStackTrace();
            }
            hdfsInStream.close();
            fs.close();
            return sb.toString();
        }
    
        /**
         * Print an HDFS text file line by line.
         *
         * @param filePath HDFS file path
         * @return the last value returned by readLine(), i.e. null at end of file
         * @throws IOException if the file cannot be opened
         */
        public static String getFile( String filePath ) throws IOException {
            String line = "";
            try {
                Configuration conf = new Configuration();
                FileSystem fs = FileSystem.get( URI.create( filePath ), conf );
                Path pathq = new Path( filePath );
                FSDataInputStream fsr = fs.open( pathq );
                
                // FSDataInputStream.readLine() is deprecated but reads one text line at a time
                while ( line != null ) {
                    line = fsr.readLine();
                    if ( line != null ) {
                        System.out.println( line );
                    }
                }
                fsr.close();
            }
            catch ( Exception e ) {
                e.printStackTrace();
            }
            return line;
        }
    
        /**
         * Read an HDFS text file into a list of lines.
         */
        public static List<String> getDatas( String filePath )  {
           List<String> list = new ArrayList<String>();
           
            try {
                Configuration conf = new Configuration();
                FileSystem fs = FileSystem.get( URI.create( filePath ), conf );
                Path pathq = new Path( filePath );
                FSDataInputStream fsr = fs.open( pathq );
                String line = "";
                while ( line != null ) {
                    line = fsr.readLine();
                    if ( line != null ) {
                        list.add( line );
                    }
                }
                fsr.close();
            }
            catch ( Exception e ) {
                e.printStackTrace();
            }
            return list;
        }
        public static void main( String[] args ){
            //String hdfs = "hdfs://node4:9000/hive/warehouse/u_data/u.data";
            //String  hdfs = "/datas/t1";
            String  hdfs = "/datas/u.data";
            Path path = new Path( hdfs );
            // String hdfs = "/datas";
            // String hdfs = "/hive/warehouse/u_data/u.data";
          //  getFile(hdfs);
            /**
             * Schema of u.data:
             *   userid INT,
             *   movieid INT,
             *   rating INT,
             *   weekday INT
             */
            List<String> listDatas = getDatas(hdfs);
            for (int i = 0; i < listDatas.size(); i++){
                // u.data is tab-separated: userid, movieid, rating, weekday
                String[] split = listDatas.get(i).split("\t");
                String userid = split[0];
                String movieid = split[1];
                String rating = split[2];
                String weekday = split[3];
                String makeRowKey = RegionSeverSplit.makeRowKey(userid);
                // batch insert via the put API
                // System.out.println("userid--" + userid + "..movieid--" + movieid + "..rating--" + rating + "..weekday--" + weekday + "....");
                HBaseUtils.addRows("t1", makeRowKey, "f1", "weekday-rating",
                        (movieid + "-" + rating + "-" + weekday).getBytes());
            }
            System.out.println("success......");
        }
    }

    HBase pre-processing: generating a hash-prefixed rowkey

    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    
    import org.apache.commons.codec.binary.Hex;
    
    public class RegionSeverSplit {
    
        
        public  static String makeRowKey(String id){
             String md5_content = null;
                try {
                    MessageDigest messageDigest = MessageDigest.getInstance("MD5");
                    messageDigest.reset();
                    messageDigest.update(id.getBytes());
                    byte[] bytes = messageDigest.digest();
                    md5_content = new String(Hex.encodeHex(bytes));
                } catch (NoSuchAlgorithmException e1) {
                    e1.printStackTrace();
                }
            // right-shift the leading 28 MD5 bits so the first hex digit falls in 0-7
            String right_md5_id = Integer.toHexString(Integer.parseInt(md5_content.substring(0, 7), 16) >> 1);
                while(right_md5_id.length()<7){
                    right_md5_id = "0" + right_md5_id;
                }
                return right_md5_id + "-" + id;
        }
        public static void main(String[] args){
            String rowky = makeRowKey("asdfasdf");
            System.out.println(rowky);
        }
    }
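
    Because makeRowKey right-shifts the leading 28 bits of the MD5, the first hex digit of every generated key falls in the range 0-7. A table can therefore be pre-split on that digit so the prefixed keys spread evenly across region servers. The sketch below is an illustration under assumptions (8 regions with split points "1" through "7"; the class name PrefixSplitKeys is invented), not code from the original post; the resulting byte arrays could be passed to HBaseAdmin.createTable(descriptor, splits):

    /**
     * Sketch: split keys matching the rowkey prefix produced by
     * RegionSeverSplit.makeRowKey (leading hex digit in 0-7).
     */
    public class PrefixSplitKeys {
    
        public static byte[][] makeSplitKeys() {
            // 7 split points ("1".."7") yield 8 regions, one per leading digit
            byte[][] splits = new byte[7][];
            for (int i = 1; i <= 7; i++) {
                splits[i - 1] = Integer.toHexString(i).getBytes();
            }
            return splits;
        }
    
        public static void main(String[] args) {
            for (byte[] split : makeSplitKeys()) {
                System.out.println(new String(split));
            }
        }
    }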

    HBase utility class: single-row and batch inserts with the Put API

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.HTableInterface;
    import org.apache.hadoop.hbase.client.HTablePool;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.mapreduce.InputSplit;
    
    import cn.tansun.bd.hdfs.ReadHDFSDatas;
    
    /**
     * 
     * @author root
     *
     */
    
    public class HBaseUtils {
        private static HBaseAdmin hadmin = null;
        private static Configuration conf;
        private static HTable htable = null;
    
        static {
            conf = new Configuration();
            String filePath = "hbase-site.xml";
            Path path = new Path(filePath);
            conf.addResource(path);
            conf = HBaseConfiguration.create(conf);
        }
    
        /**
         * insert one row
         * 
         * @param tableName
         * @param rowkey
         * @param columnFinaly
         * @param columnName
         * @param values
         * @return
         */
        public static boolean addRow(String tableName, String rowkey,
                String columnFinaly, String columnName, byte[] values) {
            boolean flag = true;
            if (tableName != null) {
                HTablePool hTpool = new HTablePool(conf, 1000);
                HTableInterface table = hTpool.getTable(tableName);
                Put put = new Put(rowkey.getBytes());
                // Put.add(family, qualifier, value) matches the HTablePool-era API
                put.add(columnFinaly.getBytes(), columnName.getBytes(),
                        values);
                try {
                    table.put(put);
                    System.out.println("addRow success..." + "tableName....."
                            + tableName);
                } catch (IOException e) {
                    flag = false;
                    e.printStackTrace();
                } finally {
                    try {
                        table.close(); // returns the table to the pool
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            } else {
                flag = false;
                System.out.println("please select tableName");
            }
    
            return flag;
        }
    
        public static void main(String[] args) {
            /*String makeRowKey = RegionSeverSplit.makeRowKey("adcdfef");
            String tableName = "student";
            String columnfianly = "info";
            String columnName = "name";
            String values = "zhangsan";
            addRow(tableName, makeRowKey, columnfianly, columnName,
                    values.getBytes());*/
        String hdfs = "/datas/u.data";
        List<String> getDatas = ReadHDFSDatas.getDatas(hdfs); // getDatas is static
            for (int i = 0; i < getDatas.size(); i++){
                if (i < 100){
                    System.out.println(getDatas.get(i));
                }
            }
        }
    
        /**
         * Insert a row through the client-side write buffer. The caller in
         * ReadHDFSDatas.main invokes this once per record, so each call
         * builds one Put, hands it to the buffered table, and flushes.
         *
         * @param tableName
         * @param rowkey
         * @param columnFinaly
         * @param columnName
         * @param values
         * @return the list of puts that was written
         */
        public static List<Put> addRows(String tableName, String rowkey,
                String columnFinaly, String columnName, byte[] values) {
            List<Put> lists = null;
            long start = System.currentTimeMillis();
            if (tableName != null && rowkey != null) {
                HTablePool hTablePool = new HTablePool(conf, 1000);
                HTableInterface table = hTablePool.getTable(tableName);
                try {
                    // buffer writes client-side instead of flushing per put
                    table.setAutoFlush(false);
                    table.setWriteBufferSize(1024 * 1024);
                    lists = new ArrayList<Put>();
                    Put put = new Put(rowkey.getBytes());
                    put.add(columnFinaly.getBytes(), columnName.getBytes(), values);
                    lists.add(put);
                    table.put(lists);
                    // push the buffered mutations to the region servers
                    table.flushCommits();
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    try {
                        table.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            } else {
                System.out.println("tableName and rowkey must not be null");
            }
            long end = System.currentTimeMillis();
            long times = end - start;
            System.out.println(times * 1.0 / 1000 + " s ..... finished");
            return lists;
        }
    
        /**
         * read datas by fileName
         * @param fileName
         * @return
         */
        public List<String> getFileDatas(String fileName){
            
            return null;
        } 
        
        /**
         * read hdfs datas by fileName
         * @param fileName
         * @return
         */
        public static List<String> getHdfsDatas(String fileName){
            
        /*    List<String> getDatas = ReadHDFSDatas.getDatas(fileName);
            for (int i = 0; i < getDatas.size(); i++){
                if (i < 100){
                    System.out.println(getDatas.get(i));
                }
            }
            return getDatas;*/
            return null;
        }
        /**
         * 
         * @param startKey
         * @param endKey
         * @return
         */
        public List<InputSplit> getSplits(byte[] startKey, byte[] endKey) {
            return null;
        }
    }
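
    HTablePool, setAutoFlush(false) and flushCommits() are deprecated and later removed in newer HBase client APIs. On an HBase 1.x or later client, the same buffered batch insert is normally written against Connection and BufferedMutator, which buffers mutations client-side and flushes them for you. A hedged sketch reusing the table "t1", family "f1" and qualifier "weekday-rating" from the example above (the class name BufferedPutExample and the batch size of 100 are illustrative assumptions):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.BufferedMutator;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    
    public class BufferedPutExample {
    
        /**
         * Batch-insert one cell per input record into table "t1".
         */
        public static void bulkPut(List<String> rows) throws IOException {
            Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
            BufferedMutator mutator = conn.getBufferedMutator(TableName.valueOf("t1"));
            try {
                List<Put> puts = new ArrayList<Put>();
                for (String row : rows) {
                    Put put = new Put(Bytes.toBytes(RegionSeverSplit.makeRowKey(row)));
                    put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("weekday-rating"),
                            Bytes.toBytes(row));
                    puts.add(put);
                    // hand the mutations to the client-side buffer in groups of 100
                    if (puts.size() >= 100) {
                        mutator.mutate(puts);
                        puts.clear();
                    }
                }
                mutator.mutate(puts);
                mutator.flush(); // force any remaining buffered mutations out
            } finally {
                mutator.close();
                conn.close();
            }
        }
    }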
  • Original post: https://www.cnblogs.com/zhanggl/p/5584015.html