zoukankan      html  css  js  c++  java
  • Idea 远程往 ElasticSearch 写入数据

    添加依赖

    <!--es 相关依赖开始-->
    <dependency>
        <groupId>io.searchbox</groupId>
        <artifactId>jest</artifactId>
        <version>6.3.1</version>
    </dependency>
    
    <dependency>
        <groupId>net.java.dev.jna</groupId>
        <artifactId>jna</artifactId>
        <version>4.5.2</version>
    </dependency>
    
    <dependency>
        <groupId>org.codehaus.janino</groupId>
        <artifactId>commons-compiler</artifactId>
        <version>2.7.8</version>
    </dependency>
    <!-- es 相关依赖结束 -->

    代码示例

    import com.atguigu.gmall.realtime.bean.AlertInfo
    import io.searchbox.client.config.HttpClientConfig
    import io.searchbox.client.{JestClient, JestClientFactory}
    import io.searchbox.core.{Bulk, Index}
    import org.apache.spark.rdd.RDD
    
    /**
     * Author atguigu
     * Date 2020/6/3 13:58
     */
    object ESUtil {
        
        val factory = new JestClientFactory
        // 1.1.1 给工厂设置es的相关参数
        val esUrl = "http://hadoop102:8300" //注意换成自己的端口(9200)
        val config = new HttpClientConfig.Builder(esUrl)
            .maxTotalConnection(100) // 允许的最多客户端的个数
            .connTimeout(10000) // 连接es的超时时间
            .readTimeout(10000) // 读取数据的超时时间
            .multiThreaded(true)
            .build()
        factory.setHttpClientConfig(config)
        
        /**
         * 向es中插入单条数据
         *
         * @param index
         * @param source
         * @param id
         */
        def insertSingle(index: String, source: Object, id: String = null): Unit = {
            val client: JestClient = factory.getObject
            val action = new Index.Builder(source)
                .index(index)
                .`type`("_doc")
                .id(id) // 如果是传递的null, 则相当于没有传
                .build()
            client.execute(action)
            client.shutdownClient() // 把客户端还给工厂
        }
        
        /**
         * 批量插入
         *
         * @param index
         * @param sources
         */
        def insertBulk(index: String, sources: Iterator[Object]) = {
            val client: JestClient = factory.getObject
            val builder = new Bulk.Builder()
                .defaultIndex(index)
                .defaultType("_doc")
            // 在一个Bulk.Builder中add进去多个Action, 可以一次性交给es完成插入
            // Object   (id, object)
            sources.foreach {
                case (id: String, data) =>
                    val action = new Index.Builder(data)
                        .id(id)
                        .build()
                    builder.addAction(action)
                case data =>
                    val action = new Index.Builder(data)
                        .build()
                    builder.addAction(action)
            }
            
            client.execute(builder.build())
            client.shutdownClient()
        }
    }
  • 相关阅读:
    JVM指令
    spring源码分析之配置文件名占位符的解析(一)
    freemarker
    spring整合freemarker
    策略模式
    spring boot 学习笔记(一)之前端文件配置
    线程使用总结
    maven pom 配置 学习笔记(二)之搭建nexus私服
    删除数据库中所有存在表的数据
    自定义SWT控件七之自定义Shell(可伸缩窗口)
  • 原文地址:https://www.cnblogs.com/yangxusun9/p/13040463.html
Copyright © 2011-2022 走看看