zoukankan      html  css  js  c++  java
  • ElasticSearch IK热词自动热更新原理与Golang实现

    热更新概述

    ik分词器本身可以从配置文件加载扩张词库,也可以从远程HTTP服务器加载。
    本地加载,则需要重启ES生效,影响比较大。所以,一般我们都会把词库放在远程服务器上。这里主要有2种方式:
    1. 借助Nginx,在其某个目录结构下放一个dic.txt,我们只要更新这个文件,不需要重启ES也能达到热更新的目的。优点是简单,无需开发,缺点就是不够灵活。
    2. 自己开发一个HTTP接口,返回词库。注意:一行代表一个词,http body中,自己追加 换行
    这里主要介绍第2种接口方式。 

    热更新原理

    查看ik分词器源码(org.wltea.analyzer.dic.Monitor): 
    /**
    	 * 监控流程:
    	 *  ①向词库服务器发送Head请求
    	 *  ②从响应中获取Last-Modify、ETags字段值,判断是否变化
    	 *  ③如果未变化,休眠1min,返回第①步
    	 * 	④如果有变化,重新加载词典
    	 *  ⑤休眠1min,返回第①步
    	 */
    
    	public void runUnprivileged() {
    		//超时设置
    		RequestConfig rc = RequestConfig.custom().setConnectionRequestTimeout(10*1000)
    				.setConnectTimeout(10*1000).setSocketTimeout(15*1000).build();
    
    		HttpHead head = new HttpHead(location);
    		head.setConfig(rc);
    
    		//设置请求头
    		if (last_modified != null) {
    			head.setHeader("If-Modified-Since", last_modified);
    		}
    		if (eTags != null) {
    			head.setHeader("If-None-Match", eTags);
    		}
    
    		CloseableHttpResponse response = null;
    		try {
    			response = httpclient.execute(head);
    
    			//返回200 才做操作
    			if(response.getStatusLine().getStatusCode()==200){
    
    				if (((response.getLastHeader("Last-Modified")!=null) && !response.getLastHeader("Last-Modified").getValue().equalsIgnoreCase(last_modified))
    						||((response.getLastHeader("ETag")!=null) && !response.getLastHeader("ETag").getValue().equalsIgnoreCase(eTags))) {
    
    					// 远程词库有更新,需要重新加载词典,并修改last_modified,eTags
    					Dictionary.getSingleton().reLoadMainDict();
    					last_modified = response.getLastHeader("Last-Modified")==null?null:response.getLastHeader("Last-Modified").getValue();
    					eTags = response.getLastHeader("ETag")==null?null:response.getLastHeader("ETag").getValue();
    				}
    			}else if (response.getStatusLine().getStatusCode()==304) {
    				//没有修改,不做操作
    				//noop
    			}else{
    				logger.info("remote_ext_dict {} return bad code {}" , location , response.getStatusLine().getStatusCode() );
    			}
    		} catch (Exception e) {
    			logger.error("remote_ext_dict {} error!",e , location);
    		}finally{
    			try {
    				if (response != null) {
    					response.close();
    				}
    			} catch (IOException e) {
    				logger.error(e.getMessage(), e);
    			}
    		}
        }
    

      

    我们看到,每隔1分钟:
    1. 先发送Http HEAD请求,获取Last-Modified、ETag(里面都是字符串)
    2. 如果其中有一个变化,则继续发送Get请求,获取词库内容。
    所以,Golang里面 同一个URL 要同时处理 HEAD 请求 和 Get请求。 
     

    HEAD 格式

    HEAD方法跟GET方法相同,只不过服务器响应时不会返回消息体。一个HEAD请求的响应中,HTTP头中包含的元信息应该和一个GET请求的响应消息相同。这种方法可以用来获取请求中隐含的元信息,而不用传输实体本身。也经常用来测试超链接的有效性、可用性和最近的修改。

    一个HEAD请求的响应可被缓存,也就是说,响应中的信息可能用来更新之前缓存的实体。如果当前实体跟缓存实体的阈值不同(可通过Content-Length、Content-MD5、ETag或Last-Modified的变化来表明),那么这个缓存就被视为过期了。

    在ik分词器中,服务端返回的一个示例如下: 
    $ curl --head http://127.0.0.1:9800/es/steelDict
    HTTP/1.1 200 OK
    Etag: DefaultTags
    Last-Modified: 2021-10-15 14:49:35
    Date: Fri, 15 Oct 2021 07:23:15 GMT
    

      

    GET 格式

    • 返回词库时,Content-Length、charset=UTF-8一定要有。
    • Last-Modified和Etag 只需要1个有变化即可。只有当HEAD请求返回时,这2个其中一个字段的值变了,才会发送GET请求获取内容,请注意!
    • 一行代表一个词,自己追加 换行
    $ curl -i http://127.0.0.1:9800/es/steelDict
    HTTP/1.1 200 OK
    Content-Length: 130
    Content-Type: text/html;charset=UTF-8
    Etag: DefaultTags
    Last-Modified: 2021-10-15 14:49:35
    Date: Fri, 15 Oct 2021 07:37:47 GMT
    
    装饰管
    装饰板
    圆钢
    无缝管
    无缝方管
    卫生级无缝管
    卫生级焊管
    热轧中厚板
    热轧平板
    热轧卷平板
    

      

    实现

    配置ES IK分词器 

    # 这里以centos 7为例,通过rpm安装
    $ vim /usr/share/elasticsearch/plugins/ik/config/IKAnalyzer.cfg.xml
    # 改这一行,换成我们的地址
    <entry key="remote_ext_dict">http://10.16.52.52:9800/es/steelDict</entry>
    $ systemctl restart elasticsearch # 重启es
    
    # 这里还可以实时看到日志,比较方便
    $ tail -f /var/log/elasticsearch/my-application.log
    [2021-10-15T15:02:31,448][INFO ][o.w.a.d.Monitor          ] [node-1] 获取远程词典成功,总数为:0
    [2021-10-15T15:02:31,952][INFO ][o.e.l.LicenseService     ] [node-1] license [3ca1dc7b-3722-40e5-916e-3b2093980b75] mode [basic] - valid
    [2021-10-15T15:02:31,962][INFO ][o.e.g.GatewayService     ] [node-1] recovered [1] indices into cluster_state
    [2021-10-15T15:02:32,812][INFO ][o.e.c.r.a.AllocationService] [node-1] Cluster health status changed from [RED] to [YELLOW] (reason: [shards started [[steel-category-mapping][2]] ...]).
    [2021-10-15T15:02:41,630][INFO ][o.w.a.d.Monitor          ] [node-1] 重新加载词典...
    [2021-10-15T15:02:41,631][INFO ][o.w.a.d.Monitor          ] [node-1] try load config from /etc/elasticsearch/analysis-ik/IKAnalyzer.cfg.xml
    [2021-10-15T15:02:41,631][INFO ][o.w.a.d.Monitor          ] [node-1] try load config from /usr/share/elasticsearch/plugins/ik/config/IKAnalyzer.cfg.xml
    [2021-10-15T15:02:41,886][INFO ][o.w.a.d.Monitor          ] [node-1] [Dict Loading] http://10.16.52.52:9800/es/steelDict
    [2021-10-15T15:02:43,958][INFO ][o.w.a.d.Monitor          ] [node-1] 获取远程词典成功,总数为:0
    [2021-10-15T15:02:43,959][INFO ][o.w.a.d.Monitor          ] [node-1] 重新加载词典完毕...
    

      

    Golang接口

    假设使用gin框架,初始化路由: 
    const (
    	kUrlSyncESIndex     = "/syncESIndex" // 同步钢材品名、材质、规格、产地、仓库到ES索引中
    	kUrlGetSteelHotDict = "/steelDict"   // 获取钢材字典(品材规产仓)
    )
    
    func InitRouter(router *gin.Engine) {
         // ...
      
    	esRouter := router.Group("es")
        // 同一个接口,根据head/get来决定是否返回数据部,避免宽带浪费
    	esRouter.HEAD(kUrlGetSteelHotDict, onHttpGetSteelHotDictHead) 
    	esRouter.GET(kUrlGetSteelHotDict, onHttpGetSteelHotDict)
      
        // ...
    }
    

      

    head请求处理: 
    // onHttpGetSteelHotDictHead 处理head请求,只有当Last-Modified 或 ETag 其中1个值改变时,才会出发GET请求获取词库列表
    func onHttpGetSteelHotDictHead(ctx *gin.Context) {
    	t, err := biz.QueryEsLastSyncTime()
    	if err != nil {
    		ctx.JSON(http.StatusOK, gin.H{
    			"code": biz.StatusError,
    			"msg":  "server internal error",
    		})
    		logger.Warn(err)
    		return
    	}
    	ctx.Header("Last-Modified", t)
    	ctx.Header("ETag", kDefaultTags)
    }
    

      

    Get请求处理: 
    // onHttpGetSteelHotDict 处理GET请求,返回真正的词库,每一行一个词
    func onHttpGetSteelHotDict(ctx *gin.Context) {
        // 这里从mysql查询词库,dic是一个[]string切片
    	dic, err := biz.QuerySteelHotDic()
    	if err != nil {
    		ctx.JSON(http.StatusOK, gin.H{
    			"code": biz.StatusError,
    			"msg":  "server internal error",
    		})
    		logger.Warn(err)
    		return
    	}
    
        // 这里查询最后一次更新时间,作为判断词库需要更新的标准
    	t, err := biz.QueryEsLastSyncTime()
    	if err != nil {
    		ctx.JSON(http.StatusOK, gin.H{
    			"code": biz.StatusError,
    			"msg":  "server internal error",
    		})
    		logger.Warn(err)
    		return
    	}
    
    	ctx.Header("Last-Modified", t)
    	ctx.Header("ETag", kDefaultTags)
    
    	body := ""
    	for _, v := range dic {
    		if v != "" {
    			body += v + "
    "
    		}
    	}
    	logger.Infof("%s query steel dict success, count = %d", ctx.Request.URL, len(dic))
    
    	buffer := []byte(body)
    	ctx.Header("Content-Length", strconv.Itoa(len(buffer)))
    	ctx.Data(http.StatusOK, "text/html;charset=UTF-8", buffer)
    }
    

      

    效果 

    分词效果: 
    POST http://10.0.56.153:9200/_analyze
    {
      "analyzer": "ik_smart",
      "text": "武钢 Q235B 3*1500*3000 6780 佰隆库 在途整件出"
    }
    
    {
        "tokens": [
            {
                "token": "武钢",
                "start_offset": 0,
                "end_offset": 2,
                "type": "CN_WORD",
                "position": 0
            },
            {
                "token": "q235b",
                "start_offset": 3,
                "end_offset": 8,
                "type": "CN_WORD",
                "position": 1
            },
            {
                "token": "3*1500*3000",
                "start_offset": 9,
                "end_offset": 20,
                "type": "ARABIC",
                "position": 2
            },
            {
                "token": "6780",
                "start_offset": 21,
                "end_offset": 25,
                "type": "ARABIC",
                "position": 3
            },
            {
                "token": "佰隆库",
                "start_offset": 26,
                "end_offset": 29,
                "type": "CN_WORD",
                "position": 4
            },
            {
                "token": "在途",
                "start_offset": 30,
                "end_offset": 32,
                "type": "CN_WORD",
                "position": 5
            },
            {
                "token": "整件",
                "start_offset": 32,
                "end_offset": 34,
                "type": "CN_WORD",
                "position": 6
            },
            {
                "token": "出",
                "start_offset": 34,
                "end_offset": 35,
                "type": "CN_CHAR",
                "position": 7
            }
        ]
    }
    

      

    重新加载后,每个词都会打印,如果嫌弃可以把代码注释掉: 
    /**
       * 加载远程扩展词典到主词库表
       */
      private void loadRemoteExtDict() {
          // ...
          for (String theWord : lists) {
            if (theWord != null && !"".equals(theWord.trim())) {
              // 加载扩展词典数据到主内存词典中
              // 注释这一行:
              // logger.info(theWord);
              _MainDict.fillSegment(theWord.trim().toLowerCase().toCharArray());
            }
          }
          // ...
      }
    

      

    然后运行: 
    mvn package
    

      

    生成zip目标包,拷贝到es目录或者替换 elasticsearch-analysis-ik-6.8.4.jar 即可。
    PS:如果要改ik源码,maven同步的时候,有些插件会找不到,直接删除即可,只需要保留下面一个: 

    后记

    调试接口不生效

    因为我们需要改ik分词器源码,当时做热更新的时候发现没有效果,于是在其代码中增加了一句日志: 
    /**
       * 加载远程扩展词典到主词库表
       */
      private void loadRemoteExtDict() {
        List<String> remoteExtDictFiles = getRemoteExtDictionarys();
        for (String location : remoteExtDictFiles) {
          logger.info("[Dict Loading] " + location);
          List<String> lists = getRemoteWords(location);
          // 如果找不到扩展的字典,则忽略
          if (lists == null) {
            logger.error("[Dict Loading] " + location + "加载失败");
            continue;
          } else {
            logger.info("获取远程词典成功,总数为:" + lists.size());
          }
          for (String theWord : lists) {
            if (theWord != null && !"".equals(theWord.trim())) {
              // 加载扩展词典数据到主内存词典中
              logger.info(theWord);
              _MainDict.fillSegment(theWord.trim().toLowerCase().toCharArray());
            }
          }
        }
      }
    

      

    发现输出了0: 
    [2021-10-15T15:02:41,886][INFO ][o.w.a.d.Monitor] [node-1] [Dict Loading] http://10.16.52.52:9800/es/steelDict
    [2021-10-15T15:02:43,958][INFO ][o.w.a.d.Monitor] [node-1] 获取远程词典成功,总数为:0
    [2021-10-15T15:02:43,959][INFO ][o.w.a.d.Monitor] [node-1] 重新加载词典完毕...
    

      

    后面通过运行(Dictionary.java): 
      public static void main(String[] args) {
        List<String> words = getRemoteWordsUnprivileged("http://127.0.0.1:9800/es/steelDict");
        System.out.println(words.size());
      }
    

      

    单点调试,发现HEADER中没有设置 Content-Length 导致解析失败。 

    数字分词如何把*号不过滤

    原生分词会把 3*1500*3000 分成:3 1500 3000。如果有特殊需要,希望不分开呢(在钢贸行业,这是一个规格,所以有这个需求)?
    修改代码,把识别数字的逻辑加一个 “*”即可。 
    /**
     * 英文字符及阿拉伯数字子分词器
     */
    class LetterSegmenter implements ISegmenter {
      // ...
      
      //链接符号(这里追加*号)
      private static final char[] Letter_Connector = new char[]{'#', '&', '+', '-', '.', '@', '_', '*'};
      //数字符号(这里追加*号)
      private static final char[] Num_Connector = new char[]{',', '.', '*'};
      
      // ...
    }
    

      

    关于作者

    推荐下自己的开源IM,纯Golang编写:

    CoffeeChat:https://github.com/xmcy0011/CoffeeChat
    opensource im with server(go) and client(flutter+swift)

    参考了TeamTalk、瓜子IM等知名项目,包含服务端(go)和客户端(flutter+swift),单聊和机器人(小微、图灵、思知)聊天功能已完成,目前正在研发群聊功能,欢迎对golang感兴趣的小伙伴Star加关注。
    在这里插入图片描述

  • 相关阅读:
    Caused by: org.xml.sax.SAXParseException; lineNumber: 102; columnNumber: 25; cvc-complex-type.2.4.b:
    org.apache.jasper.JasperException: The absolute uri: http://java.sun.com/jsp/jstl/core cannot be res
    Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Duplicate column name 'stuNo'
    java.net.UnknownHostException: you.hai.com
    ContextLoaderListener类(spring源码解析)
    perl DELETE 加请求头
    perl get请求加请求头
    perl PUT 请求加请求头
    perl post 请求加请求头
    Perl 模拟DELETE 请求
  • 原文地址:https://www.cnblogs.com/wishFreedom/p/15411847.html
Copyright © 2011-2022 走看看