  • Elasticsearch: hot-updating the dictionary from an external MySQL dictionary

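    Elasticsearch ships with a default dictionary, but in real applications that dictionary is nowhere near flexible enough. Elasticsearch lets us define custom dictionaries, and this article explains how to hot-update such a dictionary quickly and reliably.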

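    Hot-update approaches

    1. Use the IK analyzer's native hot-update mechanism: deploy a web server that exposes an HTTP endpoint, and signal dictionary changes through the Last-Modified and ETag HTTP response headers

    2. Modify the IK source code so that it periodically pulls the words from MySQL

    The second approach is recommended and is also the more commonly used one. The first approach is the official method, but even the official project does not encourage relying on it.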

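    Approaches in detail

    1. Native IK analyzer approach

    • External static dictionary

      • Pros: you only edit the specified dictionary file, and deployment is simple
      • Cons: Elasticsearch has to be restarted after every edit before the change takes effect
    • Remote dictionary

      • Pros: the dictionary is served by a static file server
      • Cons: hot updates depend on the Last-Modified and ETag HTTP response headers, and sometimes the update does not take effect
    How to use it
    • Go to the config folder
    • Open the IKAnalyzer.cfg.xml file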
    <?xml version="1.0" encoding="UTF-8"?>
    <!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
    <properties>
    	<comment>IK Analyzer extension configuration</comment>
    	<!-- Configure your own extension dictionary here -->
    	<entry key="ext_dict">community.dic</entry>
    	 <!-- Configure your own extension stopword dictionary here -->
    	<entry key="ext_stopwords"></entry>
    	<!-- Configure a remote extension dictionary here -->
    	<!-- <entry key="remote_ext_dict">http://192.168.10.63:8080/community.txt</entry> -->
    	<!-- Configure a remote extension stopword dictionary here -->
    	<!-- <entry key="remote_ext_stopwords">http://192.168.10.63:8080/community.txt</entry> -->
    </properties>
    
    

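    ext_dict (dictionary) and ext_stopwords (stopword dictionary) only take effect when the files are placed under the config folder, and Elasticsearch must be restarted after every dictionary update.
    remote_ext_dict and remote_ext_stopwords require the files to be hosted on a static file server. The default polling interval is one minute, as the scheduling calls in Dictionary.initial() show: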

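    /**
    	 * Dictionary initialization. IK Analyzer initializes its dictionaries through static methods of the Dictionary class,
    	 * so the dictionaries are only loaded when Dictionary is first used, which lengthens the first analysis call.
    	 * This method provides a way to initialize the dictionaries while the application is still loading.
    	 *
    	 * @return Dictionary
    	 */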
    	public static synchronized Dictionary initial(Configuration cfg) {
    		if (singleton == null) {
    			synchronized (Dictionary.class) {
    				if (singleton == null) {
    
    					singleton = new Dictionary(cfg);
    					singleton.loadMainDict();
    					singleton.loadSurnameDict();
    					singleton.loadQuantifierDict();
    					singleton.loadSuffixDict();
    					singleton.loadPrepDict();
    					singleton.loadStopWordDict();
    
                        // Start the thread that hot-reloads the dictionary (added for the MySQL approach in section 2)
                        new Thread(new HotDicReloadThread()).start();
    
    					if(cfg.isEnableRemoteDict()){
    						// Set up the monitoring tasks
    						for (String location : singleton.getRemoteExtDictionarys()) {
    							// Initial delay of 10 (adjustable) and a period of 60, both in seconds
    							pool.scheduleAtFixedRate(new Monitor(location), 10, 60, TimeUnit.SECONDS);
    						}
    						for (String location : singleton.getRemoteExtStopWordDictionarys()) {
    							pool.scheduleAtFixedRate(new Monitor(location), 10, 60, TimeUnit.SECONDS);
    						}
    					}
    
    					return singleton;
    				}
    			}
    		}
    		return singleton;
    	}
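
The native mechanism hinges on the two response headers mentioned above. As a rough sketch of the idea (this is not IK's actual Monitor code, and the class and method names are hypothetical), a poller can remember the last Last-Modified and ETag values it saw and trigger a reload only when one of them changes:

```java
import java.util.Objects;

/** Hypothetical helper: decides from HTTP response headers whether a remote dictionary changed. */
final class RemoteDictChangeDetector {
    private String lastModified; // last seen Last-Modified header value
    private String eTag;         // last seen ETag header value

    /**
     * Returns true when either header differs from the previously seen value,
     * and remembers the new values for the next poll.
     */
    synchronized boolean shouldReload(String newLastModified, String newETag) {
        // A response carrying neither header gives nothing to compare, so do not reload.
        if (newLastModified == null && newETag == null) {
            return false;
        }
        boolean changed = !Objects.equals(lastModified, newLastModified)
                || !Objects.equals(eTag, newETag);
        lastModified = newLastModified;
        eTag = newETag;
        return changed;
    }
}
```

A check of this kind would also be one plausible explanation for updates that "sometimes do not take effect": if the static server does not refresh either header when the file changes, the poller sees no difference and skips the reload.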
    
    

    2. Hot-updating the dictionary from MySQL

    • Add the MySQL JDBC driver dependency to the plugin's pom.xml

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>6.0.6</version>
    </dependency>
    
    • Create a new properties file named jdbc-reload.properties in the config folder

    jdbc.url=jdbc:mysql://192.168.10.200:3316/platform_foreign_website?characterEncoding=UTF-8&serverTimezone=GMT&nullCatalogMeansCurrent=true
    jdbc.user=dbadmin_app
    jdbc.password=dap12345678@
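    # Query that loads the dictionary words
    jdbc.reload.sql=select word from hot_words
    # Query that loads the stopwords
    jdbc.reload.stopword.sql=select stopword as word from hot_stopwords
    # Interval between reloads, in milliseconds (passed to Thread.sleep)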
    jdbc.reload.interval=5000
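
To make the role of each key concrete, here is a minimal sketch of reading this file into a typed object. JdbcReloadConfig and its methods are hypothetical names introduced for illustration; the article's own code reads the keys directly from a shared Properties instance instead.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical holder for the settings in jdbc-reload.properties. */
final class JdbcReloadConfig {
    final String url;
    final String user;
    final String password;
    final String wordSql;
    final String stopwordSql;
    final long intervalMillis;

    private JdbcReloadConfig(Properties p) {
        this.url = require(p, "jdbc.url");
        this.user = require(p, "jdbc.user");
        this.password = require(p, "jdbc.password");
        this.wordSql = require(p, "jdbc.reload.sql");
        this.stopwordSql = require(p, "jdbc.reload.stopword.sql");
        // Assumption: fall back to 5000 ms (the value used above) when the key is absent.
        this.intervalMillis = Long.parseLong(p.getProperty("jdbc.reload.interval", "5000").trim());
    }

    /** Fails fast with a clear message when a mandatory key is missing or blank. */
    private static String require(Properties p, String key) {
        String value = p.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("missing property: " + key);
        }
        return value.trim();
    }

    static JdbcReloadConfig load(Reader reader) throws IOException {
        Properties p = new Properties();
        p.load(reader);
        return new JdbcReloadConfig(p);
    }

    /** Convenience overload for in-memory text, mainly useful in tests. */
    static JdbcReloadConfig parse(String text) {
        try {
            return load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Validating the keys once at load time means a typo in the file surfaces as a single clear error rather than as a null SQL string inside the reload loop.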
    
    • Create a new thread that calls Dictionary's reLoadMainDict() method to reload the dictionary

    /**
     * @description: thread that keeps reloading the dictionary from MySQL
     * @author: liuxin
     * @create: 2019-11-14 16:24
     **/
    public class HotDicReloadThread implements Runnable{
        private static final Logger logger = ESLoggerFactory.getLogger(HotDicReloadThread.class.getName());
    
        @Override
        public void run() {
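            while (true){
                logger.info("-------reloading the MySQL dictionary--------");

                // There is no sleep here: the pause between iterations comes from the
                // Thread.sleep(...) at the end of loadMySqlStopwordDict(), which
                // reLoadMainDict() is expected to reach through loadStopWordDict().
                Dictionary.getSingleton().reLoadMainDict();
            }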
        }
    }
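
The loop above relies on a sleep hidden inside one of the load methods. As an alternative sketch (not what the article's code does), the interval can be owned by a scheduler instead. PeriodicReloader is a hypothetical name; in the plugin the task would be something like `() -> Dictionary.getSingleton().reLoadMainDict()`, and the sleep inside the load method would then be removed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper that runs a reload task at a fixed delay on a daemon thread. */
final class PeriodicReloader {
    /** Runs reloadTask immediately, then again intervalMillis after each run finishes. */
    static ScheduledExecutorService start(Runnable reloadTask, long intervalMillis) {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "hot-dict-reload");
            t.setDaemon(true); // do not keep the JVM alive just for reloading
            return t;
        });
        pool.scheduleWithFixedDelay(() -> {
            try {
                reloadTask.run();
            } catch (RuntimeException e) {
                // Swallow the failure so that one bad reload does not cancel future runs.
                System.err.println("reload failed: " + e);
            }
        }, 0, intervalMillis, TimeUnit.MILLISECONDS);
        return pool;
    }

    /** Small demo: waits until a counting task has run `runs` times, then stops the scheduler. */
    static int demo(int runs) {
        AtomicInteger counter = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(runs);
        ScheduledExecutorService pool = start(() -> {
            counter.incrementAndGet();
            latch.countDown();
        }, 10);
        try {
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return counter.get();
    }
}
```

scheduleWithFixedDelay is used rather than scheduleAtFixedRate so that a slow database query cannot cause reloads to pile up back to back.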
    
    • Modify the Dictionary class in the org.wltea.analyzer.dic package

      • Add a method that loads the dictionary words
      /**
       * Loads the hot-update dictionary from MySQL
       */
      private void loadMySqlExtDict(){
          Connection connection = null;
          Statement statement = null;
          ResultSet resultSet = null;
      
          try {
              Path file = PathUtils.get(getDictRoot(),"jdbc-reload.properties");
              // Close the stream after loading instead of leaking it
              try (FileInputStream in = new FileInputStream(file.toFile())) {
                  prop.load(in);
              }
      
              logger.info("-------jdbc-reload.properties-------");
              for (Object key : prop.keySet()) {
                  // Log property names only, so the password is not written to the log
                  logger.info("key:{}", String.valueOf(key));
              }
      
              logger.info("------- querying dictionary, sql:{}-------", prop.getProperty("jdbc.reload.sql"));
      
              // Open the MySQL connection
              connection = DriverManager.getConnection(
                      prop.getProperty("jdbc.url"),
                      prop.getProperty("jdbc.user"),
                      prop.getProperty("jdbc.password")
              );
      
              // Run the query
              statement = connection.createStatement();
              resultSet = statement.executeQuery(prop.getProperty("jdbc.reload.sql"));
      
              // Iterate over the result set and add every word to the main dictionary
              while (resultSet.next()) {
                  String theWord = resultSet.getString("word");
                  // The column is nullable, so skip NULL and blank rows
                  if (theWord == null || theWord.trim().isEmpty()) {
                      continue;
                  }
                  logger.info("------hot-update word:{}------", theWord);
      
                  // Add the word to _MainDict
                  _MainDict.fillSegment(theWord.trim().toCharArray());
              }
          } catch (Exception e) {
              logger.error("error", e);
          } finally {
              try {
                  if (resultSet != null) {
                      resultSet.close();
                  }
                  if (statement != null) {
                      statement.close();
                  }
                  if (connection != null) {
                      connection.close();
                  }
              } catch (SQLException e){
                  logger.error("error", e);
              }
          }
      }
      
      
      • Add a method that loads the stopword dictionary
      /**
       * Loads the stopwords from MySQL
       */
      private void loadMySqlStopwordDict(){
          Connection conn = null;
          Statement stmt = null;
          ResultSet rs = null;
      
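          try {
              Path file = PathUtils.get(getDictRoot(), "jdbc-reload.properties");
              // Close the stream after loading instead of leaking it
              try (FileInputStream in = new FileInputStream(file.toFile())) {
                  prop.load(in);
              }
      
              logger.info("-------jdbc-reload.properties-------");
              for(Object key : prop.keySet()) {
                  // Log property names only, so the password is not written to the log
                  logger.info("-------key:{}", String.valueOf(key));
              }
      
              // Fixed: the original referenced an undefined variable "props" here
              logger.info("-------querying stopwords, sql:{}", prop.getProperty("jdbc.reload.stopword.sql"));
      
              conn = DriverManager.getConnection(
                      prop.getProperty("jdbc.url"),
                      prop.getProperty("jdbc.user"),
                      prop.getProperty("jdbc.password"));
              stmt = conn.createStatement();
              rs = stmt.executeQuery(prop.getProperty("jdbc.reload.stopword.sql"));
      
              while(rs.next()) {
                  String theWord = rs.getString("word");
                  // The column is nullable, so skip NULL and blank rows
                  if (theWord == null || theWord.trim().isEmpty()) {
                      continue;
                  }
                  logger.info("------- loading stopword : {}", theWord);
                  _StopWords.fillSegment(theWord.trim().toCharArray());
              }
      
              // Pause before returning. This is what spaces out the iterations of HotDicReloadThread;
              // note that it also delays the very first load in initial() by the same interval.
              Thread.sleep(Long.parseLong(prop.getProperty("jdbc.reload.interval").trim()));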
          } catch (Exception e) {
              logger.error("error", e);
          } finally {
              try {
                  if(rs != null) {
                      rs.close();
                  }
                  if(stmt != null) {
                      stmt.close();
                  }
                  if(conn != null) {
                      conn.close();
                  }
              } catch (SQLException e){
                  logger.error("error", e);
              }
      
          }
      }
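
Both methods above follow the same pattern: run a query, read the "word" column, and clean each value before adding it. A compact way to express that pattern, shown here only as a sketch with hypothetical names (MySqlWordSource, fetchWords, clean), is try-with-resources for the JDBC objects plus a separate cleaning step that can be tested without a database:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper that separates fetching rows from cleaning them. */
final class MySqlWordSource {
    /** Runs the query and returns the raw values of the "word" column. */
    static List<String> fetchWords(String url, String user, String password, String sql)
            throws SQLException {
        List<String> raw = new ArrayList<>();
        // try-with-resources closes ResultSet, Statement and Connection in reverse order,
        // even when the query throws.
        try (Connection conn = DriverManager.getConnection(url, user, password);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(sql)) {
            while (rs.next()) {
                raw.add(rs.getString("word")); // may be null: the column is DEFAULT NULL
            }
        }
        return raw;
    }

    /** Trims entries and drops nulls, blanks and duplicates, keeping first-seen order. */
    static List<String> clean(List<String> raw) {
        Set<String> seen = new LinkedHashSet<>();
        for (String w : raw) {
            if (w == null) {
                continue;
            }
            String trimmed = w.trim();
            if (!trimmed.isEmpty()) {
                seen.add(trimmed);
            }
        }
        return new ArrayList<>(seen);
    }
}
```

With a helper like this, each load method would reduce to a loop that passes every cleaned word to fillSegment(...).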
      
    • Call the two new methods at the end of loadMainDict() and loadStopWordDict() respectively

    /**
    	 * Loads the main dictionary and the extension dictionaries
    	 */
    	private void loadMainDict() {
    		// Create the main dictionary instance
    		_MainDict = new DictSegment((char) 0);
    
    		// Read the main dictionary file
    		Path file = PathUtils.get(getDictRoot(), Dictionary.PATH_DIC_MAIN);
    
    		InputStream is = null;
    		try {
    			is = new FileInputStream(file.toFile());
    		} catch (FileNotFoundException e) {
    			logger.error(e.getMessage(), e);
    		}
    
    		try {
    			BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"), 512);
    			String theWord = null;
    			do {
    				theWord = br.readLine();
    				if (theWord != null && !"".equals(theWord.trim())) {
    					_MainDict.fillSegment(theWord.trim().toCharArray());
    				}
    			} while (theWord != null);
    
    		} catch (IOException e) {
    			logger.error("ik-analyzer", e);
    
    		} finally {
    			try {
    				if (is != null) {
    					is.close();
    					is = null;
    				}
    			} catch (IOException e) {
    				logger.error("ik-analyzer", e);
    			}
    		}
    		// Load the extension dictionaries
    		this.loadExtDict();
    		// Load the remote custom dictionaries
    		this.loadRemoteExtDict();
            // Custom method: load words from MySQL
    		this.loadMySqlExtDict();
    	}
    	
    	
    	
    	
    	
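    	/**
    	 * Loads the user-defined extension stopword dictionaries
    	 */
    	private void loadStopWordDict() {
    		// Create the stopword dictionary instance
    		_StopWords = new DictSegment((char) 0);
    
    		// Read the main stopword dictionary file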
    		Path file = PathUtils.get(getDictRoot(), Dictionary.PATH_DIC_STOP);
    
    		InputStream is = null;
    		try {
    			is = new FileInputStream(file.toFile());
    		} catch (FileNotFoundException e) {
    			logger.error(e.getMessage(), e);
    		}
    
    		try {
    			BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"), 512);
    			String theWord = null;
    			do {
    				theWord = br.readLine();
    				if (theWord != null && !"".equals(theWord.trim())) {
    					_StopWords.fillSegment(theWord.trim().toCharArray());
    				}
    			} while (theWord != null);
    
    		} catch (IOException e) {
    			logger.error("ik-analyzer", e);
    
    		} finally {
    			try {
    				if (is != null) {
    					is.close();
    					is = null;
    				}
    			} catch (IOException e) {
    				logger.error("ik-analyzer", e);
    			}
    		}
    
    		// Load the extension stopword dictionaries
    		List<String> extStopWordDictFiles = getExtStopWordDictionarys();
    		if (extStopWordDictFiles != null) {
    			is = null;
    			for (String extStopWordDictName : extStopWordDictFiles) {
    				logger.info("[Dict Loading] " + extStopWordDictName);
    
    				// Read the extension dictionary file
    				file = PathUtils.get(getDictRoot(), extStopWordDictName);
    				try {
    					is = new FileInputStream(file.toFile());
    				} catch (FileNotFoundException e) {
    					logger.error("ik-analyzer", e);
    				}
    				// Skip the extension dictionary if it cannot be found
    				if (is == null) {
    					continue;
    				}
    				try {
    					BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"), 512);
    					String theWord = null;
    					do {
    						theWord = br.readLine();
    						if (theWord != null && !"".equals(theWord.trim())) {
    							// Load the extension stopword into memory
    							_StopWords.fillSegment(theWord.trim().toCharArray());
    						}
    					} while (theWord != null);
    
    				} catch (IOException e) {
    					logger.error("ik-analyzer", e);
    
    				} finally {
    					try {
    						if (is != null) {
    							is.close();
    							is = null;
    						}
    					} catch (IOException e) {
    						logger.error("ik-analyzer", e);
    					}
    				}
    			}
    		}
    
    		// Load the remote stopword dictionaries
    		List<String> remoteExtStopWordDictFiles = getRemoteExtStopWordDictionarys();
    		for (String location : remoteExtStopWordDictFiles) {
    			logger.info("[Dict Loading] " + location);
    			List<String> lists = getRemoteWords(location);
    			// Skip the remote dictionary if it cannot be loaded
    			if (lists == null) {
    				logger.error("[Dict Loading] " + location + " failed to load");
    				continue;
    			}
    			for (String theWord : lists) {
    				if (theWord != null && !"".equals(theWord.trim())) {
    					// Load the remote stopword into memory
    					logger.info(theWord);
    					_StopWords.fillSegment(theWord.trim().toLowerCase().toCharArray());
    				}
    			}
    		}
    		
            // Custom method: load stopwords from MySQL
    		this.loadMySqlStopwordDict();
    	}
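
Both load methods start by creating a fresh DictSegment and then filling it. A common way to keep readers from ever seeing a half-filled structure during a reload is to build the new one off to the side and publish it with a single reference assignment. The sketch below illustrates that idea with a plain Set standing in for DictSegment; SwappableDictionary is a hypothetical name and is not part of IK.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical illustration of build-then-swap reloading. */
final class SwappableDictionary {
    // volatile so that readers see a fully built set once the reference is swapped
    private volatile Set<String> words = Collections.emptySet();

    boolean contains(String word) {
        return words.contains(word);
    }

    /** Builds a complete replacement set first, then publishes it in one assignment. */
    void reload(Collection<String> freshWords) {
        Set<String> next = new HashSet<>();
        for (String w : freshWords) {
            if (w != null && !w.trim().isEmpty()) {
                next.add(w.trim());
            }
        }
        words = Collections.unmodifiableSet(next);
    }
}
```

Because every reload rebuilds the whole set, words that were deleted from the source disappear on the next reload as well, which is the behavior one would expect from the MySQL tables here, assuming the reload rebuilds the dictionaries from scratch as the methods above do.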
    
    • Load the MySQL driver class in Dictionary
    // prop holds the settings read from the properties file above
    private static Properties prop = new Properties();
    
    static {
    	try {
    		// Legacy driver class name; Connector/J 6.x and later ship the driver as com.mysql.cj.jdbc.Driver
    		Class.forName("com.mysql.jdbc.Driver");
    	} catch (ClassNotFoundException e) {
    		logger.error("error", e);
    	}
    }
    
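    • Start the reload thread in Dictionary's initial() method
    /**
    	 * Dictionary initialization. IK Analyzer initializes its dictionaries through static methods of the Dictionary class,
    	 * so the dictionaries are only loaded when Dictionary is first used, which lengthens the first analysis call.
    	 * This method provides a way to initialize the dictionaries while the application is still loading.
    	 *
    	 * @return Dictionary
    	 */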
    	public static synchronized Dictionary initial(Configuration cfg) {
    		if (singleton == null) {
    			synchronized (Dictionary.class) {
    				if (singleton == null) {
    
    					singleton = new Dictionary(cfg);
    					singleton.loadMainDict();
    					singleton.loadSurnameDict();
    					singleton.loadQuantifierDict();
    					singleton.loadSuffixDict();
    					singleton.loadPrepDict();
    					singleton.loadStopWordDict();
    
                        // Start the thread that reloads the dictionary from MySQL
                        new Thread(new HotDicReloadThread()).start();
    
    					if(cfg.isEnableRemoteDict()){
    						// Set up the monitoring tasks
    						for (String location : singleton.getRemoteExtDictionarys()) {
    							// Initial delay of 10 (adjustable) and a period of 60, both in seconds
    							pool.scheduleAtFixedRate(new Monitor(location), 10, 60, TimeUnit.SECONDS);
    						}
    						for (String location : singleton.getRemoteExtStopWordDictionarys()) {
    							pool.scheduleAtFixedRate(new Monitor(location), 10, 60, TimeUnit.SECONDS);
    						}
    					}
    
    					return singleton;
    				}
    			}
    		}
    		return singleton;
    	}
    
    
    • Modify src/main/assemblies/plugin.xml so that the MySQL driver is packaged with the plugin

            <dependencySet>
                <outputDirectory>/</outputDirectory>
                <useProjectArtifact>true</useProjectArtifact>
                <useTransitiveFiltering>true</useTransitiveFiltering>
                <includes>
                    <include>mysql:mysql-connector-java</include>
                </includes>
            </dependencySet>
    
    • The source changes are now complete. Create two new tables in your own database

    CREATE TABLE hot_words (
    id bigint(20) NOT NULL AUTO_INCREMENT,
    word varchar(50) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT 'word',
    PRIMARY KEY (id)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;

    CREATE TABLE hot_stopwords (
    id bigint(20) NOT NULL AUTO_INCREMENT,
    stopword varchar(50) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT 'stopword',
    PRIMARY KEY (id)
    ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;

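    Package the project with mvn package. When the build finishes, a new archive named elasticsearch-analysis-ik-5.6.9.zip is generated in the elasticsearch-analysis-ik\target\releases folder. Unzip it, then copy both the extracted elasticsearch folder and mysql-connector-java-6.0.6.jar into the plugins\ik folder of the Elasticsearch installation.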
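
Once the plugin is installed and Elasticsearch has been restarted, one way to check that a word inserted into hot_words is picked up is to call the _analyze API and look at the returned tokens. The sketch below assumes a node at http://localhost:9200 and the ik_max_word analyzer; AnalyzeRequest and its methods are hypothetical names, and the word to test is up to you.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for sending a text to the _analyze endpoint. */
final class AnalyzeRequest {
    /** Escapes quotes, backslashes and control characters for use inside a JSON string. */
    static String jsonEscape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    /** Builds the JSON request body for _analyze. */
    static String body(String analyzer, String text) {
        return "{\"analyzer\":\"" + jsonEscape(analyzer)
                + "\",\"text\":\"" + jsonEscape(text) + "\"}";
    }

    /** POSTs the body to baseUrl + "/_analyze" and returns the raw JSON response. */
    static String post(String baseUrl, String body) throws IOException {
        HttpURLConnection conn =
                (HttpURLConnection) new URL(baseUrl + "/_analyze").openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/json; charset=UTF-8");
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body.getBytes(StandardCharsets.UTF_8));
        }
        try (InputStream in = conn.getInputStream();
             ByteArrayOutputStream buf = new ByteArrayOutputStream()) {
            byte[] chunk = new byte[4096];
            int n;
            while ((n = in.read(chunk)) != -1) {
                buf.write(chunk, 0, n);
            }
            return new String(buf.toByteArray(), StandardCharsets.UTF_8);
        } finally {
            conn.disconnect();
        }
    }
}
```

For example, `AnalyzeRequest.post("http://localhost:9200", AnalyzeRequest.body("ik_max_word", someNewWord))` should return the new word as a single token after the next reload interval has passed, provided the reload thread is running.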

  • Original article: https://www.cnblogs.com/xiaoxiaoliu/p/11218109.html