  • Solr dataimport source code analysis (Part 12)

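    Last year I wrote a series of posts analyzing the Solr dataimport source code. Because of a problem in the cnblogs (博客园) system, images uploaded later under the same file name overwrote the original ones, so the images in the original posts no longer match their content. I am reorganizing the series here and adding some new notes along the way.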

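    The overall UML class diagram of Solr dataimport is shown below (split into two images, because my monitor is not large enough to display it all at once).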

    As the UML class diagrams above show, the design uses the Decorator pattern and the Iterator pattern, among others.

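    We start with the DataSource class. It is an abstract generic class that declares methods for initializing the data source and for fetching data of the generic type T.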

    /**
     * <p>
     * Provides data from a source with a given query.
     * </p>
     * <p/>
     * <p>
     * Implementation of this abstract class must provide a default no-arg constructor
     * </p>
     * <p/>
     * <p>
     * Refer to <a
     * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
     * for more details.
     * </p>
     * <p/>
     * <b>This API is experimental and may change in the future.</b>
     *
     * @version $Id: DataSource.java 684025 2008-08-08 17:50:11Z shalin $
     * @since solr 1.3
     */
    public abstract class DataSource<T> {
    
      /**
       * Initializes the DataSource with the <code>Context</code> and
       * initialization properties.
       * <p/>
       * This is invoked by the <code>DataImporter</code> after creating an
       * instance of this class.
       *
       * @param context
       * @param initProps
       */
      public abstract void init(Context context, Properties initProps);
    
      /**
       * Get records for the given query.The return type depends on the
       * implementation .
       *
       * @param query The query string. It can be a SQL for JdbcDataSource or a URL
       *              for HttpDataSource or a file location for FileDataSource or a custom
       *              format for your own custom DataSource.
       * @return Depends on the implementation. For instance JdbcDataSource returns
       *         an Iterator<Map <String,Object>>
       */
      public abstract T getData(String query);
    
      /**
       * Cleans up resources of this DataSource after use.
       */
      public abstract void close();
    }

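    The Javadoc of this abstract class is clear. It has several subclasses for different kinds of sources: JdbcDataSource for databases, HttpDataSource for URLs, and FileDataSource for local files. You can also write your own custom data source.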
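    To make the contract concrete, here is a minimal sketch of what a custom data source could look like. To stay self-contained it does not extend Solr's real DataSource<T>: MiniDataSource is a hypothetical stand-in with the same three-method shape (without the Context parameter), and InMemoryDataSource, its "names" property, and its prefix-style query are all invented for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

class CustomDataSourceSketch {

  /** Hypothetical stand-in for Solr's DataSource<T>; the real init() also takes a Context. */
  public abstract static class MiniDataSource<T> {
    public abstract void init(Properties initProps);
    public abstract T getData(String query);
    public abstract void close();
  }

  /** Serves rows from memory. The "query" is just a name prefix (an invented format). */
  public static class InMemoryDataSource
      extends MiniDataSource<Iterator<Map<String, Object>>> {

    private final List<Map<String, Object>> rows = new ArrayList<>();

    @Override
    public void init(Properties initProps) {
      // Build one row per comma-separated entry of the (assumed) "names" property.
      String names = initProps.getProperty("names", "");
      int id = 1;
      for (String name : names.split(",")) {
        if (name.trim().isEmpty()) {
          continue;
        }
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", id++);
        row.put("name", name.trim());
        rows.add(row);
      }
    }

    @Override
    public Iterator<Map<String, Object>> getData(String query) {
      // Return only the rows whose name starts with the query string.
      List<Map<String, Object>> matches = new ArrayList<>();
      for (Map<String, Object> row : rows) {
        if (((String) row.get("name")).startsWith(query)) {
          matches.add(row);
        }
      }
      return matches.iterator();
    }

    @Override
    public void close() {
      rows.clear();
    }
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("names", "alpha,beta,alps");
    InMemoryDataSource ds = new InMemoryDataSource();
    ds.init(props);
    Iterator<Map<String, Object>> it = ds.getData("al");
    while (it.hasNext()) {
      System.out.println(it.next());
    }
    ds.close();
  }
}
```

    The point of the sketch is only the shape: the type parameter decides what getData() hands back, which is why JdbcDataSource can return an iterator of rows while another source could return, say, a Reader.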

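    Here I only analyze JdbcDataSource, the database data source. The other data sources work in a similar way and are easy to follow once this one is understood.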

    JdbcDataSource returns its data as an Iterator<Map<String, Object>>, where each map is one row keyed by column name. Its fields are:

    // Factory that creates database connections
    protected Callable<Connection> factory;
    
    private long connLastUsed = 0;
    // Current database connection
    private Connection conn;
    // Mapping from field name to java.sql.Types constant
    private Map<String, Integer> fieldNameVsType = new HashMap<String, Integer>();
    // Whether to convert values to the types declared for the fields
    private boolean convertType = false;
    
    private int batchSize = FETCH_SIZE;
    
    private int maxRows = 0;

    The init method sets up the connection factory, the batch size, and the mapping from field names to types.

    @Override
      public void init(Context context, Properties initProps) {
        Object o = initProps.get(CONVERT_TYPE);
        if (o != null)
          convertType = Boolean.parseBoolean(o.toString());
        // Create the database connection factory
        factory = createConnectionFactory(context, initProps);
        // Batch (fetch) size
        String bsz = initProps.getProperty("batchSize");
        if (bsz != null) {
          bsz = context.replaceTokens(bsz);
          try {
            batchSize = Integer.parseInt(bsz);
            if (batchSize == -1)
              batchSize = Integer.MIN_VALUE;
          } catch (NumberFormatException e) {
            LOG.warn("Invalid batch size: " + bsz);
          }
        }
        // Initialize the mapping from field name to type
        for (Map<String, String> map : context.getAllEntityFields()) {
          String n = map.get(DataImporter.COLUMN);
          String t = map.get(DataImporter.TYPE);
          if ("sint".equals(t) || "integer".equals(t))
            fieldNameVsType.put(n, Types.INTEGER);
          else if ("slong".equals(t) || "long".equals(t))
            fieldNameVsType.put(n, Types.BIGINT);
          else if ("float".equals(t) || "sfloat".equals(t))
            fieldNameVsType.put(n, Types.FLOAT);
          else if ("double".equals(t) || "sdouble".equals(t))
            fieldNameVsType.put(n, Types.DOUBLE);
          else if ("date".equals(t))
            fieldNameVsType.put(n, Types.DATE);
          else if ("boolean".equals(t))
            fieldNameVsType.put(n, Types.BOOLEAN);
          else if ("binary".equals(t))
            fieldNameVsType.put(n, Types.BLOB);
          else
            fieldNameVsType.put(n, Types.VARCHAR);
        }
      }
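    The two pieces of logic in init() can be isolated into a small sketch: parsing batchSize, and mapping a declared field type to a java.sql.Types constant. The class and method names below are mine, and DEFAULT_FETCH_SIZE is a placeholder, since the excerpt does not show the value of FETCH_SIZE. The -1 case is the part that tends to surprise people. My understanding is that Integer.MIN_VALUE is the fetch size MySQL Connector/J interprets as a request to stream rows one at a time instead of buffering the whole result, but that is driver behaviour and not something visible in the code above.

```java
import java.sql.Types;

class InitSketch {

  // Placeholder default; the real FETCH_SIZE constant is not shown in the excerpt.
  static final int DEFAULT_FETCH_SIZE = 500;

  /** Parses the batchSize property. -1 becomes Integer.MIN_VALUE, bad input keeps the fallback. */
  static int parseBatchSize(String raw, int fallback) {
    if (raw == null) {
      return fallback;
    }
    try {
      int size = Integer.parseInt(raw);
      return size == -1 ? Integer.MIN_VALUE : size;
    } catch (NumberFormatException e) {
      // The original logs a warning here and leaves batchSize unchanged.
      return fallback;
    }
  }

  /** Maps a declared field type name to a java.sql.Types constant, defaulting to VARCHAR. */
  static int sqlTypeFor(String schemaType) {
    if (schemaType == null) {
      return Types.VARCHAR;
    }
    switch (schemaType) {
      case "sint":
      case "integer":
        return Types.INTEGER;
      case "slong":
      case "long":
        return Types.BIGINT;
      case "float":
      case "sfloat":
        return Types.FLOAT;
      case "double":
      case "sdouble":
        return Types.DOUBLE;
      case "date":
        return Types.DATE;
      case "boolean":
        return Types.BOOLEAN;
      case "binary":
        return Types.BLOB;
      default:
        return Types.VARCHAR;
    }
  }

  public static void main(String[] args) {
    System.out.println(parseBatchSize("-1", DEFAULT_FETCH_SIZE) == Integer.MIN_VALUE);
    System.out.println(sqlTypeFor("slong") == Types.BIGINT);
  }
}
```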

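    The method that creates the connection factory is shown below (I have added a few comments). Note that it returns a Callable<Connection>, so no connection is opened until the factory is actually called.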

    protected Callable<Connection> createConnectionFactory(final Context context,
                                           final Properties initProps) {
    //    final VariableResolver resolver = context.getVariableResolver();
        // Substitute template variables in the properties
        resolveVariables(context, initProps);
        final String jndiName = initProps.getProperty(JNDI_NAME);
        final String url = initProps.getProperty(URL);
        final String driver = initProps.getProperty(DRIVER);
    
        if (url == null && jndiName == null)
          throw new DataImportHandlerException(SEVERE,
                  "JDBC URL or JNDI name has to be specified");
    
        if (driver != null) {
          try {
            // Load (register) the JDBC driver class
            DocBuilder.loadClass(driver, context.getSolrCore());
          } catch (ClassNotFoundException e) {
            wrapAndThrow(SEVERE, e, "Could not load driver: " + driver);
          }
        } else {
          if(jndiName == null){
            throw new DataImportHandlerException(SEVERE, "One of driver or jndiName must be specified in the data source");
          }
        }
    
        String s = initProps.getProperty("maxRows");
        if (s != null) {
          maxRows = Integer.parseInt(s);
        }
    
        return factory = new Callable<Connection>() {
          public Connection call() throws Exception {
            LOG.info("Creating a connection for entity "
                    + context.getEntityAttribute(DataImporter.NAME) + " with URL: "
                    + url);
            long start = System.currentTimeMillis();
            Connection c = null;
            try {
              if(url != null){ // connect via JDBC URL
                c = DriverManager.getConnection(url, initProps);
              } else if(jndiName != null){ // connect via JNDI lookup
                InitialContext ctx =  new InitialContext();
                Object jndival =  ctx.lookup(jndiName);
                if (jndival instanceof javax.sql.DataSource) {
                  javax.sql.DataSource dataSource = (javax.sql.DataSource) jndival;
                  String user = (String) initProps.get("user");
                  String pass = (String) initProps.get("password");
                  if(user == null || user.trim().equals("")){
                    c = dataSource.getConnection();
                  } else {
                    c = dataSource.getConnection(user, pass);
                  }
                } else {
                  throw new DataImportHandlerException(SEVERE,
                          "the jndi name : '"+jndiName +"' is not a valid javax.sql.DataSource");
                }
              }
            } catch (SQLException e) {
              // DriverManager does not allow you to use a driver which is not loaded through
              // the class loader of the class which is trying to make the connection.
              // This is a workaround for cases where the user puts the driver jar in the
              // solr.home/lib or solr.home/core/lib directories.
              Driver d = (Driver) DocBuilder.loadClass(driver, context.getSolrCore()).newInstance();
              c = d.connect(url, initProps);
            }
            if (c != null) {
              if (Boolean.parseBoolean(initProps.getProperty("readOnly"))) {
                c.setReadOnly(true);
                // Add other sane defaults
                c.setAutoCommit(true);
                c.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
                c.setHoldability(ResultSet.CLOSE_CURSORS_AT_COMMIT);
              }
              if (!Boolean.parseBoolean(initProps.getProperty("autoCommit"))) {
                c.setAutoCommit(false);
              }
              String transactionIsolation = initProps.getProperty("transactionIsolation");
              if ("TRANSACTION_READ_UNCOMMITTED".equals(transactionIsolation)) {
                c.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
              } else if ("TRANSACTION_READ_COMMITTED".equals(transactionIsolation)) {
                c.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
              } else if ("TRANSACTION_REPEATABLE_READ".equals(transactionIsolation)) {
                c.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
              } else if ("TRANSACTION_SERIALIZABLE".equals(transactionIsolation)) {
                c.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
              } else if ("TRANSACTION_NONE".equals(transactionIsolation)) {
                c.setTransactionIsolation(Connection.TRANSACTION_NONE);
              }
              String holdability = initProps.getProperty("holdability");
              if ("CLOSE_CURSORS_AT_COMMIT".equals(holdability)) {
                c.setHoldability(ResultSet.CLOSE_CURSORS_AT_COMMIT);
              } else if ("HOLD_CURSORS_OVER_COMMIT".equals(holdability)) {
                c.setHoldability(ResultSet.HOLD_CURSORS_OVER_COMMIT);
              }
            }
            LOG.info("Time taken for getConnection(): "
                    + (System.currentTimeMillis() - start));
            return c;
          }
        };
      }
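    The shape of createConnectionFactory, validating settings eagerly but deferring the expensive work into a Callable, can be sketched without a database. Everything here is illustrative: FakeConnection is a hypothetical stand-in for java.sql.Connection, the OPENED counter exists only to make the laziness observable, and the exception type is my choice rather than Solr's DataImportHandlerException.

```java
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

class ConnectionFactorySketch {

  /** Hypothetical stand-in for java.sql.Connection, so the sketch runs without a database. */
  static final class FakeConnection {
    final String url;
    final boolean readOnly;

    FakeConnection(String url, boolean readOnly) {
      this.url = url;
      this.readOnly = readOnly;
    }
  }

  /** Counts how many connections were actually opened. */
  static final AtomicInteger OPENED = new AtomicInteger();

  /** Checks the settings right away, but opens nothing until call() is invoked. */
  static Callable<FakeConnection> createFactory(final Properties props) {
    final String url = props.getProperty("url");
    if (url == null) {
      throw new IllegalArgumentException("url has to be specified");
    }
    // parseBoolean(null) is false, so a missing property means "not read-only".
    final boolean readOnly = Boolean.parseBoolean(props.getProperty("readOnly"));
    return new Callable<FakeConnection>() {
      @Override
      public FakeConnection call() {
        OPENED.incrementAndGet();
        return new FakeConnection(url, readOnly);
      }
    };
  }

  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.setProperty("url", "jdbc:fake://db");
    Callable<FakeConnection> factory = createFactory(props);
    System.out.println("opened before call: " + OPENED.get());
    factory.call();
    System.out.println("opened after call: " + OPENED.get());
  }
}
```

    Because the factory captures the validated settings, the data source can later call it again whenever it decides the current connection is stale.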

     The following method returns the Iterator<Map<String, Object>> over the query results:

     @Override
      public Iterator<Map<String, Object>> getData(String query) {
        ResultSetIterator r = new ResultSetIterator(query);
        return r.getIterator();
      }

    ResultSetIterator is an inner class whose constructor takes the query to run (for a database, a SQL statement).

    Inside ResultSetIterator, the query is executed over the database connection, and the resulting ResultSet is wrapped and exposed as an Iterator<Map<String, Object>>.

    private class ResultSetIterator {
        ResultSet resultSet;
    
        Statement stmt = null;
    
        List<String> colNames;
    
        Iterator<Map<String, Object>> rSetIterator;
    
        public ResultSetIterator(String query) {
    
          try {
            Connection c = getConnection();
            stmt = c.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            stmt.setFetchSize(batchSize);
            stmt.setMaxRows(maxRows);
            LOG.debug("Executing SQL: " + query);
            long start = System.currentTimeMillis();
            if (stmt.execute(query)) {
              resultSet = stmt.getResultSet();
            }
            LOG.trace("Time taken for sql :"
                    + (System.currentTimeMillis() - start));
            colNames = readFieldNames(resultSet.getMetaData());
          } catch (Exception e) {
            wrapAndThrow(SEVERE, e, "Unable to execute query: " + query);
          }
          if (resultSet == null) {
            rSetIterator = new ArrayList<Map<String, Object>>().iterator();
            return;
          }
    
          rSetIterator = new Iterator<Map<String, Object>>() {
            public boolean hasNext() {
              return hasnext();
            }
    
            public Map<String, Object> next() {
              return getARow();
            }
    
            public void remove() {/* do nothing */
            }
          };
        }
    
        private Iterator<Map<String, Object>> getIterator() {
          return rSetIterator;
        }
    
        private Map<String, Object> getARow() {
          if (resultSet == null)
            return null;
          Map<String, Object> result = new HashMap<String, Object>();
          for (String colName : colNames) {
            try {
              if (!convertType) {
                // Use underlying database's type information
                result.put(colName, resultSet.getObject(colName));
                continue;
              }
    
              Integer type = fieldNameVsType.get(colName);
              if (type == null)
                type = Types.VARCHAR;
              switch (type) {
                case Types.INTEGER:
                  result.put(colName, resultSet.getInt(colName));
                  break;
                case Types.FLOAT:
                  result.put(colName, resultSet.getFloat(colName));
                  break;
                case Types.BIGINT:
                  result.put(colName, resultSet.getLong(colName));
                  break;
                case Types.DOUBLE:
                  result.put(colName, resultSet.getDouble(colName));
                  break;
                case Types.DATE:
                  result.put(colName, resultSet.getDate(colName));
                  break;
                case Types.BOOLEAN:
                  result.put(colName, resultSet.getBoolean(colName));
                  break;
                case Types.BLOB:
                  result.put(colName, resultSet.getBytes(colName));
                  break;
                default:
                  result.put(colName, resultSet.getString(colName));
                  break;
              }
            } catch (SQLException e) {
              logError("Error reading data ", e);
              wrapAndThrow(SEVERE, e, "Error reading data from database");
            }
          }
          return result;
        }
    
        private boolean hasnext() {
          if (resultSet == null)
            return false;
          try {
            if (resultSet.next()) {
              return true;
            } else {
              close();
              return false;
            }
          } catch (SQLException e) {
            close();
            wrapAndThrow(SEVERE,e);
            return false;
          }
        }
    
        private void close() {
          try {
            if (resultSet != null)
              resultSet.close();
            if (stmt != null)
              stmt.close();
          } catch (Exception e) {
            logError("Exception while closing result set", e);
          } finally {
            resultSet = null;
            stmt = null;
          }
        }
      }

    The ResultSetIterator inner class is an application of the Iterator design pattern.
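    One detail worth noticing in the listing above is that hasnext() calls resultSet.next(), so every hasNext() call advances the cursor; the caller has to call hasNext() exactly once before each next(). A common alternative, which is not what the Solr code does, is to buffer one row so that hasNext() can be called repeatedly. The sketch below shows that variant. RowCursor is a hypothetical interface standing in for the small part of java.sql.ResultSet that is needed, and cursorOver() exists only so the example runs without a database.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

class LookaheadRowIterator implements Iterator<Map<String, Object>> {

  /** Hypothetical minimal cursor, shaped like the part of ResultSet used here. */
  interface RowCursor {
    boolean next();             // advance to the next row; false when exhausted
    Object get(String column);  // value of a column in the current row
    void close();
  }

  private final RowCursor cursor;
  private final List<String> columns;
  private Map<String, Object> buffered; // row read by hasNext() but not yet returned
  private boolean finished;

  LookaheadRowIterator(RowCursor cursor, List<String> columns) {
    this.cursor = cursor;
    this.columns = columns;
  }

  @Override
  public boolean hasNext() {
    if (buffered != null) {
      return true;  // already looked ahead; do not advance again
    }
    if (finished) {
      return false;
    }
    if (!cursor.next()) {
      finished = true;
      cursor.close(); // release resources as soon as the cursor is exhausted
      return false;
    }
    Map<String, Object> row = new LinkedHashMap<>();
    for (String column : columns) {
      row.put(column, cursor.get(column));
    }
    buffered = row;
    return true;
  }

  @Override
  public Map<String, Object> next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    Map<String, Object> row = buffered;
    buffered = null;
    return row;
  }

  /** Builds a cursor over in-memory rows, for demonstration only. */
  static RowCursor cursorOver(final List<Map<String, Object>> rows) {
    return new RowCursor() {
      private int index = -1;

      @Override
      public boolean next() {
        return ++index < rows.size();
      }

      @Override
      public Object get(String column) {
        return rows.get(index).get(column);
      }

      @Override
      public void close() {
        // Nothing to release for in-memory data.
      }
    };
  }
}
```

    The trade-off is one extra row held in memory and an eager read inside hasNext(); in exchange the iterator behaves the way callers of java.util.Iterator usually expect.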

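    The following methods need little explanation; they obtain and close the database connection. getConnection() reuses the cached connection unless it has been idle for longer than CONN_TIME_OUT, in which case it opens a new one and closes the old one.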

    private Connection getConnection() throws Exception {
        long currTime = System.currentTimeMillis();
        if (currTime - connLastUsed > CONN_TIME_OUT) {
          synchronized (this) {
            Connection tmpConn = factory.call();
            closeConnection();
            connLastUsed = System.currentTimeMillis();
            return conn = tmpConn;
          }
    
        } else {
          connLastUsed = currTime;
          return conn;
        }
      }
    
      @Override
      protected void finalize() throws Throwable {
        try {
          if(!isClosed){
            LOG.error("JdbcDataSource was not closed prior to finalize(), indicates a bug -- POSSIBLE RESOURCE LEAK!!!");
            close();
          }
        } finally {
          super.finalize();
        }
      }
    
      private boolean isClosed = false;
    
      @Override
      public void close() {
        try {
          closeConnection();
        } finally {
          isClosed = true;
        }
      }
    
      private void closeConnection()  {
        try {
          if (conn != null) {
            conn.close();
          }
        } catch (Exception e) {
          LOG.error("Ignoring Error when closing connection", e);
        }
      }
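    The idle-timeout logic in getConnection() can be sketched as a small generic holder. This is an approximation under a few assumptions of mine: the resource is any AutoCloseable rather than a JDBC Connection, the clock is injected so the timeout can be exercised without waiting, and the whole get() method is synchronized (the original synchronizes only the branch that reopens the connection). The original also has no null check; it relies on connLastUsed starting at 0 so that the first call always looks timed out.

```java
import java.util.concurrent.Callable;
import java.util.function.LongSupplier;

class IdleTimeoutHolder<T extends AutoCloseable> {

  private final Callable<T> factory; // opens a new resource on demand
  private final long timeoutMillis;  // plays the role of CONN_TIME_OUT
  private final LongSupplier clock;  // injectable time source, in milliseconds
  private T current;
  private long lastUsed;

  IdleTimeoutHolder(Callable<T> factory, long timeoutMillis, LongSupplier clock) {
    this.factory = factory;
    this.timeoutMillis = timeoutMillis;
    this.clock = clock;
  }

  /** Returns the cached resource, replacing it first if it has been idle too long. */
  synchronized T get() {
    long now = clock.getAsLong();
    if (current == null || now - lastUsed > timeoutMillis) {
      T fresh;
      try {
        fresh = factory.call(); // open the replacement before closing the old one
      } catch (Exception e) {
        throw new IllegalStateException("Could not open resource", e);
      }
      closeQuietly(current);
      current = fresh;
    }
    lastUsed = now;
    return current;
  }

  synchronized void close() {
    closeQuietly(current);
    current = null;
  }

  private static void closeQuietly(AutoCloseable resource) {
    if (resource == null) {
      return;
    }
    try {
      resource.close();
    } catch (Exception e) {
      // Swallowed on purpose; closeConnection() in the excerpt logs and ignores the error.
    }
  }

  public static void main(String[] args) {
    IdleTimeoutHolder<AutoCloseable> holder = new IdleTimeoutHolder<AutoCloseable>(
        () -> () -> System.out.println("closed"), 10_000L, System::currentTimeMillis);
    AutoCloseable first = holder.get();
    AutoCloseable second = holder.get();
    System.out.println("reused: " + (first == second));
    holder.close();
  }
}
```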

    Finally, the data source is instantiated in the DataImporter class. When no type is configured, it defaults to JdbcDataSource.

    DataSource getDataSourceInstance(DataConfig.Entity key, String name, Context ctx) {
        Properties p = dataSourceProps.get(name);
        if (p == null)
          p = config.dataSources.get(name);
        if (p == null)
          p = dataSourceProps.get(null);// for default data source
        if (p == null)
          p = config.dataSources.get(null);
        if (p == null)  
          throw new DataImportHandlerException(SEVERE,
                  "No dataSource :" + name + " available for entity :"
                          + key.name);
        String type = p.getProperty(TYPE);
        DataSource dataSrc = null;
        if (type == null) {
          dataSrc = new JdbcDataSource();
        } else {
          try {
            dataSrc = (DataSource) DocBuilder.loadClass(type, getCore()).newInstance();
          } catch (Exception e) {
            wrapAndThrow(SEVERE, e, "Invalid type for data source: " + type);
          }
        }
        try {
          Properties copyProps = new Properties();
          copyProps.putAll(p);
          Map<String, Object> map = ctx.getRequestParameters();
          if (map.containsKey("rows")) {
            int rows = Integer.parseInt((String) map.get("rows"));
            if (map.containsKey("start")) {
              rows += Integer.parseInt((String) map.get("start"));
            }
            copyProps.setProperty("maxRows", String.valueOf(rows));
          }
          // Initialize the data source
          dataSrc.init(ctx, copyProps);
        } catch (Exception e) {
          wrapAndThrow(SEVERE, e, "Failed to initialize DataSource: " + key.dataSource);
        }
        return dataSrc;
      }
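    Two things happen in getDataSourceInstance besides the instantiation itself: the properties are looked up through a four-step fallback chain, and the request's rows and start parameters are folded into a maxRows property. The sketch below isolates both. The names primary and secondary are mine, because the excerpt does not show where dataSourceProps and config.dataSources are populated, and the exception type is a substitute for DataImportHandlerException.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

class DataSourceLookupSketch {

  /**
   * Looks up properties by name in both maps, then falls back to the entry
   * stored under the null key, which acts as the default data source.
   */
  static Properties resolve(String name,
                            Map<String, Properties> primary,
                            Map<String, Properties> secondary) {
    Properties p = primary.get(name);
    if (p == null) p = secondary.get(name);
    if (p == null) p = primary.get(null);
    if (p == null) p = secondary.get(null);
    if (p == null) {
      throw new IllegalStateException("No dataSource: " + name);
    }
    return p;
  }

  /** Sets maxRows to rows (+ start, if present), leaving target untouched when rows is absent. */
  static void applyRowLimit(Properties target, Map<String, Object> requestParams) {
    if (!requestParams.containsKey("rows")) {
      return;
    }
    int rows = Integer.parseInt((String) requestParams.get("rows"));
    if (requestParams.containsKey("start")) {
      rows += Integer.parseInt((String) requestParams.get("start"));
    }
    target.setProperty("maxRows", String.valueOf(rows));
  }

  public static void main(String[] args) {
    Map<String, Properties> primary = new HashMap<>();   // HashMap permits the null key
    Map<String, Properties> secondary = new HashMap<>();
    Properties defaults = new Properties();
    defaults.setProperty("url", "jdbc:default");
    secondary.put(null, defaults);

    Properties copy = new Properties();
    copy.putAll(resolve("unknown", primary, secondary));
    Map<String, Object> params = new HashMap<>();
    params.put("rows", "10");
    params.put("start", "5");
    applyRowLimit(copy, params);
    System.out.println(copy);
  }
}
```

    Adding start to rows presumably accounts for the rows that are skipped before the requested window begins, so the database still returns enough rows to fill it. Copying the properties before modifying them, as the original does with copyProps, keeps the per-request maxRows from leaking into the shared configuration.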

    ---------------------------------------------------------------------------

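    This Solr dataimport source code analysis series is my own original work.

    If you repost it, please credit the source: 博客园 (cnblogs), 刺猬的温驯

    Link to this article: http://www.cnblogs.com/chenying99/archive/2013/05/04/3059295.html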

  • Original article: https://www.cnblogs.com/chenying99/p/3059295.html