zoukankan      html  css  js  c++  java
  • solr中通过SFTP访问文件建立索引

    需求:

      从oracle数据库中根据记录的文件名filename_html(多个文件以逗号隔开),文件路径path,备用文件名bakpath中获取

    主机172.21.0.31上对应的html文件内容,并且只能通过sftp访问html文件,获取文件内容建立索引.

    问题:

      目前的难点有两个:一是字段filename_html中可以有多个文件名,并且多个文件的内容要抽取到同一个索引字段content下面;二是文件只能通过SFTP方式访问,而目前DIH组件中没有相应的SFTP数据源.

    解决方法:

      引入jsch组件包.开发相应SFTP组件.

    1.编写BinSFTPDataSource数据源,用于生成相应的InputStream流.编写过程中注意流的关闭,否则容易造成Too many open files异常.

    package org.apache.solr.handler.dataimport;
    
    import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
    import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
    
    import org.apache.commons.lang.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.jcraft.jsch.ChannelSftp;
    import com.jcraft.jsch.JSch;
    import com.jcraft.jsch.JSchException;
    import com.jcraft.jsch.Session;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;
    import java.util.regex.Pattern;
    
    /**
     * DataImportHandler data source that fetches remote files over SFTP (JSch) and hands
     * each one out as an {@link InputStream}.
     *
     * <p>A single SSH session is opened in {@link #init(Context, Properties)} and reused by
     * every {@link #getData(String)} call; each call opens its own SFTP channel. Only one
     * stream is kept open at a time: requesting the next file releases the stream and
     * channel of the previous one, so the previous stream must have been consumed before
     * {@code getData} is called again. {@link #close()} releases everything, including the
     * session.
     *
     * <p>Not thread-safe: the current channel and stream live in unsynchronized instance
     * fields.
     */
    public class BinSFTPDataSource extends DataSource<InputStream> {
        private static final Logger LOG = LoggerFactory.getLogger(BinSFTPDataSource.class);

        /** SSH session shared by all file requests of this data source. */
        private Session session;
        /** SFTP channel backing the stream most recently returned by getData. */
        private ChannelSftp channel;
        /** Stream most recently returned by getData; closed on the next request or on close(). */
        private InputStream is;

        /** Optional prefix prepended to every requested file name. */
        private String baseUrl;

        private String username;

        private String password;

        private String host;

        private int connectionTimeout = CONNECTION_TIMEOUT;

        private int readTimeout = READ_TIMEOUT;

        private Context context;

        private Properties initProps;

        public BinSFTPDataSource() {
        }

        /**
         * Reads the data source attributes and opens the SSH session.
         *
         * @param context   DIH context, used to resolve {@code ${...}} tokens in attribute values
         * @param initProps attributes of the {@code <dataSource>} element
         * @throws DataImportHandlerException (SEVERE) if the SSH session cannot be established
         */
        @Override
        public void init(Context context, Properties initProps) {
            this.context = context;
            this.initProps = initProps;

            baseUrl = getInitPropWithReplacements(BASE_URL);
            username = getInitPropWithReplacements(USERNAME);
            password = getInitPropWithReplacements(PASSWORD);
            host = getInitPropWithReplacements(HOST);
            connectionTimeout = parseTimeout(
                    getInitPropWithReplacements(CONNECTION_TIMEOUT_FIELD_NAME), connectionTimeout, "connection");
            readTimeout = parseTimeout(
                    getInitPropWithReplacements(READ_TIMEOUT_FIELD_NAME), readTimeout, "read");

            try {
                JSch jsch = new JSch();
                session = jsch.getSession(username, host, PORT);
                if (password != null) {
                    session.setPassword(password);
                }
                Properties config = new Properties();
                // NOTE(review): host key verification is switched off here, so the server
                // identity is not checked. Confirm this is acceptable for the target network.
                config.put("StrictHostKeyChecking", "no");
                session.setConfig(config);

                session.setTimeout(readTimeout);
                session.connect(connectionTimeout);
            } catch (JSchException e) {
                close();
                // Fail fast: without a session every later getData call would fail anyway,
                // and a swallowed exception here hides the real cause.
                LOG.error("Unable to open SFTP session to " + host + ":" + PORT, e);
                wrapAndThrow(SEVERE, e, "Unable to open SFTP session to " + host + ":" + PORT);
            }
        }

        /**
         * Opens the given remote file for reading.
         *
         * <p>The stream and channel handed out by the previous call are released first;
         * otherwise every processed file would leave one SFTP channel open.
         *
         * @param filename remote path; {@code baseUrl} is prepended when it is configured
         * @return a stream over the remote file, or {@code null} if {@code filename} is empty
         * @throws DataImportHandlerException (SEVERE) if the channel or the file cannot be opened
         */
        @Override
        public InputStream getData(String filename) {
            if (StringUtils.isEmpty(filename)) {
                return null;
            }
            if (StringUtils.isNotEmpty(baseUrl)) {
                filename = baseUrl + filename;
            }
            releaseChannel();
            try {
                LOG.info("session isConnect:{}", session.isConnected());
                channel = (ChannelSftp) session.openChannel("sftp");
                channel.connect(); // establish the SFTP channel
                LOG.info("channel isConnect:{}", channel.isConnected());
                is = channel.get(filename);
                return is;
            } catch (Exception e) {
                // Drop only the per-file resources; the session stays up so that the
                // remaining files can still be fetched when the entity skips errors.
                releaseChannel();
                LOG.error("Exception thrown while getting data", e);
                wrapAndThrow(SEVERE, e, "Exception in invoking url " + filename);
                return null; // unreachable
            }
        }

        /**
         * Releases the current stream and channel and disconnects the SSH session.
         */
        @Override
        public void close() {
            releaseChannel();
            if (session != null) {
                session.disconnect();
            }
        }

        public String getBaseUrl() {
            return baseUrl;
        }

        /** Closes the stream and SFTP channel of the most recent request, if any. */
        private void releaseChannel() {
            if (is != null) {
                try {
                    is.close();
                } catch (IOException e) {
                    LOG.warn("Failed to close SFTP input stream", e);
                } finally {
                    is = null;
                }
            }
            if (channel != null) {
                channel.disconnect();
                channel = null;
            }
        }

        /** Parses a timeout attribute, keeping {@code fallback} when it is absent or not a number. */
        private int parseTimeout(String value, int fallback, String label) {
            if (value == null) {
                return fallback;
            }
            try {
                return Integer.parseInt(value);
            } catch (NumberFormatException e) {
                LOG.warn("Invalid {} timeout: {}", label, value);
                return fallback;
            }
        }

        /** Returns the named attribute with {@code ${...}} tokens resolved, or {@code null} if unset. */
        private String getInitPropWithReplacements(String propertyName) {
            final String expr = initProps.getProperty(propertyName);
            if (expr == null) {
                return null;
            }
            return context.replaceTokens(expr);
        }

        static final Pattern URIMETHOD = Pattern.compile("sftp:/",
                Pattern.CASE_INSENSITIVE);

        public static final String ENCODING = "encoding";

        public static final String BASE_URL = "baseUrl";

        public static final String UTF_8 = "UTF-8";

        public static final String CONNECTION_TIMEOUT_FIELD_NAME = "connectionTimeout";

        public static final String READ_TIMEOUT_FIELD_NAME = "readTimeout";

        /** Default connection timeout passed to {@code Session.connect}. */
        public static final int CONNECTION_TIMEOUT = 5000;

        /** Default timeout passed to {@code Session.setTimeout}. */
        public static final int READ_TIMEOUT = 10000;

        public static final String USERNAME = "username";

        public static final String PASSWORD = "password";

        public static final String HOST = "host";

        /** SSH port used for every connection; not configurable through attributes. */
        public static final int PORT = 22;

    }

    2. 编写URLListEntityProcessor.java类,用于循环遍历多url文件.

    package org.apache.solr.handler.dataimport;
    
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    /**
     * Entity processor that expands a delimited list of file names into one row per file,
     * so that a nested entity can process the files one after another.
     *
     * <p>Each emitted row has a single column, {@link #FILE_NAME}, holding {@code baseDir}
     * concatenated with one name from the list. This processor only builds the names; it
     * does not open the files itself.
     */
    public class URLListEntityProcessor extends EntityProcessorBase {
      /**
       * The delimited list of file names, after token replacement.
       */
      protected String fileNames;
      /**
       * Regular expression used to split {@link #fileNames}; defaults to a comma.
       */
      protected String regex;

      /**
       * The base directory given in data-config.xml; prepended to every file name.
       */
      protected String baseDir;

      /**
       * The recursive given in data-config. Default value is false.
       */
      protected boolean recursive = false;

      /**
       * Reads the {@code fileNames}, {@code regex}, {@code baseDir} and {@code recursive}
       * entity attributes.
       *
       * @throws DataImportHandlerException (SEVERE) if {@code baseDir} is missing
       */
      @Override
      public void init(Context context) {
        super.init(context);
        fileNames = context.getEntityAttribute(FILE_NAMES);
        if (fileNames != null) {
          fileNames = context.replaceTokens(fileNames);
        }
        regex = context.getEntityAttribute(REGEX);
        if (regex != null) {
          regex = context.replaceTokens(regex);
        } else {
          // A missing separator used to surface as a NullPointerException in split().
          regex = DEFAULT_REGEX;
        }
        baseDir = context.getEntityAttribute(BASE_DIR);
        if (baseDir == null) {
          throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
                  "'baseDir' is a required attribute");
        }
        baseDir = context.replaceTokens(baseDir);

        String r = context.getEntityAttribute(RECURSIVE);
        if (r != null) {
          recursive = Boolean.parseBoolean(r);
        }
      }

      /**
       * Returns the next file row, building the row list on the first call.
       *
       * @return a map with the {@link #FILE_NAME} column, or {@code null} when no rows remain
       */
      @Override
      public Map<String, Object> nextRow() {
        if (rowIterator != null) {
          return getNext();
        }
        List<Map<String, Object>> fileDetails = new ArrayList<Map<String, Object>>();
        getUrls(fileDetails);
        rowIterator = fileDetails.iterator();
        return getNext();
      }

      /**
       * Splits {@link #fileNames} and appends one row per non-blank name to
       * {@code fileDetails}.
       *
       * <p>Blank entries (an empty list, or doubled/trailing separators) are skipped,
       * because they would otherwise produce a row pointing at {@code baseDir} itself.
       */
      private void getUrls(final List<Map<String, Object>> fileDetails) {
        if (fileNames == null) {
          return;
        }
        for (String name : fileNames.split(regex)) {
          String trimmed = name.trim();
          if (trimmed.length() == 0) {
            continue;
          }
          Map<String, Object> details = new HashMap<String, Object>();
          details.put(FILE_NAME, baseDir + trimmed);
          fileDetails.add(details);
        }
      }

      /** Separator used when the entity does not specify {@code regex}. */
      private static final String DEFAULT_REGEX = ",";

      public static final String DIR = "fileDir";

      public static final String ABSOLUTE_FILE = "fileAbsolutePath";

      public static final String FILE_NAME = "fileName";

      public static final String FILE_NAMES = "fileNames";

      public static final String BASE_DIR = "baseDir";

      public static final String REGEX = "regex";

      public static final String RECURSIVE = "recursive";

    }

    3.配置data-config.xml文件:

    <dataConfig>
    <!-- "jdbc" supplies the document metadata rows; "binSftp" supplies the HTML file bodies over SFTP. -->
    <dataSource name="jdbc" driver="oracle.jdbc.driver.OracleDriver"
        url="jdbc:oracle:thin:@127.0.0.1:1522:ORCLLI" user="kms_iep" password="kms_iep" batchSize="2000"/>
    <dataSource name="binSftp" type="BinSFTPDataSource" 
            username="kms" password="kms" host="127.0.0.1"
            connectionTimeout="10000" readTimeout="20000" />
        <document>
            <!-- NOTE(review): the full-import query is limited to provincecode='ah' and opertime 2014-12-24;
                 confirm whether that filter is intended outside of testing. -->
            <entity  pk="ID"  dataSource="jdbc" name="province"
                query="select (provincecode || '_' || kng_id) as id,
                       kng_id,
                       kng_type as type,
                       kng_title as title,
                       provincecode,
                       opertime,
                       modify_date,
                       url,
                       pack_month_fee,
                       pack_type,
                       pack_sen_flow,
                       filename_html,
                       ('/kmsinterface/jt/province_bak/' || provincecode || '/' ||
                       to_char(opertime, 'yyyymmdd') || substr(filepath,instr(filepath,'/',2)) || '/'
                       ) as path,
                       ('/kmsinterface/' || provincecode ||filepath) as bakpath
                  from IEP_UPLOAD_DOCUMENT t
                 where kng_status = 0  and provincecode='ah'  and to_char(opertime,'yyyy-mm-dd')='2014-12-24'
                       "
                deltaQuery="select (provincecode || '_' || kng_id) as id  from IEP_UPLOAD_DOCUMENT  where kng_status = 0  and  opertime &gt; to_date('${dih.last_index_time}','yyyy-mm-dd hh24:mi:ss') order by opertime asc"
                deltaImportQuery="select * from (
                                select (provincecode || '_' || kng_id) as id,
                                       kng_id,
                                       kng_type as type,
                                       kng_title as title,
                                       provincecode,
                                       opertime,
                                       modify_date,
                                       url,
                                       pack_month_fee,
                                       pack_type,
                                       pack_sen_flow,
                                       filename_html,
                                       ('/kmsinterface/jt/province_bak/' || provincecode || '/' ||
                                       to_char(opertime, 'yyyymmdd') || substr(filepath,instr(filepath,'/',2)) || '/' ) as path,
                                       ('/kmsinterface/' || provincecode ||filepath) as bakpath
                                  from IEP_UPLOAD_DOCUMENT t
                                 where kng_status = 0 )
                                    where id = '${dih.delta.ID}'"
                deletePKQuery="select (provincecode || '_' || kng_id) as id  from IEP_UPLOAD_DOCUMENT  where kng_status = 1   and opertime &gt; to_date('${dih.last_index_time}','yyyy-mm-dd hh24:mi:ss') order by id desc"
                transformer="DateFormatTransformer,RegexTransformer"
                onError="skip">
                <field column="ID" name="id" />
                <field column="KNG_ID" name="kng_id" />
                <field column="type" name="type" />
                <field column="TITLE" name="title" />
                <field column="PROVINCECODE" name="provincecode" />
                <field column="OPERTIME" name="opertime" dateTimeFormat="yyyy-MM-dd HH:mm:ss"/>
                <field column="MODIFY_DATE" name="modify_date" dateTimeFormat="yyyy-MM-dd HH:mm:ss"/>
                <field column="URL" name="url"  />
                <field column="PACK_MONTH_FEE" name="pack_month_fee"  />
                <field column="PACK_TYPE" name="pack_type"  />
                <field column="PACK_SEN_FLOW" name="pack_sen_flow"  />
                <!-- One row per comma-separated name in FILENAME_HTML, prefixed with the PATH column. -->
                <entity name="urllist1" processor="URLListEntityProcessor"  
                        baseDir="/kms/solr${province.PATH}" 
                        fileNames="${province.FILENAME_HTML}" regex=",">
                
                    <!-- Parse the attachment. The dataSource must name a declared data source: the SFTP
                         source above is "binSftp". The regex strips tabs, line breaks and other whitespace. -->
                    <entity name="test1" processor="TikaEntityProcessor" url="${urllist1.fileName}"
                            dataSource="binSftp"      format="text" 
                            transformer="HTMLStripTransformer,RegexTransformer" onError="skip">
                            <field column="text" name="content" stripHTML="true" regex="\t|\r|\n|\s"  replaceWith="" />
                    </entity>
                </entity>
                
                <!-- Same file names again, this time prefixed with the BAKPATH (backup path) column. -->
                <entity name="urllist2" processor="URLListEntityProcessor"  
                        baseDir="/kms/solr${province.BAKPATH}" 
                        fileNames="${province.FILENAME_HTML}" regex=",">
    
                    <entity name="test2" processor="TikaEntityProcessor" url="${urllist2.fileName}" 
                            dataSource="binSftp"  format="text" 
                            transformer="HTMLStripTransformer,RegexTransformer" onError="skip">
                            <field column="text" name="content" stripHTML="true" regex="\t|\r|\n|\s"  replaceWith="" />
                    </entity>
                </entity>
                
            </entity>
        </document>
    </dataConfig>
  • 相关阅读:
    Gridview鼠标移动到数据行时改变该数据行的背景色
    利用SQL或存储过程实现GridView分页
    ObjectDataSource“odbList”未能找到接受“MyBookShop.Model.Admin”类型的参数的非泛型方法“DeleteAdmin”。
    Apache服务器主配置文件全中文注释
    MVC简介
    MOSS 2007 部署Feature脚本
    在线视频点播
    yui 与 jQuery
    解决MOSS2007 单点登陆 设置错误“您没有执行此操作的权限”
    【转发】外企面试常用的英文问题及高分回答
  • 原文地址:https://www.cnblogs.com/a198720/p/4269154.html
Copyright © 2011-2022 走看看