工具类
config.properties
# jdbc配置
jdbc.datasource.size=10
jdbc.url=jdbc:mysql://hadoop101:3306/database?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true
jdbc.user=root
jdbc.password=000000
# Kafka
kafka.broker.list=hadoop101:9092,hadoop102:9092,hadoop103:9092
# Redis配置
redis.host=hadoop101
redis.port=6379
# hive 的数据库名(选配)
hive.database=database
PropertiesUtil
import java.io.InputStreamReader
import java.util.Properties
/**
 * Loads `.properties` resources from the classpath.
 */
object PropertiesUtil {

  /**
   * Reads a properties resource, decoded as UTF-8, through the current thread's
   * context class loader.
   *
   * @param propertieName resource name as seen by the class loader, e.g. "config.properties"
   * @return the parsed key/value pairs
   * @throws java.io.FileNotFoundException if the class loader cannot find the resource
   * @throws java.io.IOException           if reading the resource fails
   */
  def load(propertieName: String): Properties = {
    val stream = Thread.currentThread().getContextClassLoader.getResourceAsStream(propertieName)
    // getResourceAsStream signals "not found" with null; fail with a message that names
    // the resource instead of letting InputStreamReader throw a bare NullPointerException.
    if (stream == null) {
      throw new java.io.FileNotFoundException(s"Resource not found on classpath: $propertieName")
    }
    val reader = new InputStreamReader(stream, java.nio.charset.StandardCharsets.UTF_8)
    try {
      val prop = new Properties()
      prop.load(reader)
      prop
    } finally {
      // Closing the reader also closes the underlying stream.
      reader.close()
    }
  }
}
JdbcUtil
import com.alibaba.druid.pool.DruidDataSourceFactory
import java.sql.PreparedStatement
import java.util.Properties
import javax.sql.DataSource
/**
 * Small JDBC helper that runs parameterised updates against a Druid data source
 * configured from `config.properties`.
 */
object JdbcUtil {
  // Created once, when this object is first accessed.
  var dataSource: DataSource = init()

  /**
   * Builds the Druid data source from the `jdbc.*` entries of `config.properties`.
   *
   * @return the configured data source
   */
  def init(): DataSource = {
    val prop = PropertiesUtil.load("config.properties")
    val properties = new Properties()
    properties.setProperty("driverClassName", "com.mysql.jdbc.Driver")
    properties.setProperty("url", prop.getProperty("jdbc.url"))
    properties.setProperty("username", prop.getProperty("jdbc.user"))
    properties.setProperty("password", prop.getProperty("jdbc.password"))
    properties.setProperty("maxActive", prop.getProperty("jdbc.datasource.size"))
    DruidDataSourceFactory.createDataSource(properties)
  }

  /**
   * Executes a single parameterised statement in its own transaction,
   * e.g. "insert into xxx values (?,?,?)".
   *
   * On failure the stack trace is printed, the transaction is rolled back and 0 is
   * returned; the exception is not rethrown.
   *
   * @param sql    statement text with `?` placeholders
   * @param params values bound to the placeholders in order; may be null or empty
   * @return the update count reported by the driver, or 0 if the statement or commit failed
   */
  def executeUpdate(sql: String, params: Array[Any]): Int = {
    var pstmt: PreparedStatement = null
    val connection = dataSource.getConnection
    try {
      connection.setAutoCommit(false)
      pstmt = connection.prepareStatement(sql)
      bindParams(pstmt, params)
      val affected = pstmt.executeUpdate()
      connection.commit()
      // Only report the count once the commit has succeeded.
      affected
    } catch {
      case e: Exception =>
        e.printStackTrace()
        rollbackQuietly(connection)
        0
    } finally {
      // Always release the statement and the connection, otherwise every call
      // permanently consumes one connection from the data source.
      closeQuietly(pstmt)
      closeQuietly(connection)
    }
  }

