  • ES Client

    For background on ElasticSearch itself, see: ELK | wjcx_sqh. This post looks at working with ES from .Net and from Java in turn:

    .Net

    There are currently two mainstream .Net clients:

    • PlainElastic.Net
    • Elasticsearch.Net.dll and Nest.dll

    A passing familiarity with PlainElastic.Net is enough; for details see: https://www.cnblogs.com/eggTwo/p/4039779.html

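    ##Elasticsearch.Net + Nest

    Download the matching package from NuGet or the official site and reference it in the project. Note that different versions of the .dll depend on different .Net framework versions.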

    ###Index

    Create a connection (connection pool):

    var nodes = new Uri[] { new Uri("http://ip1:9200"), new Uri("http://ip2:9200") };
    var pool = new StaticConnectionPool(nodes); // recommended
    var settings = new ConnectionSettings(pool);
    var ESClient = new ElasticClient(settings);
    

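    Other settings:

    settings.DefaultIndex("defaultidx"); // default index
    settings.DefaultFieldNameInferrer((name) => name); // keep field names identical to the model's property names, avoiding mismatches
    settings.BasicAuthentication("username", "password"); // user authentication
    settings.RequestTimeout(TimeSpan.FromSeconds(10)); // request timeout; assumes 10 s was intended (new TimeSpan(10000) is 10000 ticks, i.e. 1 ms)
    settings.MaximumRetries(2); // maximum number of retries
    settings.MaxRetryTimeout(TimeSpan.FromSeconds(50)); // overall retry timeout, defaults to RequestTimeout; assumes 50 s was intended
    settings.DisableDirectStreaming(true); // buffers request/response bytes for debugging; best turned off in production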
    

    Configure callbacks:

    settings.OnRequestCompleted(apiCallDetails => { /* called when a request completes; apiCallDetails describes the call */ });
    settings.OnRequestDataCreated(requestData => { /* called once the request data has been created */ });
    

    Checking for and creating an index:

    var descriptor = new CreateIndexDescriptor("idxName").Settings(s => s.NumberOfShards(5).NumberOfReplicas(1));
    client.CreateIndex(descriptor);
    
    if (!client.TypeExists(_indexName, _typeName).Exists) {
    	client.CreateIndex(_indexName, p => p.InitializeUsing(_indexState)
    	      .Mappings(m => m.Map<DefClassName>(mp => mp.AutoMap()))); }	  
    protected static IIndexState _indexState = new IndexState() { // index settings
    	Settings = new IndexSettings() {
    		NumberOfReplicas = 1, NumberOfShards = 5
    	}};
    
    client.IndexExists("index_name"); // check whether the index exists
    client.DeleteIndex("index_name"); // delete the index
    

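    Once a mapping exists, its existing fields cannot be modified; the index has to be deleted and recreated. DefClassName here is a class such as: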

    [ElasticsearchType(Name = "TYPE_NAME")]
    public class AFVInfo {
    	[Keyword(Name = "field1", Index = true, IgnoreAbove = 20)]
    	public string field1 { get; set; }
    
    	[Text(Name = "field2",Index= false)]
    	public string field2 { get; set; }
    	
    	public DateTime dt { get; set; }
    }
    

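    Setting the Keyword / Text attribute on each field is all it takes to define the mapping at index-creation time (see: Elasticsearch.net client NEST). Existing fields cannot be changed, but new fields can be added: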

    var result = _client.Map<AFVInfo>(m => m.Index(indexName).Properties(p => p
      .Keyword(s => s.Name("field_name1").Index(true))
      .Text(s=>s.Name("field_name2").Index(false))
    ));
    

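    Debugging: capture the request and response exchanged with ES (the byte arrays are only populated when DisableDirectStreaming is enabled):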

    var requestStr = System.Text.Encoding.UTF8.GetString(result.ApiCall.RequestBodyInBytes);
    var responseStr = System.Text.Encoding.UTF8.GetString(result.ApiCall.ResponseBodyInBytes);
    

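    ###Query

    Queries can be written in object-initializer style or with the Fluent API. TermQuery matches a whole term exactly; MatchQuery first runs the query text through the analyzer, and can be combined with from and size to return a given number of hits starting at a given offset. Note the difference between match and match_phrase.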
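
    To make the term / match / match_phrase distinction concrete, here is a toy model in plain Java. It is only a sketch of the semantics, not how Elasticsearch works internally: the "analyzer" (lowercase, then split on whitespace) and all class and method names are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

// Toy model of term / match / match_phrase semantics (illustrative only).
final class QueryKindsSketch {

    // Hypothetical analyzer: lowercase, then split on whitespace.
    static List<String> analyze(String text) {
        return Arrays.asList(text.toLowerCase(Locale.ROOT).trim().split("\\s+"));
    }

    // term: the query value is not analyzed and must equal the stored term.
    // For a keyword field the stored term is the whole original value.
    static boolean termOnKeyword(String storedValue, String queryValue) {
        return storedValue.equals(queryValue);
    }

    // match: the query is analyzed; one shared token is enough
    // (this models the default "or" operator).
    static boolean match(String fieldText, String query) {
        List<String> fieldTokens = analyze(fieldText);
        for (String token : analyze(query)) {
            if (fieldTokens.contains(token)) {
                return true;
            }
        }
        return false;
    }

    // match_phrase: every query token must appear, adjacent and in order.
    static boolean matchPhrase(String fieldText, String query) {
        return Collections.indexOfSubList(analyze(fieldText), analyze(query)) >= 0;
    }

    public static void main(String[] args) {
        String doc = "Quick Brown Fox";
        System.out.println(termOnKeyword(doc, "quick"));     // false
        System.out.println(match(doc, "quick dog"));         // true
        System.out.println(matchPhrase(doc, "brown quick")); // false
        System.out.println(matchPhrase(doc, "brown fox"));   // true
    }
}
```

    In this model, a term lookup for "quick" misses the keyword value "Quick Brown Fox", while match finds it because both sides are tokenized first.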

    Search After retrieves the next page from the results of the previous one. When the search_after parameter is used, from must be set to 0 or -1 (see: search_after).

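    • from and size: with deep paging or a very large size, the deep pagination problem appears. ES protects itself with max_result_window, default 10000, so keep from + size <= 10000
    • scroll: represents a snapshot taken at one moment, so it is unsuitable for real-time queries. A scroll request carries a timeout, and issuing scroll requests frequently causes problems of its own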
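
    The from + size limit described above can be checked before a request is sent. The following is a minimal sketch in plain Java; the class and method names are hypothetical, and the 10000 limit is the default quoted above (a cluster may be configured differently).

```java
// Sketch: decide whether a page fits the from/size window or needs a cursor.
final class PagingGuard {

    // Default value of max_result_window, as quoted in the text.
    static final int MAX_RESULT_WINDOW = 10_000;

    // True when from + size stays inside the result window.
    static boolean fitsInWindow(long from, int size) {
        if (from < 0 || size < 0) {
            throw new IllegalArgumentException("from and size must be >= 0");
        }
        return from + size <= MAX_RESULT_WINDOW;
    }

    // Hypothetical helper: name the paging strategy for a 1-based page number.
    static String strategyFor(int page, int pageSize) {
        if (page < 1) {
            throw new IllegalArgumentException("page is 1-based");
        }
        long from = (long) (page - 1) * pageSize;
        return fitsInWindow(from, pageSize) ? "from/size" : "search_after";
    }

    public static void main(String[] args) {
        System.out.println(strategyFor(1, 20));   // from/size
        System.out.println(strategyFor(500, 20)); // from/size (9980 + 20 = 10000)
        System.out.println(strategyFor(501, 20)); // search_after
    }
}
```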

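    search_after addresses the non-real-time nature of scroll by providing a live cursor, avoiding the storage and time costs (see: search_after performance).

    Source Filter: recommended on the SearchRequest, so that only the fields actually needed are returned: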

    sr.Source = new SourceFilter() {
       Includes = new string[] { "xx", "zz" }, Excludes = new string[] { "vv" }
    };
    

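    fielddata: lazily loaded --> memory control: forward index, columnar storage (eviction threshold, circuit breakers)

    # Example configuration, elasticsearch.yml
    indices.fielddata.cache.size: 20%
    # indices.breaker.total.limit is the parent limit covering indices.breaker.fielddata.limit and indices.breaker.request.limit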
    

    Memory usage monitoring:

    GET /_stats/fielddata?fields=* // fielddata memory usage per shard and per index
    GET /_nodes/stats/indices/fielddata?fields=* // fielddata memory usage per node
    GET /_nodes/stats/indices/fielddata?level=indices&fields=* // fielddata memory usage per index on each node
    

    _explain and _analyze

    • _explain: helps work out how a document's relevance score was computed
    • _analyze: helps work out how a field, or a given analyzer/tokenizer, analyzes and indexes a piece of text

    ##Utility class

    First, the initialization part:

    public class NestEsUtil<T> where T : class {
        private readonly List<Uri> _nodes = new List<Uri>();
        private readonly string _indexName;
        private readonly ElasticClient _elasticClient;
    
        public static ElasticClient GetClient() {  //var client =  NestEsUtil<ModelVo>.GetClient();
            string indexName = typeof(T).Name;
            var nodes = new Uri[] { new Uri(""), new Uri("") };
            var _ConnectionPool = new StaticConnectionPool(nodes);
            var _ConnectionConfig = new ConnectionSettings(_ConnectionPool, sourceSerializer: (builtin, settingss) => new JsonNetSerializer());
            var _ElasticClient = new ElasticClient(_ConnectionConfig.DefaultIndex(indexName).DefaultFieldNameInferrer((name) => name));
            return _ElasticClient;
        }
    
        public NestEsUtil(string indexName, string ips) {  //NestEsUtil esUtil = new NestEsUtil("xxx", "ips");
            this._indexName = indexName;
            foreach (var ip in ips.Split(';')) { _nodes.Add(new Uri("http://" + ip + "/")); }
            var _ConnectionPool = new StaticConnectionPool(_nodes);
            var _ConnectionConfig = new ConnectionSettings(_ConnectionPool, sourceSerializer: (builtin, settingss) => new JsonNetSerializer());
            this._elasticClient = new ElasticClient(_ConnectionConfig.DefaultIndex(this._indexName).DefaultFieldNameInferrer((name) => name));
        }
        /// Pass the nodes in the form ip:port;ip:port;ip:port
        public NestEsUtil(string ips) : this(typeof(T).Name, ips) { } // uses the class name as the index name
    }
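
    The ip:port;ip:port node string accepted by the constructor above can be parsed the same way on the Java side. This is a sketch only: the class name is hypothetical, the http scheme is carried over from the C# code, and skipping empty segments is an added assumption that the original constructor does not make.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

// Sketch: turn "ip:port;ip:port" into a list of node URIs.
final class NodeListParser {

    static List<URI> parse(String ips) {
        List<URI> nodes = new ArrayList<>();
        for (String part : ips.split(";")) {
            String hostPort = part.trim();
            if (hostPort.isEmpty()) {
                continue; // tolerate trailing or doubled separators
            }
            nodes.add(URI.create("http://" + hostPort + "/"));
        }
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("no nodes in: " + ips);
        }
        return nodes;
    }

    public static void main(String[] args) {
        for (URI node : parse("10.0.0.1:9200; 10.0.0.2:9200;")) {
            System.out.println(node.getHost() + " -> port " + node.getPort());
        }
    }
}
```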
    

    Insert

    public void InsertOne(T data) {
        var result = _elasticClient.Index<T>(data, s => s.Index(_indexName));
    }
    

    Delete

    public void DeleteOne(T data) {
        dynamic d = data; // assumes T exposes a PhoneNumber property
        _elasticClient.DeleteByQuery<T>(s => s.Index(this._indexName)
            .Query(q => q.Term(tm => tm.Field("PhoneNumber").Value(d.PhoneNumber))));
    }
    

    Query

    
    

    Modify / update

    
    

    Paging, aggregations, grouping, scan and scroll and so on are still to be learned... See: Elasticsearch utility class list

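    ###Troubleshooting

    Problem 1: times shown in Kibana are 8 hours later than the times actually written to ES
    Cause: NEST's serializer treats DateTime as UTC by default and drops the time-zone information when serializing, while Kibana is set to UTC+8
    Fix: pass serializer settings when creating the client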

    var settings = new ConnectionSettings(pool, 
    	sourceSerializer: (builtin, setting) => new JsonNetSerializer(builtin, setting, 
    		() => new Newtonsoft.Json.JsonSerializerSettings { 
    			DateTimeZoneHandling = Newtonsoft.Json.DateTimeZoneHandling.Local 
    }));
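
    The 8-hour shift can be reproduced with java.time alone. The sketch below assumes the viewer runs in Asia/Shanghai (UTC+8) and uses hypothetical names; it only demonstrates the arithmetic of the problem and is not NEST or Kibana code.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

// Sketch: why a zone-less local timestamp shows up 8 hours late in a UTC+8 viewer.
final class EightHourShift {

    // Assumed viewer time zone (UTC+8, as in the text).
    static final ZoneId VIEWER_ZONE = ZoneId.of("Asia/Shanghai");

    // The local time is written without an offset and then read back as UTC.
    static LocalDateTime shownWhenReadAsUtc(LocalDateTime written) {
        Instant misread = written.toInstant(ZoneOffset.UTC);
        return ZonedDateTime.ofInstant(misread, VIEWER_ZONE).toLocalDateTime();
    }

    // The local time is written together with its real offset.
    static LocalDateTime shownWhenOffsetKept(LocalDateTime written) {
        Instant correct = written.atZone(VIEWER_ZONE).toInstant();
        return ZonedDateTime.ofInstant(correct, VIEWER_ZONE).toLocalDateTime();
    }

    public static void main(String[] args) {
        LocalDateTime written = LocalDateTime.of(2019, 8, 3, 10, 0);
        System.out.println(shownWhenReadAsUtc(written));  // 2019-08-03T18:00
        System.out.println(shownWhenOffsetKept(written)); // 2019-08-03T10:00
    }
}
```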
    

    Problem 2: automatic retry on timeout
    Fix: add the max_retries and retry_on_timeout parameters (shown here with the Python client)

    es = Elasticsearch( hosts=[{'host': 'localhost', 'port': 9200}], timeout=60, max_retries=3, retry_on_timeout=True);
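
    The same idea, retrying a call a bounded number of times but only when it times out, can be sketched in plain Java. The helper below is hypothetical and not part of any Elasticsearch client; it treats java.net.SocketTimeoutException as the timeout signal, which is an assumption.

```java
import java.net.SocketTimeoutException;
import java.util.concurrent.Callable;

// Sketch: bounded retry that reacts only to timeouts.
final class RetryOnTimeout {

    // Runs the action once, then retries up to maxRetries more times on timeout.
    // Any other exception is propagated immediately.
    static <T> T call(Callable<T> action, int maxRetries) throws Exception {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        SocketTimeoutException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return action.call();
            } catch (SocketTimeoutException e) {
                last = e; // timed out: fall through to the next attempt
            }
        }
        throw last; // every attempt timed out
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        String result = call(() -> {
            if (++calls[0] < 3) {
                throw new SocketTimeoutException("simulated timeout");
            }
            return "ok";
        }, 3);
        System.out.println(result + " after " + calls[0] + " attempts"); // ok after 3 attempts
    }
}
```

    A production version would usually add a delay between attempts; that is left out here to keep the sketch short.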
    

    Problem 3: writing to ES fails with: java.net.ConnectException: Connection refused: no further information

    ElasticsearchStatusException[Elasticsearch exception [type=cluster_block_exception, reason=blocked by: [FORBIDDEN/12/index read-only / allow delete (api)];]]
    

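    Cause: once disk usage on the volume holding the ES data directory passes the flood-stage watermark (95% by default; 90% is the high watermark), ES switches the affected indices to read-only (see the official documentation).
    Fix: run the following directly in DevTools, or with curl:

    curl -XPUT -H "Content-Type: application/json" http://127.0.0.1:9200/_all/_settings -d '{"index.blocks.read_only_allow_delete": null}'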
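
    Before clearing the read-only block it helps to confirm that disk usage really is the cause. The sketch below computes the used percentage of a volume in plain Java. The class name is hypothetical, the 95% threshold is Elasticsearch's default flood-stage watermark (a cluster may override it), and the path to check is passed in by the caller.

```java
import java.io.File;

// Sketch: compare a volume's disk usage against a watermark percentage.
final class DiskWatermarkCheck {

    // Assumed threshold: the default flood-stage watermark.
    static final double FLOOD_STAGE_PERCENT = 95.0;

    static double usedPercent(long totalBytes, long usableBytes) {
        if (totalBytes <= 0) {
            throw new IllegalArgumentException("totalBytes must be > 0");
        }
        return 100.0 * (totalBytes - usableBytes) / totalBytes;
    }

    static boolean overThreshold(long totalBytes, long usableBytes, double thresholdPercent) {
        return usedPercent(totalBytes, usableBytes) >= thresholdPercent;
    }

    public static void main(String[] args) {
        // Pass the ES data directory as the first argument; defaults to the current directory.
        File dataDir = new File(args.length > 0 ? args[0] : ".");
        long total = dataDir.getTotalSpace();
        if (total <= 0) {
            System.out.println("cannot read disk size for " + dataDir);
            return;
        }
        long usable = dataDir.getUsableSpace();
        System.out.printf("used: %.1f%%, over flood stage: %b%n",
                usedPercent(total, usable),
                overThreshold(total, usable, FLOOD_STAGE_PERCENT));
    }
}
```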

    ###Serialization

    ElasticSearch is RESTful, so JSON comes up constantly. Recommended reading: Custom Serialization. JSON also has to be parsed, and there are two options: JsonPath and Linq to JSON.

    ##log4net.ElasticSearch

    Besides calling the APIs that ES exposes, logs can also be written to ES directly: log4net.ElasticSearch

    <appender name="ElasticSearchAppender" type="log4net.ElasticSearch.ElasticSearchAppender, log4net.ElasticSearch">
      <layout type="log4net.Layout.PatternLayout,log4net">
    	<param name="ConversionPattern" value="%d{ABSOLUTE} %-5p %c{1}:%L - %m%n" />
      </layout>
      <connectionString value="Server=xxx.xxx.xxx.xxx;Index=logsqh;Port=9200;rolling=false"/>
      <lossy value="false" />
      <evaluator type="log4net.Core.LevelEvaluator">
    	<threshold value="ALL" />
      </evaluator>
      <bufferSize value="1" />
    </appender>
    

    The rolling attribute controls whether a new index is created each day. For details see: log4net.ElasticSearch + Kibana logging and display

    #Java

    ##Native API

    • transport: TCP, Java only
    • rest: HTTP, no language restriction

    REST is recommended; the transport client is deprecated in v7.0 and removed in v8.0.

    ##SpringBoot + Elasticsearch

    SpringBoot can integrate with Elasticsearch in four ways:

    • REST Client: HTTP; the Java Low Level Rest Client and the recommended Java High Level Rest Client
    • Jest: HTTP, a community Java client
    • Spring Data: Spring's integration package for Elasticsearch development
    • Spring Data Repositories

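    ###Spring Data Elasticsearch

    A submodule of the Spring Data family that supports quickly bootstrapping a Maven project; see the official site and the reference examples.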

    • SpringBoot:v2.2.2
    • Elasticsearch:v6.8.0

    Make sure the SpringBoot and Elasticsearch versions match; see the version compatibility table.

    ###High Level Rest Client

    Versions used: sb-2.2.2 + es-6.6.2

    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.6.2</version>
    

    Referencing v6.1.4 at first produced the error: org.elasticsearch.client.Request.<init>(Ljava/lang/String;Ljava/lang/String;)V

    An example ES client configuration:

    private String schema = "http";
    private int connectTimeOut = 1000;
    private int socketTimeOut = 30000;
    private int connectionRequestTimeOut = 500;
    private int maxConnectNum = 100;
    private int maxConnectPerRoute = 100;
    private boolean uniqueConnectTimeConfig = true;
    private boolean uniqueConnectNumConfig = true;
    private RestClientBuilder builder;
    private RestHighLevelClient client;
    
    List<HttpHost> httpHosts = new ArrayList<>();
    HttpHost it = new HttpHost(host, port, schema);
    httpHosts.add(it);
    
    @Bean(autowire = Autowire.BY_NAME, name = "restHighLevelClient")
    public RestHighLevelClient client() {
    	try {
    		builder = RestClient.builder(httpHosts.toArray(new HttpHost[0]));
    		if (uniqueConnectTimeConfig) { setConnectTimeOutConfig(); }
    		if (uniqueConnectNumConfig) { setMutiConnectConfig(); }
    		client = new RestHighLevelClient(builder);
    		return client;
    	} catch (NumberFormatException e) { }
    	return null;
    }
    
    /**
     * Timeout configuration for the async HTTP client
     */
    public void setConnectTimeOutConfig() {
    	builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
    		@Override
    		public Builder customizeRequestConfig(Builder requestConfigBuilder) {
    			requestConfigBuilder.setConnectTimeout(connectTimeOut);
    			requestConfigBuilder.setSocketTimeout(socketTimeOut);
    			requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeOut);
    			return requestConfigBuilder;
    		}
    	});
    }
    /**
     * Connection-count configuration for the async HTTP client
     */
    public void setMutiConnectConfig() {
    	builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
    		@Override
    		public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
    			httpClientBuilder.setMaxConnTotal(maxConnectNum);
    			httpClientBuilder.setMaxConnPerRoute(maxConnectPerRoute);
    			return httpClientBuilder;
    		}
    	});
    }
    

    If authentication is required, the client can be injected along these lines:

    @Bean(autowire = Autowire.BY_NAME, name = "restHighLevelClientNew")
    public RestHighLevelClient newClient() {
    	try {
    		final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
    		credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(newESUserName,newESUserPassWord));
    		builder = RestClient.builder(httpHosts.toArray(new HttpHost[0]))
    		 .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
    			  @Override
    			  public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
    				  return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
    			  }
    		  });
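    		// Basic auth needs the credentials Base64-encoded (java.util.Base64), not decoded
    		String auth = java.util.Base64.getEncoder().encodeToString((newESUserName + ":" + newESUserPassWord).getBytes(java.nio.charset.StandardCharsets.UTF_8));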
    		builder.setDefaultHeaders(new BasicHeader[]{new BasicHeader("Authorization", "Basic " + auth)});
    
    		if (uniqueConnectTimeConfig) { setNewConnectTimeOutConfig(); }
    		if (uniqueConnectNumConfig) { setNewMutiConnectConfig(); }
    		newClient = new RestHighLevelClient(builder);
    		return newClient;
    	} catch (NumberFormatException e) { }
    	return null;
    }
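
    The Authorization header used for Basic authentication carries the Base64-encoded form of username:password. A minimal, self-contained sketch with the JDK's java.util.Base64 follows; the class name and the sample credentials are placeholders.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Sketch: build the value of an HTTP Basic Authorization header.
final class BasicAuthHeader {

    static String valueFor(String user, String password) {
        String credentials = user + ":" + password;
        String encoded = Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
        return "Basic " + encoded;
    }

    public static void main(String[] args) {
        // Placeholder credentials, for illustration only.
        System.out.println(valueFor("user", "pass")); // Basic dXNlcjpwYXNz
    }
}
```

    The resulting string is what would be passed as the value of a BasicHeader named "Authorization".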
    

    Further reading: Example 1, Example 2; Bulk, Multi-Get, Reindex, Update by query, Delete by query, Rethrottle; multi-document operations with RestClient

  • Original article: https://www.cnblogs.com/wjcx-sqh/p/11294683.html