zoukankan      html  css  js  c++  java
  • ElasticSearch UpdateApi

    Api

    @Component
    public class UpdateApi {
    
        @Autowired
        private RestHighLevelClient client;
        @Autowired
        @Qualifier("updateListener")
        private ActionListener listener;
    
        //以JSON格式提供的部分文档源
        public void updateJSON(String index, String id, String jsonStrings){
            UpdateRequest request = new UpdateRequest("posts", "1");
    //        String jsonString = "{" +
    //                ""updated":"2017-01-01"," +
    //                ""reason":"daily update"" +
    //                "}";
            String jsonString = "{" +
                    ""user":"sdfdasfdsa"," +
                    ""postDate":"2013-01-30"," +
                    ""message":"fdsfdasfdsfsd1"" +
                    "}";
    
            request.doc(jsonString, XContentType.JSON);
            try {
                UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
            }catch (IOException e){
    
            }catch (ElasticsearchException e) {
                if (e.status() == RestStatus.NOT_FOUND) {
                    //处理因文档不存在而引发的异常
                }
            }
            client.updateAsync(request, RequestOptions.DEFAULT, listener);
        }
    
        //以映射形式提供的部分文档源,自动转换为JSON格式
        public void updateMap(){
            Map<String, Object> jsonMap = new HashMap<>();
            jsonMap.put("updated", new Date());
            jsonMap.put("reason", "daily update");
            UpdateRequest request = new UpdateRequest("posts", "doc", "1").doc(jsonMap);
    
            //如果文档不存在,可以使用upsert方法定义一些内容,这些内容将作为新文档插入
            String jsonString = "{"created":"2017-01-01"}";
            request.upsert(jsonString, XContentType.JSON);
            request.routing("routing");
            request.timeout(TimeValue.timeValueSeconds(1));
    //        request.timeout("1s");
            request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
    //        request.setRefreshPolicy("wait_for");
            //如果要更新的文档在更新操作的get和索引阶段之间被另一个操作更改,那么要重试多少次更新操作
            request.retryOnConflict(3);
            request.fetchSource(true);
            request.version(2);
            String[] includes = new String[]{"updated", "r*"};
    //        String[] includes = Strings.EMPTY_ARRAY;
    //        String[] excludes = new String[]{"updated"};
            String[] excludes = Strings.EMPTY_ARRAY;
            request.fetchSource(new FetchSourceContext(true, includes, excludes));
            //禁用noop检测
            request.detectNoop(false);
            //表明无论文档是否存在,脚本都必须运行,即如果文档不存在,脚本将负责创建文档
            request.scriptedUpsert(false);
            //设置在继续执行更新操作之前必须处于活动状态的碎片副本的数量
            request.waitForActiveShards(2);
            //作为ActiveShardCount提供的碎片副本数量:可以是ActiveShardCount。ActiveShardCount。一个或ActiveShardCount。默认(默认)
            request.waitForActiveShards(ActiveShardCount.ALL);
    
            try {
                UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
            }catch (IOException e){
    
            }catch (ElasticsearchException e) {
                if (e.status() == RestStatus.NOT_FOUND) {
                    //处理因文档不存在而引发的异常
                }
            }
            client.updateAsync(request, RequestOptions.DEFAULT, listener);
        }
    
        //部分文档源作为一个XContentBuilder对象提供,Elasticsearch内置帮助生成JSON内容
        public void updateBuilder(){
            try {
                XContentBuilder builder = XContentFactory.jsonBuilder();
                builder.startObject();
                {
                    builder.timeField("updated", new Date());
                    builder.field("reason", "daily update");
                }
                builder.endObject();
    
                UpdateRequest request = new UpdateRequest("posts", "1").doc(builder);
            }catch (IOException e){
    
            }
        }
    
        //作为对象键对提供的部分文档源,它被转换为JSON格式
        public void update(){
            UpdateRequest request = new UpdateRequest("posts", "1")
                    .doc("updated", new Date(), "reason", "daily update");
        }
    
        public void updateWithScript(String index, String type, String id){
            UpdateRequest request = new UpdateRequest("posts", "1");
    
            //作为Map对象提供的脚本参数
            Map<String, Object> parameters = singletonMap("count", 4);
            //使用painless语言和先前的参数创建内联脚本
            Script inline = new Script(ScriptType.INLINE, "painless", "ctx._source.field += params.count", parameters);
            //将脚本设置为更新请求
            request.script(inline);
        }
    }

    Listener

    package com.wjc.ccf.elasticsearch.listener;
    
    import org.elasticsearch.action.ActionListener;
    import org.elasticsearch.action.DocWriteResponse;
    import org.elasticsearch.action.support.replication.ReplicationResponse;
    import org.elasticsearch.action.update.UpdateResponse;
    import org.elasticsearch.index.get.GetResult;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.Map;
    
    /**
     * @author wjc
     * @description
     * @date 2020/5/10
     */
    @Configuration
    public class ESUpdateListener {
        @Bean("updateListener")
        public ActionListener listener(){
            ActionListener listener = new ActionListener<UpdateResponse>() {
                @Override
                public void onResponse(UpdateResponse updateResponse) {
                    String index = updateResponse.getIndex();
                    String type = updateResponse.getType();
                    String id = updateResponse.getId();
                    long version = updateResponse.getVersion();
    
                    if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
                        //处理第一次创建文档的情况(upsert)
                    } else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
                        //处理文档更新的情况
                    } else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
                        //处理删除文档的情况
                    } else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
                        //处理文档不受更新影响的情况,即对文档不执行任何操作(noop)
                    }
    
                    //当通过fetchSource方法在UpdateRequest中启用源检索时,响应包含已更新文档的源
                    //以GetResult的形式检索更新后的文档
                    GetResult result = updateResponse.getGetResult();
                    if (result.isExists()) {
                        //以字符串的形式检索已更新文档的源
                        String sourceAsString = result.sourceAsString();
                        //以Map<String, Object>的形式检索更新后的文档的源
                        Map<String, Object> sourceAsMap = result.sourceAsMap();
                        //以字节[]的形式检索已更新文档的源
                        byte[] sourceAsBytes = result.source();
                    } else {
                        //处理响应中不存在文档源的场景(默认情况下是这种情况)
                    }
    
                    //还可以检查碎片故障
                    ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo();
                    if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
                        //处理成功分片的数量少于总分片的情况
                    }
                    if (shardInfo.getFailed() > 0) {
                        for (ReplicationResponse.ShardInfo.Failure failure :
                                shardInfo.getFailures()) {
                            //处理潜在的故障
                            String reason = failure.reason();
                        }
                    }
                }
    
                @Override
                public void onFailure(Exception e) {
    
                }
            };
            return listener;
        }
    }

  • 相关阅读:
    第三方登录(QQ登录)开发流程详解
    编译PHP并与Ngnix整合
    Ngnix的日志管理和用定时任务完成日志切割
    Ngnix 安装、信号量、虚拟主机配置
    Redis命令操作详解
    Redis的安装和部署
    消息队列
    Ubuntu中Google Chrome安装
    关于双系统下Ubuntu不能访问Windows中某个盘的问题
    numpy.random.shuffle()与numpy.random.permutation()的区别
  • 原文地址:https://www.cnblogs.com/gqymy/p/12891394.html
Copyright © 2011-2022 走看看