zoukankan      html  css  js  c++  java
  • Spark 数据处理相关代码

      <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.11</artifactId>
          <version>2.3.3</version>
        </dependency>
    
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
        <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
          <version>1.7.26</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.11</artifactId>
          <version>2.3.3</version>
        </dependency>
    
        <!--<dependency>-->
          <!--<groupId>org.apache.spark</groupId>-->
          <!--<artifactId>spark-hive_2.11</artifactId>-->
          <!--<version>2.3.3</version>-->
        <!--</dependency>-->
    
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming-kafka_2.11</artifactId>
          <version>1.6.3</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>2.7.2</version>
        </dependency>
        <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
          <version>5.1.38</version>
        </dependency>
    
        <dependency>
          <groupId>com.typesafe</groupId>
          <artifactId>config</artifactId>
          <version>1.3.1</version>
        </dependency>
    
        <dependency>
          <groupId>redis.clients</groupId>
          <artifactId>jedis</artifactId>
          <version>2.9.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.json/json -->
        <dependency>
          <groupId>org.json</groupId>
          <artifactId>json</artifactId>
          <version>20180813</version>
        </dependency>
    
        <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
        <!-- <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-core</artifactId>
             <version>2.9.8</version>
         </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
             <version>2.9.8</version>
         </dependency>-->
        <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-annotations -->
        <!-- <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.9.8</version>
        </dependency>-->
    
        <dependency>
          <groupId>com.alibaba</groupId>
          <artifactId>fastjson</artifactId>
          <version>1.1.41</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/it.unimi.dsi/fastutil -->
        <dependency>
          <groupId>it.unimi.dsi</groupId>
          <artifactId>fastutil</artifactId>
          <version>8.1.0</version>
        </dependency>
    package com.hzk.spark;
    
    import com.hzk.conf.ConfigurationManager;
    import com.hzk.constant.Constants;
    import com.hzk.dao.IAreaTop10Dao;
    import com.hzk.dao.factory.DaoFactory;
    import com.hzk.domain.AreaTop10;
    import com.hzk.utils.SparkUtils;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.SparkSession;
    import scala.Tuple2;
    
    import java.util.*;
    
    public class AreaTop10Spark {
    
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                    .setAppName(Constants.SPARK_Area_TOP10)
    //                .set("spark.storage.memoryFraction", "0.5")
    //                .set("spark.shuffle.consolidateFiles", "true")
    //                .set("spark.shuffle.file.buffer", "64")
    //                .set("spark.shuffle.memoryFraction", "0.3")
    //                .set("spark.reducer.maxSizeInFlight", "24")
    //                .set("spark.shuffle.io.maxRetries", "60")
    //                .set("spark.shuffle.io.retryWait", "60")
                    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
            SparkUtils.setMaster(conf);
            // 构建Spark上下文
            JavaSparkContext sc=new JavaSparkContext(conf);
            SQLContext sqlContext = SparkUtils.getSQLContext(sc.sc());
            JavaPairRDD<Long, String> corpid2corpAreaRDD=getCorpid2corpAreaRDD(sqlContext);
            JavaPairRDD<String,Integer> AreaTop10s=getAreaTop10s(corpid2corpAreaRDD);
            persistAreaTop10(AreaTop10s);
    
            sc.close();
        }
    
        /**
         * 使用Spark SQL从MySQL中查询单位信息<id,Area></>
         * @param sqlContext SQLContext
         * @return
         */
        private static JavaPairRDD<Long, String> getCorpid2corpAreaRDD(SQLContext sqlContext){
            // 构建MySQL连接配置信息(直接从配置文件中获取)
            String url = null;
            String user = null;
            String password = null;
            boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);
            if(local) {
                url = ConfigurationManager.getProperty(Constants.JDBC_URL);
                user = ConfigurationManager.getProperty(Constants.JDBC_USER);
                password = ConfigurationManager.getProperty(Constants.JDBC_PASSWORD);
            } else {
                url = ConfigurationManager.getProperty(Constants.JDBC_URL_PROD);
                user = ConfigurationManager.getProperty(Constants.JDBC_USER_PROD);
                password = ConfigurationManager.getProperty(Constants.JDBC_PASSWORD_PROD);
            }
            Map<String, String> options = new HashMap<String, String>();
            options.put("url", url);
            options.put("dbtable", "t_corp");
            options.put("user", user);
            options.put("password", password);
            // 通过SQLContext去从MySQL中查询数据
            Dataset<Row> corpInfoDF=sqlContext.read().format("jdbc").options(options).load();
            sqlContext.clearCache();
            // 返回RDD
            JavaRDD<Row> corpInfoRDD=corpInfoDF.javaRDD();
            JavaPairRDD<Long,String> corpid2corpAreaRDD=corpInfoRDD.mapToPair(new PairFunction<Row, Long, String>() {
                @Override
                public Tuple2<Long, String> call(Row row) throws Exception {
                    Long id=Long.valueOf(String.valueOf(row.get(0)));
                    String area =String.valueOf(row.get(3));
                    return new Tuple2<>(id,area);
                }
            });
    //        corpid2corpAreaRDD.foreach(new VoidFunction<Tuple2<Long, String>>() {
    //            @Override
    //            public void call(Tuple2<Long, String> tuple2) throws Exception {
    //                System.out.println(tuple2._2);
    //            }
    //        });
            return  corpid2corpAreaRDD;
        }
    
        /**
         * 得到top10的地区名单
         *
         * @param corpid2corpAreaRDD
         * @return  List<String>
         */
        private static  JavaPairRDD<String,Integer> getAreaTop10s(JavaPairRDD<Long,String> corpid2corpAreaRDD){
            JavaRDD<String> AreaRDD=corpid2corpAreaRDD.map(new Function<Tuple2<Long, String>, String>() {
                @Override
                public String call(Tuple2<Long, String> tuple2) throws Exception {
                    return tuple2._2;
                }
            });
            JavaPairRDD<Integer,String> pairs=AreaRDD.mapToPair(new PairFunction<String, String,Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<>(s.length()>3?s.substring(0,3):"-",1);
                }
            }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer integer, Integer integer2) throws Exception {
                    return integer+integer2;
                }
            }).mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
                @Override
                public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                    return new Tuple2<Integer, String>(stringIntegerTuple2._2,stringIntegerTuple2._1);
                }
            });
            JavaPairRDD<Integer,String> sortPairs=pairs.sortByKey(false);
            sortPairs.foreach(new VoidFunction<Tuple2<Integer, String>>() {
                @Override
                public void call(Tuple2<Integer, String> tuple2) throws Exception {
                    System.out.println(tuple2._2+"_"+tuple2._1);
                }
            });
            JavaPairRDD<String,Integer> Area2CountRDD=sortPairs.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple2) throws Exception {
                    return new Tuple2<String, Integer>(tuple2._2,tuple2._1);
                }
            });
    //        JavaRDD<String>  Area2CountRDD=sortPairs.map(new Function<Tuple2<Integer, String>, String>() {
    //            @Override
    //            public String call(Tuple2<Integer, String> tuple2) throws Exception {
    //                return  tuple2._2+"_"+tuple2._1;
    //            }
    //        });
    //        List<String> list=Area2CountRDD.top(10);
    //        Iterator<String> it2=list.iterator();
    //        while (it2.hasNext()){
    //            System.out.println(it2.next());
    //        }
            return Area2CountRDD;
    
        }
            /**
             * 将计算出来的Top10领域写入MySQL中
             * @param Area2CountRDD
             */
            private static void persistAreaTop10(JavaPairRDD<String,Integer> Area2CountRDD){
                List<AreaTop10> AreaTop10s=new LinkedList<AreaTop10>();
                List<String> Area2CountTop10List=Area2CountRDD.map(new Function<Tuple2<String, Integer>, String>() {
                    @Override
                    public String call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                        return stringIntegerTuple2._1+"_"+stringIntegerTuple2._2;
                    }
                }).take(10);
                for (int i=0;i<Area2CountTop10List.size();i++){
                    AreaTop10 AreaTop10=new AreaTop10();
                    String AreaName= Area2CountTop10List.get(i).split("_")[0];
                    int AreaCount=Integer.valueOf(Area2CountTop10List.get(i).split("_")[1]);
                    AreaTop10.setAreaId(i+1);
                    AreaTop10.setAreaName(AreaName.equals("-")?"区域未知":AreaName);
                    AreaTop10.setAreaCount(AreaCount);
                    AreaTop10s.add(AreaTop10);
                }
                IAreaTop10Dao AreaTop10Dao= DaoFactory.getAreaTop10Dao();
                AreaTop10Dao.deleteAll();
                AreaTop10Dao.insertBatch(AreaTop10s);
            }
    
    }
    package com.hzk.jdbc;
    
    import com.hzk.constant.Constants;
    import com.hzk.dao.ITCorpDao;
    import com.hzk.dao.factory.DaoFactory;
    import com.hzk.domain.TCorp;
    import org.apache.poi.hssf.usermodel.HSSFWorkbook;
    import org.apache.poi.ss.usermodel.CellType;
    import org.apache.poi.ss.usermodel.Row;
    import org.apache.poi.ss.usermodel.Sheet;
    import org.apache.poi.ss.usermodel.Workbook;
    import org.elasticsearch.action.bulk.BulkRequestBuilder;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.action.index.IndexRequestBuilder;
    import org.elasticsearch.common.xcontent.XContentFactory;
    
    import java.io.FileInputStream;
    import java.sql.Blob;
    import java.util.ArrayList;
    import java.util.List;
    
    public class JDBCLoadData {
        static ITCorpDao itCorpDao = DaoFactory.getTCorpDao();
        public static void main(String[] args) throws Exception {
            itCorpDao.deleteAll();
            importData();
    
        }
    
        public static void importData() throws Exception {
            FileInputStream fis = null;
            Sheet sheet = null;
            //  List<Labmaster> saveList = new ArrayList<>();
            try {
                //获取一个绝对地址的流
                fis = new FileInputStream(Constants.EXCEL_FILE_ADRESS);
                Workbook workbook = new HSSFWorkbook(fis);
                // 获取第一个Sheet表
                sheet = workbook.getSheetAt(1);
                // getLastRowNum,获取最后一行的行标
                List<TCorp> list = new ArrayList<TCorp>();
                for (int j = 1; j < sheet.getLastRowNum() + 1; j++) {
                    Row row = sheet.getRow(j);
                    if (row != null) {
                        row.getCell(0).setCellType(CellType.STRING);
                        //赋值
                        String id = row.getCell(0).getStringCellValue();
                        String corpName = row.getCell(1)==null?"-":row.getCell(1).getStringCellValue().trim();
                        String operManName = row.getCell(2)==null?"-":row.getCell(2).getStringCellValue().trim();
                        String regCapi = row.getCell(3)==null?"-":row.getCell(3).getStringCellValue().trim();
                        String paidCapi = row.getCell(4)==null?"-":row.getCell(4)==null?"-":row.getCell(4).getStringCellValue().trim();
                        String corpStatus = row.getCell(5)==null?"-":row.getCell(5).getStringCellValue().trim();
                        String startDate = row.getCell(6)==null?"-":row.getCell(6).getStringCellValue().trim();
                        String uniScid = row.getCell(7)==null?"-":row.getCell(7).getStringCellValue().trim();
                        String taxpayNum = row.getCell(8)==null?"-":row.getCell(8).getStringCellValue().trim();
                        String regNo = row.getCell(9)==null?"-":row.getCell(9).getStringCellValue().trim();
                        String orgInstCode = row.getCell(10)==null?"-":row.getCell(10).getStringCellValue().trim();
                        String econKind = row.getCell(11)==null?"-":row.getCell(11).getStringCellValue().trim();
                        String belongTrade = row.getCell(12)==null?"-":row.getCell(12).getStringCellValue().trim();
                        String staffSize =row.getCell(13) ==null?"-":row.getCell(13).getStringCellValue().trim();
                        String fareScope =row.getCell(16) ==null?"-":row.getCell(16).getStringCellValue().trim();
                        String corpAddr =row.getCell(15) ==null?"-":row.getCell(15).getStringCellValue().trim();
                        String corpInfo = row.getCell(16)==null?"-":row.getCell(16).getStringCellValue().trim();
                        TCorp tCorp=new TCorp();
                        tCorp.setId(Integer.parseInt(id.trim()));
                        tCorp.setCorpName(corpName);
                        tCorp.setOperManName(operManName);
                        tCorp.setRegCapi(regCapi);
                        tCorp.setPaidCapi(paidCapi);
                        tCorp.setCorpStatus(corpStatus);
                        tCorp.setStartDate(startDate);
                        tCorp.setUniScid(uniScid);
                        tCorp.setTaxpayNum(taxpayNum);
                        tCorp.setRegNo(regNo);
                        tCorp.setOrgInstCode(orgInstCode);
                        tCorp.setEconKind(econKind);
                        tCorp.setBelongTrade(belongTrade);
                        tCorp.setStaffSize(staffSize);
                        tCorp.setFareScope(fareScope);
                        tCorp.setCorpAddr(corpAddr);
                        tCorp.setCorpInfo(corpInfo);
                        list.add(tCorp);
                    }
                }
                itCorpDao.insertAll(list);
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    package com.hzk.jdbc;
    
    import com.hzk.conf.ConfigurationManager;
    import com.hzk.constant.Constants;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.util.LinkedList;
    import java.util.List;
    
    
    /**
     * JDBC辅助组件
     *
     * 在正式的项目的代码编写过程中,是完全严格按照大公司的coding标准来的
     * 也就是说,在代码中,是不能出现任何hard code(硬编码)的字符
     * 比如“张三”、“com.mysql.jdbc.Driver”
     * 所有这些东西,都需要通过常量来封装和使用
     *
     * @author Administrator
     *
     */
    public class JDBCHelper {
    
        // 第一步:在静态代码块中,直接加载数据库的驱动
        // 加载驱动,不是直接简单的,使用com.mysql.jdbc.Driver就可以了
        // 之所以说,不要硬编码,他的原因就在于这里
        //
        // com.mysql.jdbc.Driver只代表了MySQL数据库的驱动
        // 那么,如果有一天,我们的项目底层的数据库要进行迁移,比如迁移到Oracle
        // 或者是DB2、SQLServer
        // 那么,就必须很费劲的在代码中,找,找到硬编码了com.mysql.jdbc.Driver的地方,然后改成
        // 其他数据库的驱动类的类名
        // 所以正规项目,是不允许硬编码的,那样维护成本很高
        //
        // 通常,我们都是用一个常量接口中的某个常量,来代表一个值
        // 然后在这个值改变的时候,只要改变常量接口中的常量对应的值就可以了
        //
        // 项目,要尽量做成可配置的
        // 就是说,我们的这个数据库驱动,更进一步,也不只是放在常量接口中就可以了
        // 最好的方式,是放在外部的配置文件中,跟代码彻底分离
        // 常量接口中,只是包含了这个值对应的key的名字
        static {
            try {
                String driver = ConfigurationManager.getProperty(Constants.JDBC_DRIVER);
                Class.forName(driver);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        // 第二步,实现JDBCHelper的单例化
        // 为什么要实现代理化呢?因为它的内部要封装一个简单的内部的数据库连接池
        // 为了保证数据库连接池有且仅有一份,所以就通过单例的方式
        // 保证JDBCHelper只有一个实例,实例中只有一份数据库连接池
        private static JDBCHelper instance = null;
    
        /**
         * 获取单例
         * @return 单例
         */
        public static JDBCHelper getInstance() {
            if(instance == null) {
                synchronized(JDBCHelper.class) {
                    if(instance == null) {
                        instance = new JDBCHelper();
                    }
                }
            }
            return instance;
        }
    
        // 数据库连接池
        private LinkedList<Connection> datasource = new LinkedList<Connection>();
    
        /**
         *
         * 第三步:实现单例的过程中,创建唯一的数据库连接池
         *
         * 私有化构造方法
         *
         * JDBCHelper在整个程序运行声明周期中,只会创建一次实例
         * 在这一次创建实例的过程中,就会调用JDBCHelper()构造方法
         * 此时,就可以在构造方法中,去创建自己唯一的一个数据库连接池
         *
         */
        private JDBCHelper() {
            // 首先第一步,获取数据库连接池的大小,就是说,数据库连接池中要放多少个数据库连接
            // 这个,可以通过在配置文件中配置的方式,来灵活的设定
            int datasourceSize = ConfigurationManager.getInterger(
                    Constants.JDBC_DATASOURCE_SIZE);
    
            // 然后创建指定数量的数据库连接,并放入数据库连接池中
            for(int i = 0; i < datasourceSize; i++) {
                boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);
                String url = null;
                String user = null;
                String password = null;
    
                if(local) {
                    url = ConfigurationManager.getProperty(Constants.JDBC_URL);
                    user = ConfigurationManager.getProperty(Constants.JDBC_USER);
                    password = ConfigurationManager.getProperty(Constants.JDBC_PASSWORD);
                } else {
                    url = ConfigurationManager.getProperty(Constants.JDBC_URL_PROD);
                    user = ConfigurationManager.getProperty(Constants.JDBC_USER_PROD);
                    password = ConfigurationManager.getProperty(Constants.JDBC_PASSWORD_PROD);
                }
    
                try {
                    Connection conn = DriverManager.getConnection(url, user, password);
                    datasource.push(conn);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        /**
         * 第四步,提供获取数据库连接的方法
         * 有可能,你去获取的时候,这个时候,连接都被用光了,你暂时获取不到数据库连接
         * 所以我们要自己编码实现一个简单的等待机制,去等待获取到数据库连接
         *
         */
        public synchronized Connection getConnection() {
            while(datasource.size() == 0) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return datasource.poll();
        }
    
        /**
         * 第五步:开发增删改查的方法
         * 1、执行增删改SQL语句的方法
         * 2、执行查询SQL语句的方法
         * 3、批量执行SQL语句的方法
         */
    
        /**
         * 执行增删改SQL语句
         * @param sql
         * @param params
         * @return 影响的行数
         */
        public int executeUpdate(String sql, Object[] params) {
            int rtn = 0;
            Connection conn = null;
            PreparedStatement pstmt = null;
    
            try {
                conn = getConnection();
                conn.setAutoCommit(false);
    
                pstmt = conn.prepareStatement(sql);
    
                if(params != null && params.length > 0) {
                    for(int i = 0; i < params.length; i++) {
                        pstmt.setObject(i + 1, params[i]);
                    }
                }
    
                rtn = pstmt.executeUpdate();
    
                conn.commit();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if(conn != null) {
                    datasource.push(conn);
                }
            }
    
            return rtn;
        }
    
        /**
         * 执行查询SQL语句
         * @param sql
         * @param params
         * @param callback
         */
        public void executeQuery(String sql, Object[] params,
                                 QueryCallback callback) {
            Connection conn = null;
            PreparedStatement pstmt = null;
            ResultSet rs = null;
    
            try {
                conn = getConnection();
                pstmt = conn.prepareStatement(sql);
    
                if(params != null && params.length > 0) {
                    for(int i = 0; i < params.length; i++) {
                        pstmt.setObject(i + 1, params[i]);
                    }
                }
    
                rs = pstmt.executeQuery();
    
                callback.process(rs);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if(conn != null) {
                    datasource.push(conn);
                }
            }
        }
    
        /**
         * 批量执行SQL语句
         *
         * 批量执行SQL语句,是JDBC中的一个高级功能
         * 默认情况下,每次执行一条SQL语句,就会通过网络连接,向MySQL发送一次请求
         *
         * 但是,如果在短时间内要执行多条结构完全一模一样的SQL,只是参数不同
         * 虽然使用PreparedStatement这种方式,可以只编译一次SQL,提高性能,但是,还是对于每次SQL
         * 都要向MySQL发送一次网络请求
         *
         * 可以通过批量执行SQL语句的功能优化这个性能
         * 一次性通过PreparedStatement发送多条SQL语句,比如100条、1000条,甚至上万条
         * 执行的时候,也仅仅编译一次就可以
         * 这种批量执行SQL语句的方式,可以大大提升性能
         *
         * @param sql
         * @param paramsList
         * @return 每条SQL语句影响的行数
         */
        public int[] executeBatch(String sql, List<Object[]> paramsList) {
            int[] rtn = null;
            Connection conn = null;
            PreparedStatement pstmt = null;
    
            try {
                conn = getConnection();
    
                // 第一步:使用Connection对象,取消自动提交
                conn.setAutoCommit(false);
    
                pstmt = conn.prepareStatement(sql);
    
                // 第二步:使用PreparedStatement.addBatch()方法加入批量的SQL参数
                if(paramsList != null && paramsList.size() > 0) {
                    for(Object[] params : paramsList) {
                        for(int i = 0; i < params.length; i++) {
                            pstmt.setObject(i + 1, params[i]);
                        }
                        pstmt.addBatch();
                    }
                }
    
                // 第三步:使用PreparedStatement.executeBatch()方法,执行批量的SQL语句
                rtn = pstmt.executeBatch();
    
                // 最后一步:使用Connection对象,提交批量的SQL语句
                conn.commit();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if(conn != null) {
                    datasource.push(conn);
                }
            }
    
            return rtn;
        }
    
        /**
         * 静态内部类:查询回调接口
         * @author Administrator
         *
         */
        public static interface QueryCallback {
    
            /**
             * 处理查询结果
             * @param rs
             * @throws Exception
             */
            void process(ResultSet rs) throws Exception;
    
        }
    
    }
    package com.hzk.conf;
    
    import java.io.InputStream;
    import java.util.Properties;
    
    /**
     * 配置管理组件
     *
     * 1、配置管理组件可以复杂,也可以很简单,对于简单的配置管理组件来说,只要开发一个类,可以在第一次访问它的
     *         时候,就从对应的properties文件中,读取配置项,并提供外界获取某个配置key对应的value的方法
     * 2、如果是特别复杂的配置管理组件,那么可能需要使用一些软件设计中的设计模式,比如单例模式、解释器模式
     *         可能需要管理多个不同的properties,甚至是xml类型的配置文件
     * 3、我们这里的话,就是开发一个简单的配置管理组件,就可以了
     *
     * @author Administrator
     *
     */
    public class ConfigurationManager {
        // Properties对象使用private来修饰,就代表了其是类私有的
        // 那么外界的代码,就不能直接通过ConfigurationManager.prop这种方式获取到Properties对象
        // 之所以这么做,是为了避免外界的代码不小心错误的更新了Properties中某个key对应的value
        // 从而导致整个程序的状态错误,乃至崩溃
        private  static Properties prop=new Properties();
        /**
         * 静态代码块
         *
         * Java中,每一个类第一次使用的时候,就会被Java虚拟机(JVM)中的类加载器,去从磁盘上的.class文件中
         * 加载出来,然后为每个类都会构建一个Class对象,就代表了这个类
         *
         * 每个类在第一次加载的时候,都会进行自身的初始化,那么类初始化的时候,会执行哪些操作的呢?
         * 就由每个类内部的static {}构成的静态代码块决定,我们自己可以在类中开发静态代码块
         * 类第一次使用的时候,就会加载,加载的时候,就会初始化类,初始化类的时候就会执行类的静态代码块
         *
         * 因此,对于我们的配置管理组件,就在静态代码块中,编写读取配置文件的代码
         * 这样的话,第一次外界代码调用这个ConfigurationManager类的静态方法的时候,就会加载配置文件中的数据
         *
         * 而且,放在静态代码块中,还有一个好处,就是类的初始化在整个JVM生命周期内,有且仅有一次,也就是说
         * 配置文件只会加载一次,然后以后就是重复使用,效率比较高;不用反复加载多次
         */
        static {
            try {
                // 通过一个“类名.class”的方式,就可以获取到这个类在JVM中对应的Class对象
                // 然后再通过这个Class对象的getClassLoader()方法,就可以获取到当初加载这个类的JVM
                // 中的类加载器(ClassLoader),然后调用ClassLoader的getResourceAsStream()这个方法
                // 就可以用类加载器,去加载类加载路径中的指定的文件
                // 最终可以获取到一个,针对指定文件的输入流(InputStream)
                InputStream in= ConfigurationManager.class.getClassLoader().getResourceAsStream("my.properties");
                // 调用Properties的load()方法,给它传入一个文件的InputStream输入流
                // 即可将文件中的符合“key=value”格式的配置项,都加载到Properties对象中
                // 加载过后,此时,Properties对象中就有了配置文件中所有的key-value对了
                // 然后外界其实就可以通过Properties对象获取指定key对应的value
                prop.load(in);
            }catch (Exception e){
                e.printStackTrace();
            }
        }
        /**
         * 获取指定key对应的value
         *
         * 第一次外界代码,调用ConfigurationManager类的getProperty静态方法时,JVM内部会发现
         * ConfigurationManager类还不在JVM的内存中
         *
         * 此时JVM,就会使用自己的ClassLoader(类加载器),去对应的类所在的磁盘文件(.class文件)中
         * 去加载ConfigurationManager类,到JVM内存中来,并根据类内部的信息,去创建一个Class对象
         * Class对象中,就包含了类的元信息,包括类有哪些field(Properties prop);有哪些方法(getProperty)
         *
         * 加载ConfigurationManager类的时候,还会初始化这个类,那么此时就执行类的static静态代码块
         * 此时咱们自己编写的静态代码块中的代码,就会加载my.properites文件的内容,到Properties对象中来
         *
         * 下一次外界代码,再调用ConfigurationManager的getProperty()方法时,就不会再次加载类,不会再次初始化
         * 类,和执行静态代码块了,所以也印证了,我们上面所说的,类只会加载一次,配置文件也仅仅会加载一次
         *
         * @param key
         * @return value
         */
        public static String getProperty(String key){
            return prop.getProperty(key);
        }
    
        /**
         * 获取整数类型的配置项
         * @param key
         * @return value
         */
        public  static Integer getInterger(String key){
            String value=prop.getProperty(key);
            try {
                return Integer.valueOf(value);
            }catch (Exception e){
                e.printStackTrace();
            }
            return  0;
        }
    
        /**
         * 获取布尔类型的配置项
         * @param key
         * @return value
         */
        public static Boolean getBoolean(String key) {
            String value = getProperty(key);
            try {
                return Boolean.valueOf(value);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return false;
        }
    
        /**
         * 获取Long类型的配置项
         * @param key
         * @return
         */
        public static Long getLong(String key) {
            String value = getProperty(key);
            try {
                return Long.valueOf(value);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return 0L;
        }
    
    }
    package com.hzk.utils;
    
    import com.alibaba.fastjson.JSONObject;
    import com.hzk.conf.ConfigurationManager;
    import com.hzk.constant.Constants;
    import org.apache.spark.SparkConf;
    import org.apache.spark.SparkContext;
    import org.apache.spark.sql.SQLContext;
    
    
    /**
     * Spark工具类
     * @author Administrator
     *
     */
    public class SparkUtils {
        
        /**
         * 根据当前是否本地测试的配置
         * 决定,如何设置SparkConf的master
         */
        public static void setMaster(SparkConf conf) {
            boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);
            if(local) {
                conf.setMaster("local");
            }  
        }
        /**
         * 获取SQLContext
         * 如果spark.local设置为true,那么就创建SQLContext;否则,创建HiveContext
         * @param sc
         * @return
         */
        public static SQLContext getSQLContext(SparkContext sc) {
            boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);
            if(local) {
                return new SQLContext(sc);
            } else {
                return  null;
            }
        }
        
    }
  • 相关阅读:
    vue中mixins(混入)的使用
    js实现淘宝轮播图放大镜效果
    vue中的provide和inject
    vue自定义过滤器
    vue自定义指令
    HTTP和HTTPS详解
    可靠的TCP连接为何是三次握手和四次挥手
    跟着动画来学习TCP三次握手和四次挥手
    简单了解TCP/IP与HTTP
    网络协议
  • 原文地址:https://www.cnblogs.com/Transkai/p/13394618.html
Copyright © 2011-2022 走看看