  • Bulk-loading data into Elasticsearch with BulkProcessor

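    To load data into Elasticsearch I previously used the REST approach. After a period of testing I found that, at the scale of tens of millions of records, anywhere from 10 to a few hundred records went missing.

    Where data accuracy has to be guaranteed, the REST approach could not deliver it, so I switched to Elasticsearch's BulkProcessor for loading.

    The two approaches actually use different ES clients: the REST approach uses RestClient over HTTP, while the BulkProcessor here is built on TransportClient, which uses the TCP-based transport protocol.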

    Below are the concrete steps in a Spring project:
    1. Define a Student class and serialize it to JSON
    2. The JSON implementation
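    The listings for steps 1 and 2 are not included in this post. As a minimal sketch only, the bean might look like the following; the field names and types are assumptions inferred from the accessors called in the later steps (getName, getAddr, getAge, and setAge(12)), not the author's actual class.

```java
// Hypothetical Student bean; fields are inferred from the accessors used in steps 4 and 5.
class Student {
    private String name;
    private String addr;
    private int age;

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public String getAddr() { return addr; }
    public void setAddr(String addr) { this.addr = addr; }

    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
}
```

    With plain getters like these, the Jackson ObjectMapper used in step 4 can serialize the bean without extra annotations.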
     

    3. Build the BulkProcessor

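    * setBulkActions(1000): execute a bulk operation every time 1000 requests have been added
    * setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)): execute a bulk operation once the buffered requests reach 5 MB
    * setFlushInterval(TimeValue.timeValueSeconds(5)): execute a bulk operation every 5 s, however many requests are buffered
    * setConcurrentRequests(1): the default is 1, meaning that accumulating requests and sending a bulk happen asynchronously; the value is the number of bulk requests that may be sent concurrently, and 0 makes the two synchronous
    * setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)): the retry policy used when ES throws EsRejectedExecutionException because it is short on resources; here it starts at 100 ms and retries at most 3 times, while the default is (50 ms, 8)
    * Delay formula: start + 10 * ((int) Math.exp(0.8d * (currentlyConsumed)) - 1)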
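
    To see what the delay formula above produces, it can be evaluated directly. This is a small sketch that only reproduces the quoted formula; the class name BackoffDelayDemo is made up for illustration and is not part of Elasticsearch.

```java
import java.util.ArrayList;
import java.util.List;

// Evaluates the delay formula quoted above; this is not the Elasticsearch class itself.
final class BackoffDelayDemo {

    // Returns the wait in milliseconds before each retry, for a start delay and a retry count.
    static List<Integer> delays(int startMillis, int maxRetries) {
        List<Integer> result = new ArrayList<>();
        for (int currentlyConsumed = 0; currentlyConsumed < maxRetries; currentlyConsumed++) {
            int delay = startMillis + 10 * ((int) Math.exp(0.8d * currentlyConsumed) - 1);
            result.add(delay);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(delays(100, 3)); // [100, 110, 130]
        System.out.println(delays(50, 8));  // [50, 60, 80, 150, 280, 580, 1250, 2740]
    }
}
```

    With the (100 ms, 3) policy used in this post, a rejected bulk is retried for only about a third of a second in total, whereas the default (50 ms, 8) spreads its retries over roughly 5 seconds.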

     
    package es;
     
    import org.elasticsearch.action.bulk.BackoffPolicy;
    import org.elasticsearch.action.bulk.BulkProcessor;
    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.client.Client;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.InetSocketTransportAddress;
    import org.elasticsearch.common.unit.ByteSizeUnit;
    import org.elasticsearch.common.unit.ByteSizeValue;
    import org.elasticsearch.common.unit.TimeValue;
    import org.elasticsearch.transport.client.PreBuiltTransportClient;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
     
    import java.net.InetAddress;
    import java.net.UnknownHostException;
     
    @Configuration
    public class ESConfiguration {
        public static final Logger logger = LoggerFactory.getLogger(ESConfiguration.class);
     
        @Bean
        public BulkProcessor bulkProcessor() throws UnknownHostException {
     
            Settings settings = Settings.builder().put("cluster.name", "elasticsearch").build();
     
            // getByName expects a bare host or IP (no "http://" scheme); 9300 is the transport port
            Client client = new PreBuiltTransportClient(settings)
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.10.33"), 9300));
     
            return BulkProcessor.builder(client, new BulkProcessor.Listener() {
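                @Override
                public void beforeBulk(long executionId, BulkRequest bulkRequest) {
                    // Called just before each bulk request is sent; nothing to do here
                }
     
                @Override
                public void afterBulk(long executionId, BulkRequest bulkRequest, BulkResponse bulkResponse) {
                    // The bulk call succeeded as a whole, but individual items may still have failed
                    if (bulkResponse.hasFailures()) {
                        logger.error("bulk {} had item failures: {}", executionId, bulkResponse.buildFailureMessage());
                    }
                }
     
                @Override
                public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable throwable) {
                    // The whole bulk request failed; a trailing Throwable makes SLF4J log the stack trace
                    logger.error("{} bulk actions failed", bulkRequest.numberOfActions(), throwable);
                }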
     
            }).setBulkActions(1000)
                    .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
                    .setFlushInterval(TimeValue.timeValueSeconds(5))
                    .setConcurrentRequests(1)
                    .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                    .build();
        }
    }
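
    The count and size thresholds set on the builder above can be pictured with a small stand-alone model. This is a simplified sketch, not Elasticsearch code: the class ThresholdBatcher and its methods are hypothetical, and the time-based flush and concurrency are left out on purpose.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of the count and size thresholds; not Elasticsearch code.
final class ThresholdBatcher {
    private final int maxActions;
    private final long maxBytes;
    private final List<byte[]> pending = new ArrayList<>();
    private long pendingBytes = 0;
    private int flushCount = 0;

    ThresholdBatcher(int maxActions, long maxBytes) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
    }

    // Buffers one document and flushes as soon as either threshold is reached.
    void add(byte[] document) {
        pending.add(document);
        pendingBytes += document.length;
        if (pending.size() >= maxActions || pendingBytes >= maxBytes) {
            flush();
        }
    }

    // Stands in for sending one bulk request; here it only counts and clears the buffer.
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flushCount++;
        pending.clear();
        pendingBytes = 0;
    }

    int flushCount() { return flushCount; }

    int pendingCount() { return pending.size(); }

    public static void main(String[] args) {
        ThresholdBatcher batcher = new ThresholdBatcher(1000, 5L * 1024 * 1024);
        for (int i = 0; i < 2500; i++) {
            batcher.add(new byte[100]);
        }
        // Prints "2 flushes, 500 pending": two flushes by count, the rest still buffered
        System.out.println(batcher.flushCount() + " flushes, " + batcher.pendingCount() + " pending");
        batcher.flush(); // the remainder goes out only on a later flush
    }
}
```

    The point of the model is the leftover: documents below both thresholds stay buffered until something else flushes them, which in the real BulkProcessor is the flush interval or closing the processor.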
    

      

    4. The indexing code

    package es;
     
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.elasticsearch.action.bulk.BulkProcessor;
    import org.elasticsearch.action.index.IndexRequest;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Repository;
     
     
     
    @Repository
    public class StudentInsertDao{
        private final Logger logger = LoggerFactory.getLogger(this.getClass());
     
        @Autowired
        private BulkProcessor bulkProcessor;
     
        private ObjectMapper objectMapper = new ObjectMapper();
     
        public void insert(Student student) {
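            // The age is used as the mapping type; String.valueOf works whether getAge() returns a number or a String
            String type = String.valueOf(student.getAge());
            String id = student.getName() + student.getAddr() + student.getAge();
            try {
                byte[] json = objectMapper.writeValueAsBytes(student);
     
                // add() only buffers the request; it is sent once one of the BulkProcessor thresholds is reached
                bulkProcessor.add(new IndexRequest("students", type, id).source(json));
            } catch (Exception e) {
                // A trailing exception argument makes SLF4J log the stack trace
                logger.error("failed to add student {} to bulkProcessor", id, e);
            }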
        }
    }
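
    One thing to watch in the listing above: plain concatenation can give two different records the same id (name "ab" with addr "c" and name "a" with addr "bc" both yield "abc..."), and documents indexed under the same index, type and id overwrite each other, which can look like missing data. A minimal sketch of a separator-based id follows; the helper StudentIds and the choice of '|' are assumptions for illustration, not part of the original code.

```java
// Hypothetical helper that builds collision-free ids from the three Student fields.
final class StudentIds {
    private static final char SEPARATOR = '|';

    // Joins the fields with a separator so that field boundaries stay unambiguous.
    static String of(String name, String addr, int age) {
        return escape(name) + SEPARATOR + escape(addr) + SEPARATOR + age;
    }

    // Escapes backslashes first, then the separator, so a field can never fake a boundary.
    private static String escape(String field) {
        return field.replace("\\", "\\\\").replace("|", "\\|");
    }

    public static void main(String[] args) {
        System.out.println(of("Jack", "SH", 12)); // Jack|SH|12
        System.out.println(of("ab", "c", 12));    // ab|c|12
        System.out.println(of("a", "bc", 12));    // a|bc|12
    }
}
```

    Whether this matters depends on the data; if name and addr can never run into each other ambiguously, the original concatenation is fine.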
    

      

    5. Test code

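    package es;
     
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    import org.springframework.test.context.web.WebAppConfiguration;
     
    @RunWith(SpringJUnit4ClassRunner.class)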
    @WebAppConfiguration
    @ContextConfiguration(locations = {"classpath:servlet-context.xml", "classpath:applicationContext.xml"})
    public class StudentInsertDaoTest {    
        @Autowired
        private StudentInsertDao insertDao;
        
        @Test
        public void insert() throws Exception {
            
            Student student = new Student();
            student.setAge(12);
            student.setAddr("SH");
            student.setName("Jack");
            
            // insert() only buffers the document; it is sent on the next flush, not when insert() returns
            insertDao.insert(student);
        }
    }
    

      



    Original article: https://blog.csdn.net/wslyk606/article/details/79413980

    A brief analysis of Elasticsearch BulkProcessor:

    https://blog.csdn.net/baichoufei90/article/details/97117025

  • Source: https://www.cnblogs.com/Allen-rg/p/11650770.html