zoukankan      html  css  js  c++  java
  • Spark 数据库 连接池 java DAO 工厂


    在一般的java 项目  以及 现在特别火的大数据分析项目中 ,用到数据库以及数据库资源池 连接的事情 是在稀松平常不过的了 。今天就简单的梳理下 这是一个怎样的过程:

    我们按照代码的调度顺序审视下 :

    Comment ,我们是从Spark 数据分析做demo 展开的  :

    第一,假设读写数据库一定是从业务层面发出的 ,那么就应该有以下代码

    这是我们众多代码中的最后一步 ,写数据到数据库的代码,将最后生成的数据写入数据库 ,假设现在数据库类型不能,就要求我们提供可配置的功能了 ,the most impotent code I marked in red and bold

    
    JavaPairRDD<String, Tuple2<String, Row>> sessionDetailRDD =
          top10SessionRDD.join(sessionid2detailRDD);
    sessionDetailRDD.foreach(new VoidFunction<Tuple2<String,Tuple2<String,Row>>>() {
    
       private static final long serialVersionUID = 1L;
    
       @Override
       public void call(Tuple2<String, Tuple2<String, Row>> tuple) throws Exception {
          Row row = tuple._2._2;
    
          SessionDetail sessionDetail = new SessionDetail();
          sessionDetail.setTaskid(taskid);
          sessionDetail.setUserid(row.getLong(1));
          。。。。。。。、//reomve some contents 
          ISessionDetailDAO sessionDetailDAO = DAOFactory.getSessionDetailDAO(); //
          sessionDetailDAO.insert(sessionDetail);
       }
    });

    public static ISessionDetailDAO getSessionDetailDAO() {
       return new SessionDetailDAOImpl();
    }
    第二 ,上面代码来自DAOFactory 中定主意Impl 的实现

    接下来让看下 Impl 代码是如何实现的

    第三  insert 中的数据是我们insert 到 DB 中的数据 ,在这里要转化成参数 跟sql 拼接起来

    public class SessionDetailDAOImpl implements ISessionDetailDAO {
    
       /**
        * 插入一条数据
        * @param sessionDetail 
        */
       public void insert(SessionDetail sessionDetail) {
          String sql = "insert into table_name values(?,?,?,?,?,?,?,?,?,?,?,?)";  
          
          Object[] params = new Object[]{sessionDetail.getTaskid(),
                sessionDetail.getUserid(),
                sessionDetail.getSessionid(),
                
          
          JDBCHelper jdbcHelper = JDBCHelper.getInstance();
          jdbcHelper.executeUpdate(sql, params);
       }

    第四,介绍下JDBChelper 是怎么实现的 ,这一步需要到数据库资源池中获取db 链接


    
    
    import com.zkys.spark.conf.ConfigurationManager;
    import com.zkys.spark.constant.Constants;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.util.LinkedList;
    import java.util.List;
    
    public class JDBCHelper {
       
    
       static {
          try {
             String driver = ConfigurationManager.getProperty(Constants.JDBC_DRIVER);
             Class.forName(driver);
          } catch (Exception e) {
             e.printStackTrace();  
          }
       }
       
    
    
       private static JDBCHelper instance = null;
    
       public static JDBCHelper getInstance() {
          if(instance == null) {
             synchronized(JDBCHelper.class) {  //synchronized http://www.cnblogs.com/GnagWang/archive/2011/02/27/1966606.html
                if(instance == null) {
                   instance = new JDBCHelper();//调用私有无参构造方法 ,需要在构造方法中实现数据库的链接
                }
             }
          }
          return instance;
       }
       
       // 数据库连接池
       private LinkedList<Connection> datasource = new LinkedList<Connection>();
       //对于新增和删除操作addremoveLinedList比较占优势,因为ArrayList要移动数据。
    
       private JDBCHelper() {
    
          int datasourceSize = ConfigurationManager.getInteger(
                Constants.JDBC_DATASOURCE_SIZE);
          for(int i = 0; i < datasourceSize; i++) {
             boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);
             String url = null;
             String user = null;
             String password = null;
                url = ConfigurationManager.getProperty(Constants.JDBC_URL);
                user = ConfigurationManager.getProperty(Constants.JDBC_USER);
                password = ConfigurationManager.getProperty(Constants.JDBC_PASSWORD);
             try {
                //经过循环在这里面创建了10 个数据库链接 ,并把连接放到LinkList 里面了
                Connection conn = DriverManager.getConnection(url, user, password);
                datasource.push(conn);  
             } catch (Exception e) {
                e.printStackTrace(); 
             }
          }
       }
    
       public synchronized Connection getConnection() {
          while(datasource.size() == 0) {
             try {
                Thread.sleep(10);
             } catch (InterruptedException e) {
                e.printStackTrace();
             }  
          }
          return datasource.poll();
       }
    
       public int executeUpdate(String sql, Object[] params) {
          int rtn = 0;
          Connection conn = null;
          PreparedStatement pstmt = null;
          
          try {
             conn = getConnection();
             conn.setAutoCommit(false);  
             
             pstmt = conn.prepareStatement(sql);
             
             if(params != null && params.length > 0) {
                for(int i = 0; i < params.length; i++) {
                   pstmt.setObject(i + 1, params[i]);  
                }
             }
             
             rtn = pstmt.executeUpdate();
             
             conn.commit();
          } catch (Exception e) {
             e.printStackTrace();  
          } finally {
             if(conn != null) {
                datasource.push(conn);  
             }
          }
          
          return rtn;
       }
    
       public void executeQuery(String sql, Object[] params,  QueryCallback callback) {
          Connection conn = null;
          PreparedStatement pstmt = null;
          ResultSet rs = null;
          
          try {
             conn = getConnection();
             pstmt = conn.prepareStatement(sql);
             
             if(params != null && params.length > 0) {
                for(int i = 0; i < params.length; i++) {
                   pstmt.setObject(i + 1, params[i]);   
                }
             }
             
             rs = pstmt.executeQuery();
             
             callback.process(rs);  
          } catch (Exception e) {
             e.printStackTrace();
          } finally {
             if(conn != null) {
                datasource.push(conn);  
             }
          }
       }
       
       public int[] executeBatch(String sql, List<Object[]> paramsList) {
          int[] rtn = null;
          Connection conn = null;
          PreparedStatement pstmt = null;
          
          try {
             conn = getConnection();
             
             // 第一步:使用Connection对象,取消自动提交
             conn.setAutoCommit(false);  
             
             pstmt = conn.prepareStatement(sql);
             
             // 第二步:使用PreparedStatement.addBatch()方法加入批量的SQL参数
             if(paramsList != null && paramsList.size() > 0) {
                for(Object[] params : paramsList) {
                   for(int i = 0; i < params.length; i++) {
                      pstmt.setObject(i + 1, params[i]);  
                   }
                   pstmt.addBatch();
                }
             }
             
             // 第三步:使用PreparedStatement.executeBatch()方法,执行批量的SQL语句
             rtn = pstmt.executeBatch();
             
             // 最后一步:使用Connection对象,提交批量的SQL语句
             conn.commit();
          } catch (Exception e) {
             e.printStackTrace();  
          } finally {
             if(conn != null) {
                datasource.push(conn);  
             }
          }
          
          return rtn;
       }
       
       /**
        * 静态内部类:查询回调接口
        * @author Administrator
        *
        */
       public static interface QueryCallback {
          
          /**
           * 处理查询结果
           * @param rs 
           * @throws Exception
           */
          void process(ResultSet rs) throws Exception;
          
       }
       
    }
    

    第五,由于是可以配置的,所以需要加入更多元素

    
    import java.io.InputStream;
    import java.util.Properties;
    
    public class ConfigurationManager {
       
    
       private static Properties prop = new Properties();
       static {
          try {
            
             InputStream in = ConfigurationManager.class
                   .getClassLoader().getResourceAsStream("my.properties");
             prop.load(in);  
          } catch (Exception e) {
             e.printStackTrace();  
          }
       }
       
       public static String getProperty(String key) {
          return prop.getProperty(key);
       }
      
       public static Integer getInteger(String key) {
          String value = getProperty(key);
          try {
             return Integer.valueOf(value);
          } catch (Exception e) {
             e.printStackTrace();
          }
          return 0;
       }
       
       public static Boolean getBoolean(String key) {
          String value = getProperty(key);
          try {
             return Boolean.valueOf(value);
          } catch (Exception e) {
             e.printStackTrace();
          }
          return false;
       }
    
       public static Long getLong(String key) {
          String value = getProperty(key);
          try {
             return Long.valueOf(value);
          } catch (Exception e) {
             e.printStackTrace();
          }
          return 0L;
       }
       
    }
    

















  • 相关阅读:
    shellshock溢出攻击
    内核编译与系统调用
    模块与系统调用
    20199315《Linux内核原理与分析》第十二周作业
    20199315《Linux内核原理与分析》第十一周作业
    Linux下的静态链接库和动态链接库
    2019-2020-1 20199315《Linux内核原理与分析》第九周作业
    2019-2020-1 20199315 《Linux内核原理与分析》 第八周作业
    2019-2020-1 20199315《Linux内核原理与分析》第七周作业
    2019-2020-1 20199315《Linux内核原理与分析》第六周作业
  • 原文地址:https://www.cnblogs.com/TendToBigData/p/10501318.html
Copyright © 2011-2022 走看看