  /**
   * Executes one statement once per parameter array, as a single batch in one transaction.
   *
   * Null or empty parameter arrays are skipped. On failure the stack trace is printed,
   * the transaction is rolled back and null is returned; the exception is not rethrown.
   *
   * @param sql        statement text with `?` placeholders
   * @param paramsList one array of placeholder values per batch entry
   * @return the per-entry update counts from `executeBatch`, or null if the batch or commit failed
   */
  def executeBatchUpdate(sql: String, paramsList: Iterable[Array[Any]]): Array[Int] = {
    var pstmt: PreparedStatement = null
    val connection = dataSource.getConnection
    try {
      connection.setAutoCommit(false)
      pstmt = connection.prepareStatement(sql)
      for (params <- paramsList if params != null && params.length > 0) {
        bindParams(pstmt, params)
        pstmt.addBatch()
      }
      val counts = pstmt.executeBatch()
      connection.commit()
      counts
    } catch {
      case e: Exception =>
        e.printStackTrace()
        rollbackQuietly(connection)
        null
    } finally {
      closeQuietly(pstmt)
      closeQuietly(connection)
    }
  }

  /** Binds each value to its 1-based placeholder position; does nothing for a null array. */
  private def bindParams(pstmt: PreparedStatement, params: Array[Any]): Unit = {
    if (params != null) {
      for (i <- params.indices) {
        pstmt.setObject(i + 1, params(i))
      }
    }
  }

  /** Rolls back the open transaction, printing (not propagating) any failure to do so. */
  private def rollbackQuietly(connection: java.sql.Connection): Unit = {
    try connection.rollback()
    catch {
      case e: Exception => e.printStackTrace()
    }
  }

  /** Closes a possibly-null JDBC resource, printing (not propagating) any failure to do so. */
  private def closeQuietly(resource: AutoCloseable): Unit = {
    if (resource != null) {
      try resource.close()
      catch {
        case e: Exception => e.printStackTrace()
      }
    }
  }

  // Manual smoke test.
  def main(args: Array[String]): Unit = {
    // JdbcUtil.executeUpdate("insert into table_1 values(?,?,?,?,?)", Array("take100", "100", 100, 200,300))
    JdbcUtil.executeBatchUpdate(
      "insert into table_1 values(?,?,?,?,?)",
      List(Array("take101", "100", 200, 200, 200), Array("take102", "100", 300, 300, 300)))
  }
}
MyRedisUtil
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
/**
 * Hands out Jedis clients from a lazily created, shared connection pool.
 */
object MyRedisUtil {
  // Stays null until the first client is requested.
  var jedisPool: JedisPool = null

  /**
   * Borrows a client from the pool, creating the pool on first use from the
   * `redis.host` / `redis.port` entries of `config.properties`.
   *
   * @return a Jedis client obtained from the pool
   */
  def getJedisClient: Jedis = {
    if (jedisPool == null) {
      println("开辟一个连接池")
      jedisPool = createPool()
    }
    println(s"jedisPool.getNumActive = ${jedisPool.getNumActive}")
    println("获得一个连接")
    jedisPool.getResource
  }

  /** Reads the Redis address from `config.properties` and builds the pool. */
  private def createPool(): JedisPool = {
    val settings = PropertiesUtil.load("config.properties")
    val redisHost = settings.getProperty("redis.host")
    val redisPort = settings.getProperty("redis.port").toInt

    val poolConfig = new JedisPoolConfig()
    poolConfig.setMaxTotal(100)             // maximum number of connections
    poolConfig.setMaxIdle(20)               // maximum idle connections
    poolConfig.setMinIdle(20)               // minimum idle connections
    poolConfig.setBlockWhenExhausted(true)  // wait when the pool is exhausted
    poolConfig.setMaxWaitMillis(500)        // how long to wait, in milliseconds
    poolConfig.setTestOnBorrow(true)        // test each connection when it is borrowed

    new JedisPool(poolConfig, redisHost, redisPort)
  }
}
MyKafkaUtil
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
/**
 * Creates Spark Streaming direct streams over Kafka topics, using the broker list
 * from `config.properties`.
 */
object MyKafkaUtil {
  private val config: Properties = PropertiesUtil.load("config.properties")

  // Comma-separated broker addresses read from `kafka.broker.list`.
  val broker_list = config.getProperty("kafka.broker.list")

