添加依赖
<!-- Elasticsearch-related dependencies: begin -->
<!-- Jest: HTTP client library for Elasticsearch (provides the io.searchbox.* classes used by ESUtil) -->
<dependency>
<groupId>io.searchbox</groupId>
<artifactId>jest</artifactId>
<version>6.3.1</version>
</dependency>
<!-- JNA (Java Native Access); not referenced directly by the example code, presumably a runtime requirement of the ES stack (TODO confirm) -->
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
<version>4.5.2</version>
</dependency>
<!-- Janino commons-compiler; not referenced directly by the example code, presumably pinned to resolve a version conflict (TODO confirm) -->
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>commons-compiler</artifactId>
<version>2.7.8</version>
</dependency>
<!-- Elasticsearch-related dependencies: end -->
代码示例
import com.atguigu.gmall.realtime.bean.AlertInfo
import io.searchbox.client.config.HttpClientConfig
import io.searchbox.client.{JestClient, JestClientFactory}
import io.searchbox.core.{Bulk, Index}
import org.apache.spark.rdd.RDD
/**
 * Helper for writing documents to Elasticsearch through the Jest HTTP client.
 *
 * A single [[JestClientFactory]] is configured once when this object is initialised.
 * Each insert method obtains a client from that factory and always shuts the client
 * down afterwards, even when the request fails with an exception.
 *
 * Author atguigu
 * Date 2020/6/3 13:58
 */
object ESUtil {
  val factory = new JestClientFactory

  // Elasticsearch HTTP endpoint; replace the host/port with those of your own cluster (e.g. 9200).
  val esUrl = "http://hadoop102:8300"

  // Connection settings handed to the factory.
  val config = new HttpClientConfig.Builder(esUrl)
    .maxTotalConnection(100) // upper bound on the total number of connections
    .connTimeout(10000) // timeout for connecting to ES
    .readTimeout(10000) // timeout for reading data
    .multiThreaded(true)
    .build()
  factory.setHttpClientConfig(config)

  /**
   * Inserts a single document into Elasticsearch.
   *
   * @param index  name of the target index
   * @param source the document to index
   * @param id     document id; passing `null` (the default) is equivalent to not setting an id
   */
  def insertSingle(index: String, source: Object, id: String = null): Unit = {
    val client: JestClient = factory.getObject
    try {
      val action = new Index.Builder(source)
        .index(index)
        .`type`("_doc")
        .id(id) // a null id is equivalent to not providing one
        .build()
      client.execute(action)
    } finally {
      // Always release the client, even if building or executing the action threw.
      client.shutdownClient()
    }
  }

  /**
   * Inserts many documents with one bulk request.
   *
   * Each element of `sources` is either a `(id: String, document)` pair, in which case
   * the document is indexed under that id, or a bare document, which is indexed without
   * an explicit id. An empty iterator is a no-op: no client is created and no request
   * is sent.
   *
   * @param index   name of the target index (used as the bulk request's default index)
   * @param sources the documents, or `(id, document)` pairs, to index
   */
  def insertBulk(index: String, sources: Iterator[Object]): Unit = {
    // Nothing to send (e.g. an empty partition): skip the client and the empty bulk request.
    if (sources.hasNext) {
      val client: JestClient = factory.getObject
      try {
        val builder = new Bulk.Builder()
          .defaultIndex(index)
          .defaultType("_doc")
        // Collect every action in one Bulk.Builder so ES receives them in a single request.
        sources.foreach {
          case (id: String, data) =>
            builder.addAction(new Index.Builder(data).id(id).build())
          case data =>
            builder.addAction(new Index.Builder(data).build())
        }
        client.execute(builder.build())
      } finally {
        // Always release the client, even if iterating the sources or executing threw.
        client.shutdownClient()
      }
    }
  }
}