zoukankan      html  css  js  c++  java
  • Elasticsearch.Net、Nest批量插入BulkAll

    demo地址:BulkAll

    批量导入

    实现目标:想要使用ElasticSearch的 .Net Api客户端NEST批量导入数据,并发异步高效的批量导入
    NEST提供了BulkAll
    不废话,上代码

                const int size = 1000;
                var tokenSource = new CancellationTokenSource();
    
                var observableBulk = elasticClient.BulkAll(list, f => f
                        .MaxDegreeOfParallelism(8)
                        .BackOffTime(TimeSpan.FromSeconds(10))
                        .BackOffRetries(2)
                        .Size(size)
                        .RefreshOnCompleted()
                        .Index(indexName)
                        .BufferToBulk((r, buffer) => r.IndexMany(buffer))
                    , tokenSource.Token);
    
                var countdownEvent = new CountdownEvent(1);
    
                Exception exception = null;
    
                var bulkAllObserver = new BulkAllObserver();
    
                observableBulk.Subscribe(bulkAllObserver);
    
                countdownEvent.Wait(tokenSource.Token);
    

    如果想要对处理导入过程进行监控可以这么替换BulkAllObserver

                   var bulkAllObserver = new BulkAllObserver(
                    onNext: response =>
                    {
                        WriteLine($"Indexed {response.Page * size} with {response.Retries} retries");
                    },
                    onError: ex =>
                    {
                        WriteLine("BulkAll Error : {0}", ex);
                        exception = ex;
                        countdownEvent.Signal();
                    },
                    () =>
                    {
                        WriteLine("BulkAll Finished");
                        countdownEvent.Signal();
                    });
    

    还可以使用C#的local function特性,如下所示

                void OnCompleted()
                {
                    WriteLine("BulkAll Finished");
                    countdownEvent.Signal();
                }
    
                var bulkAllObserver = new BulkAllObserver(
                    onNext: response =>
                    {
                        WriteLine($"Indexed {response.Page * size} with {response.Retries} retries");
                    },
                    onError: ex =>
                    {
                        WriteLine("BulkAll Error : {0}", ex);
                        exception = ex;
                        countdownEvent.Signal();
                    },
                    OnCompleted);
    

    完成demo,请点击 BulkAll 查看

  • 相关阅读:
    王建军_百度百科
    腾讯研究院关于研究院
    创业公司3Gear Systems利用Kinect打造未来人机交互体验 | 36氪
    KVM切换器_互动百科
    保荐人考试
    Engadget 中文版征人启事 《 they're hiring
    吸血僵尸惊情四百年
    小霸王手机
    ARM、高通、德州仪器这三家芯片企业该怎么区分和评价?
    《美丽心灵》兼谈纳什均衡理论
  • 原文地址:https://www.cnblogs.com/AlienXu/p/10775923.html
Copyright © 2011-2022 走看看