  // Kafka consumer configuration.
  val kafkaParam = Map(
    // Addresses used for the initial connection to the cluster.
    "bootstrap.servers" -> broker_list,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    // Identifies which consumer group this consumer belongs to.
    "group.id" -> "gmall_consumer_group",
    // Applies when there is no initial offset or the current offset no longer exists
    // on the server; "latest" resets to the newest offset.
    "auto.offset.reset" -> "latest",
    // true: offsets are committed automatically in the background, which can lose data
    // if Kafka goes down; false: offsets have to be maintained manually.
    "enable.auto.commit" -> java.lang.Boolean.TRUE
  )

  /**
   * Creates a direct stream that subscribes to a single topic with [[kafkaParam]].
   *
   * Uses `LocationStrategies.PreferConsistent` as the location strategy and
   * `ConsumerStrategies.Subscribe` as the consumer strategy.
   *
   * @param topic topic to subscribe to
   * @param ssc   streaming context the stream is attached to
   * @return the input stream of consumer records with String keys and values
   */
  def getKafkaStream(topic: String, ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
    val consumerStrategy = ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParam)
    KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      consumerStrategy
    )
  }
}
MyEsUtil
import io.searchbox.client.config.HttpClientConfig
import io.searchbox.client.{JestClient, JestClientFactory}
import io.searchbox.core.{Bulk, BulkResult, Index}
import java.util.Objects
/**
 * Elasticsearch helper built on the Jest HTTP client.
 */
object MyEsUtil {
  private val ES_HOST = "http://hadoop101"
  private val ES_HTTP_PORT = 9200
  // Created on first use by build().
  private var factory: JestClientFactory = null

  /**
   * Returns a client from the shared factory, building the factory on first use.
   *
   * @return a Jest client
   */
  def getClient: JestClient = {
    if (factory == null) build()
    factory.getObject
  }

  /**
   * Shuts the given client down. A null client is ignored; a failure during shutdown
   * is printed and not propagated.
   *
   * @param client the client to shut down, may be null
   */
  def close(client: JestClient): Unit = {
    if (client != null) {
      try client.shutdownClient()
      catch {
        case e: Exception =>
          e.printStackTrace()
      }
    }
  }

  /**
   * Creates the client factory pointing at ES_HOST:ES_HTTP_PORT.
   */
  private def build(): Unit = {
    factory = new JestClientFactory
    factory.setHttpClientConfig(
      new HttpClientConfig.Builder(s"$ES_HOST:$ES_HTTP_PORT")
        .multiThreaded(true)
        .maxTotalConnection(20) // total number of connections
        .connTimeout(10000)
        .readTimeout(10000)
        .build)
  }

  /**
   * Indexes all documents into `indexName` with a single bulk request.
   *
   * A null or empty list is a no-op: no client is opened and no request is sent.
   *
   * @param indexName target index
   * @param docList   documents to index, each passed as-is to `Index.Builder`
   */
  def insertBulk(indexName: String, docList: List[Any]): Unit = {
    if (docList != null && docList.nonEmpty) {
      val jest: JestClient = getClient
      try {
        val bulkBuilder = new Bulk.Builder
        // NOTE(review): the smoke test in main uses type "_doc" while this uses "_ex" — confirm which is intended.
        bulkBuilder.defaultIndex(indexName).defaultType("_ex")
        println(docList.mkString("\n"))
        for (doc <- docList) {
          val index: Index = new Index.Builder(doc).build()
          bulkBuilder.addAction(index)
        }
        val result: BulkResult = jest.execute(bulkBuilder.build())
        println(s"保存es= ${result.getItems.size()} 条")
      } finally {
        // Release the client even when building or executing the request throws.
        close(jest)
      }
    }
  }

  // Manual smoke test.
  def main(args: Array[String]): Unit = {
    val jest: JestClient = getClient
    try {
      val doc =
        """{
          |"name":"yiyi",
          |"age": 17
          |}""".stripMargin
      val index: Index = new Index.Builder(doc).index("myesutil_test").`type`("_doc").build()
      jest.execute(index)
    } finally {
      close(jest)
    }
  }
}
---------------------
原文:https://blog.csdn.net/qq_31108141/article/details/88367058