zoukankan      html  css  js  c++  java
  • HDFS 工具类

    读取HDFS上文件数据

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.io.StringWriter;
    import java.net.URI;
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.commons.io.IOUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.hadoop.util.Progressable;
    /**
     * @author 作者 E-mail:
     * @version 创建时间:2016年3月8日 上午9:37:49 类说明
     * 读取hdfs文件数据
     */
    public class ReadHDFSDatas {
    
        static Configuration conf = new Configuration();
        /**
         * 
         * 
         * @param location
         * @param conf
         * @return
         * @throws Exception
         */
        public static List<String> readLines( Path location, Configuration conf )
            throws Exception {
            // StringBuffer sb = new StringBuffer();
            FileSystem fileSystem = FileSystem.get( location.toUri(), conf );
            CompressionCodecFactory factory = new CompressionCodecFactory( conf );
            FileStatus[] items = fileSystem.listStatus( location );
            if ( items == null )
                return new ArrayList<String>();
            List<String> results = new ArrayList<String>();
            for ( FileStatus item : items ) {
    
                // ignoring files like _SUCCESS
                if ( item.getPath().getName().startsWith( "_" ) ) {
                    continue;
                }
                CompressionCodec codec = factory.getCodec( item.getPath() );
                InputStream stream = null;
                
                if ( codec != null ) {
                    stream = codec.createInputStream( fileSystem.open( item.getPath() ) );
                }
                else {
                    stream = fileSystem.open( item.getPath() );
                }
    
                StringWriter writer = new StringWriter();
                IOUtils.copy( stream, writer, "UTF-8" );
                String raw = writer.toString();
                // String[] resulting = raw.split( "
    " );
                for ( String str : raw.split( "	" ) ) {
                    results.add( str );
                    System.out.println( "start..." + results + "....." );
                }
            }
            return results;
        }
    
       
    
        public String ReadFile( String hdfs )
            throws IOException {
            StringBuffer sb = new StringBuffer();
            FileSystem fs = FileSystem.get( URI.create( hdfs ), conf );
            FSDataInputStream hdfsInStream = fs.open( new Path( hdfs ) );
            try {
                fs = FileSystem.get( conf );
                hdfsInStream = fs.open( new Path( hdfs ) );
                byte[] b = new byte[10240];
                int numBytes = 0;
                // Windows os error
                while ( ( numBytes = hdfsInStream.read( b ) ) > 0 ) {
                    numBytes = hdfsInStream.read( b );
    
                }
    
            }
            catch ( IOException e ) {
    
                e.printStackTrace();
            }
            hdfsInStream.close();
            fs.close();
            return sb.toString();
        }
    
        /**
         * 
         * @param filePath
         * @return
         * @throws IOException
         */
        public static String getFile( String filePath ) throws IOException {
            String line = "";
            try {
                Configuration conf = new Configuration();
                FileSystem fs = FileSystem.get( URI.create( filePath ), conf );
                Path pathq = new Path( filePath );
                FSDataInputStream fsr = fs.open( pathq );
                
                while ( line != null ) {
                    line = fsr.readLine();
                    if ( line != null ) {
                        System.out.println( line );
                    }
                }
             
            }
            catch ( Exception e ) {
                e.printStackTrace();
            }
            return line;
        }
    
        /*
         * 
         */
        public static List<String> getDatas( String filePath )  {
           List<String> list = new ArrayList<String>();
           
            try {
                Configuration conf = new Configuration();
                FileSystem fs = FileSystem.get( URI.create( filePath ), conf );
                Path pathq = new Path( filePath );
                FSDataInputStream fsr = fs.open( pathq );
                String line ="";
                while ( line != null ) {
                    line = fsr.readLine();
                    if ( line != null ) {
                       
                        list.add( line );
                    }
                }
            }
            catch ( Exception e ) {
                e.printStackTrace();
            }
            return list;
        }
        public static void main( String[] args ){
            //String hdfs = "hdfs://node4:9000/hive/warehouse/u_data/u.data";
            //String  hdfs = "/datas/t1";
            String  hdfs = "/datas/u.data";
            Path path = new Path( hdfs );
            // String hdfs = "/datas";
            // String hdfs = "/hive/warehouse/u_data/u.data";
          //  getFile(hdfs);
            /**
             * userid INT,
            movieid INT,
            rating INT,
            weekday INT)
    
             */
            List<String> listDatas = getDatas(hdfs);
            for (int i = 0; i < listDatas.size(); i++){
                    String[] split = listDatas.get(i).split("	");
                    String userid = split[0];
                    String movieid = split[1];
                    String rating = split[2];
                    String weekday = split[3];
                    String makeRowKey = RegionSeverSplit.makeRowKey(userid); 
             // 用put API实现批量入库
    //System.out.println("userid--"+ userid + ".."+ "movieid--"+ movieid + ".." +"rating--"+ rating + ".."+"weekday--"+ weekday + "...."); HBaseUtils.addRows("t1", makeRowKey, "f1", "weekday-rating", (movieid+"-"+rating+"-"+weekday).getBytes()); } System.out.println("success......"); } }

    HBase 随机生成rowkey 前置处理

    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    
    import org.apache.commons.codec.binary.Hex;
    
    public class RegionSeverSplit {
    
        
        public  static String makeRowKey(String id){
             String md5_content = null;
                try {
                    MessageDigest messageDigest = MessageDigest.getInstance("MD5");
                    messageDigest.reset();
                    messageDigest.update(id.getBytes());
                    byte[] bytes = messageDigest.digest();
                    md5_content = new String(Hex.encodeHex(bytes));
                } catch (NoSuchAlgorithmException e1) {
                    e1.printStackTrace();
                }
                //turn right md5
                String right_md5_id = Integer.toHexString(Integer.parseInt(md5_content.substring(0,7),16)>>1);
                while(right_md5_id.length()<7){
                    right_md5_id = "0" + right_md5_id;
                }
                return right_md5_id + "-" + id;
        }
        public static void main(String[] args){
            String rowky = makeRowKey("asdfasdf");
            System.out.println(rowky);
        }
    }

    HBase Util工具类,用put方式批量或者单条数据入库

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.HTableInterface;
    import org.apache.hadoop.hbase.client.HTablePool;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.mapreduce.InputSplit;
    
    import cn.tansun.bd.hdfs.ReadHDFSDatas;
    
    /**
     * 
     * @author root
     *
     */
    
    public class HBaseUtils {
        private static HBaseAdmin hadmin = null;
        private static Configuration conf;
        private static HTable htable = null;
    
        static {
            conf = new Configuration();
            String filePath = "hbase-site.xml";
            Path path = new Path(filePath);
            conf.addResource(path);
            conf = HBaseConfiguration.create(conf);
        }
    
        /**
         * insert one row
         * 
         * @param tableName
         * @param rowkey
         * @param columnFinaly
         * @param columnName
         * @param values
         * @return
         */
        public static boolean addRow(String tableName, String rowkey,
                String columnFinaly, String columnName, byte[] values) {
            boolean flag = true;
            if (tableName != null) {
                HTablePool hTpool = new HTablePool(conf, 1000);
                HTableInterface table = hTpool.getTable(tableName);
                Put put = new Put(rowkey.getBytes());
                put.addColumn(columnFinaly.getBytes(), columnName.getBytes(),
                        values);
                try {
                    table.put(put);
                    System.out.print("addRow success..." + "tableName....."
                            + tableName);
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            } else {
                System.out.println("  please select tableName");
            }
    
            return flag;
        }
    
        public static void main(String[] args) {
            /*String makeRowKey = RegionSeverSplit.makeRowKey("adcdfef");
            String tableName = "student";
            String columnfianly = "info";
            String columnName = "name";
            String values = "zhangsan";
            addRow(tableName, makeRowKey, columnfianly, columnName,
                    values.getBytes());*/
            ReadHDFSDatas readh = new ReadHDFSDatas();
            String hdfs = "/datas/u.data";
            List<String> getDatas = readh.getDatas(hdfs);
            for (int i = 0; i < getDatas.size(); i++){
                if (i < 100){
                    System.out.println(getDatas.get(i));
                }
            }
        }
    
        /**
         * put many rows
         * 
         * @param tableName
         * @param rowkey
         * @param columnFinaly
         * @param columnName
         * @param values
         * @return
         */
        public static List<Put> addRows(String tableName, String rowkey,
                String columnFinaly, String columnName, byte[] values) {
            List<Put> lists  = null;
            long start = System.currentTimeMillis();
            if (tableName != null || rowkey != null) {
                HTablePool hTablePool = new HTablePool(conf, 1000); 
                HTableInterface table = hTablePool.getTable(tableName);
                try {
                    table.setAutoFlush(false);
                    table.setWriteBufferSize(1024 * 1024 * 1);
                    lists = new ArrayList<Put>();
                    Random random = new Random();
                    byte[] buffers = new byte[256];
                    int count = 100;
                    for (int i = 0; i < count; i++){
                        Put put = new Put(rowkey.getBytes());
                        random.nextBytes(buffers);
                        put.add(columnFinaly.getBytes(), columnName.toString().getBytes(), values);
                        put.getDurability();
                        //table.setAutoFlush(false);
                        if ( i % 100 == 0){
                                
                            lists.add(put);
                            try {
                                table.batch(lists);
                            } catch (InterruptedException e) {
                                System.out.println("error......");
                                e.printStackTrace();
                            }
                            table.put(lists);
                            lists.clear();
                            table.flushCommits();
                        }
                    }
                } catch (IOException e) {
    
                    e.printStackTrace();
                }
                
    
            } else {
                System.out.println("..tableName  not null");
            }
            long end = System.currentTimeMillis();
            long times = end - start;
            System.out.println(times * 1.0 / 1000 +"..... finsh........"  );
            return lists;
        }
    
        /**
         * read datas by fileName
         * @param fileName
         * @return
         */
        public List<String> getFileDatas(String fileName){
            
            return null;
        } 
        
        /**
         * read hdfs datas by fileName
         * @param fileName
         * @return
         */
        public static List<String> getHdfsDatas(String fileName){
            
        /*    List<String> getDatas = ReadHDFSDatas.getDatas(fileName);
            for (int i = 0; i < getDatas.size(); i++){
                if (i < 100){
                    System.out.println(getDatas.get(i));
                }
            }
            return getDatas;*/
            return null;
        }
        /**
         * 
         * @param startKey
         * @param endKey
         * @return
         */
        public List<InputSplit> getSplits(byte[] startKey, byte[] endKey) {
            return null;
        }
    }
  • 相关阅读:
    怎么样把网站logo(小图标)在地址栏里显示
    PHP 做群发短信(短信接口连接问题)
    网页JS弹出广告代码,头部,右下角,网页中漂浮,对联广告代码等大全
    PHP 时间戳与系统时间保持一致
    PHP 把数据表列出来的东西导出成execle格式
    数据库连接类 DB.class.php
    session判断页面是否已经登录的问题
    结合Smarty,生成HTML静态页
    PHP做文件下载功能
    滚动字幕,鼠标经过停留
  • 原文地址:https://www.cnblogs.com/zhanggl/p/5584015.html
Copyright © 2011-2022 走看看