zoukankan      html  css  js  c++  java
  • 大数据常用工具类

    工具类
    config.properties
    # jdbc配置
    jdbc.datasource.size=10
    jdbc.url=jdbc:mysql://hadoop101:3306/database?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true
    jdbc.user=root
    jdbc.password=000000

    # Kafka
    kafka.broker.list=hadoop101:9092,hadoop102:9092,hadoop103:9092

    # Redis配置
    redis.host=hadoop101
    redis.port=6379

    # hive 的数据库名(选配)
    hive.database=database
    PropertiesUtil
    import java.io.InputStreamReader
    import java.util.Properties

    /** Helper for reading `.properties` files from the classpath. */
    object PropertiesUtil {

        /**
          * Loads a properties file from the context class loader of the current thread,
          * decoding it as UTF-8.
          *
          * @param propertieName classpath-relative name of the resource, e.g. "config.properties"
          * @return the populated [[java.util.Properties]]
          * @throws NullPointerException if the resource cannot be found on the classpath
          *                              (the message names the missing resource)
          * @throws java.io.IOException  if the resource cannot be read
          */
        def load(propertieName: String): Properties = {
            val stream = Thread.currentThread().getContextClassLoader.getResourceAsStream(propertieName)
            // getResourceAsStream signals "not found" with null; fail with a message that
            // names the resource instead of an anonymous NPE from the reader constructor.
            if (stream == null) {
                throw new NullPointerException(s"Resource not found on the classpath: $propertieName")
            }
            val reader = new InputStreamReader(stream, java.nio.charset.StandardCharsets.UTF_8)
            try {
                val prop = new Properties()
                prop.load(reader)
                prop
            } finally {
                // Closing the reader also closes the underlying resource stream.
                reader.close()
            }
        }

    }
    MyJdbcUtil
    import com.alibaba.druid.pool.DruidDataSourceFactory
    import java.sql.PreparedStatement
    import java.util.Properties
    import javax.sql.DataSource

    /**
      * Small JDBC helper backed by a Druid connection pool that is configured from
      * `config.properties`.
      *
      * Every statement runs inside its own transaction. Failures are reported with
      * `printStackTrace` and turned into a fallback return value rather than re-thrown.
      */
    object JdbcUtil {

        import java.sql.Connection

        /** Shared connection pool, created once when this object is initialised. */
        var dataSource: DataSource = init()

        /**
          * Builds the Druid data source from the `jdbc.*` entries of `config.properties`.
          *
          * @return a new pooled data source
          */
        def init(): DataSource = {
            val properties = new Properties()
            val prop = PropertiesUtil.load("config.properties")

            properties.setProperty("driverClassName", "com.mysql.jdbc.Driver")
            properties.setProperty("url", prop.getProperty("jdbc.url"))
            properties.setProperty("username", prop.getProperty("jdbc.user"))
            properties.setProperty("password", prop.getProperty("jdbc.password"))
            properties.setProperty("maxActive", prop.getProperty("jdbc.datasource.size"))

            DruidDataSourceFactory.createDataSource(properties)
        }

        /** Binds `params` to the 1-based placeholders of `pstmt`, in order. */
        private def bindParams(pstmt: PreparedStatement, params: Array[Any]): Unit =
            for (i <- params.indices) pstmt.setObject(i + 1, params(i))

        /**
          * Runs `work` against a fresh statement for `sql` inside one transaction.
          *
          * Commits on success. On any exception the stack trace is printed, the transaction
          * is rolled back and `onFailure` is returned. The statement is always closed and the
          * connection is always handed back to the pool.
          */
        private def inTransaction[T](sql: String, onFailure: T)(work: PreparedStatement => T): T = {
            val connection: Connection = dataSource.getConnection
            try {
                connection.setAutoCommit(false)
                val pstmt = connection.prepareStatement(sql)
                val result =
                    try work(pstmt)
                    finally pstmt.close()
                connection.commit()
                result
            } catch {
                case e: Exception =>
                    e.printStackTrace()
                    // Undo partial work so the connection is not returned with an open transaction.
                    try connection.rollback()
                    catch { case rollbackError: Exception => rollbackError.printStackTrace() }
                    onFailure
            } finally {
                // Without this the pooled connection is never released and the pool runs dry.
                try connection.close()
                catch { case closeError: Exception => closeError.printStackTrace() }
            }
        }

        /**
          * Executes a single parameterised update, e.g. "insert into xxx values (?,?,?)".
          *
          * @param sql    statement with `?` placeholders
          * @param params values for the placeholders; may be null or empty
          * @return the affected row count, or 0 if the statement failed
          */
        def executeUpdate(sql: String, params: Array[Any]): Int =
            inTransaction(sql, 0) { pstmt =>
                if (params != null && params.length > 0) bindParams(pstmt, params)
                pstmt.executeUpdate()
            }

        /**
          * Executes one parameterised statement as a JDBC batch, one batch entry per
          * parameter array. Null or empty parameter arrays are skipped.
          *
          * @param sql        statement with `?` placeholders
          * @param paramsList parameter arrays, one per row
          * @return the per-entry update counts, or null if the batch failed
          */
        def executeBatchUpdate(sql: String, paramsList: Iterable[Array[Any]]): Array[Int] =
            inTransaction[Array[Int]](sql, null) { pstmt =>
                for (params <- paramsList if params != null && params.length > 0) {
                    bindParams(pstmt, params)
                    pstmt.addBatch()
                }
                pstmt.executeBatch()
            }

        // Manual test
        def main(args: Array[String]): Unit = {
    //       JdbcUtil.executeUpdate("insert into table_1 values(?,?,?,?,?)", Array("take100", "100", 100, 200,300))
            JdbcUtil.executeBatchUpdate("insert into table_1 values(?,?,?,?,?)",List(Array("take101", "100", 200, 200,200),Array("take102", "100", 300, 300,300)))
        }
    }
    MyRedisUtil
    import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

    /** Hands out Jedis connections from a pool that is created on first use. */
    object MyRedisUtil {

        /** The shared pool; stays null until the first call to `getJedisClient`. */
        var jedisPool: JedisPool = null

        /**
          * Borrows a connection from the pool, creating the pool first if necessary.
          *
          * @return a Jedis client obtained from the pool
          */
        def getJedisClient: Jedis = {
            if (jedisPool == null) {
                jedisPool = createPool()
            }
            println(s"jedisPool.getNumActive = ${jedisPool.getNumActive}")
            println("获得一个连接")
            jedisPool.getResource
        }

        /** Builds the pool from `redis.host` / `redis.port` in `config.properties`. */
        private def createPool(): JedisPool = {
            println("开辟一个连接池")
            val settings = PropertiesUtil.load("config.properties")
            val redisHost = settings.getProperty("redis.host")
            val redisPort = settings.getProperty("redis.port").toInt

            val poolConfig = new JedisPoolConfig()
            poolConfig.setMaxTotal(100)            // maximum number of connections
            poolConfig.setMaxIdle(20)              // maximum idle connections
            poolConfig.setMinIdle(20)              // minimum idle connections
            poolConfig.setBlockWhenExhausted(true) // wait when the pool is exhausted
            poolConfig.setMaxWaitMillis(500)       // how long to wait, in milliseconds
            poolConfig.setTestOnBorrow(true)       // test each connection when it is borrowed

            new JedisPool(poolConfig, redisHost, redisPort)
        }

    }
    MyKafkaUtil
    import java.util.Properties
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

    /** Creates Spark Streaming direct streams for Kafka topics, configured from `config.properties`. */
    object MyKafkaUtil {

        private val config: Properties = PropertiesUtil.load("config.properties")

        /** Broker list read from the `kafka.broker.list` property. */
        val broker_list = config.getProperty("kafka.broker.list")

        /** Kafka consumer configuration shared by every stream created here. */
        val kafkaParam = Map(
            // addresses used for the initial connection to the cluster
            "bootstrap.servers" -> broker_list,
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> classOf[StringDeserializer],
            // identifies the consumer group this consumer belongs to
            "group.id" -> "gmall_consumer_group",
            // applies when there is no initial offset or the current offset no longer exists
            // on the server; "latest" resets the offset to the newest one
            "auto.offset.reset" -> "latest",
            // true: offsets are committed automatically in the background
            // false: offsets have to be maintained manually
            "enable.auto.commit" -> java.lang.Boolean.TRUE
        )

        /**
          * Creates a direct stream subscribed to a single topic.
          *
          * Uses `LocationStrategies.PreferConsistent` and `ConsumerStrategies.Subscribe`
          * together with `kafkaParam`.
          *
          * @param topic topic to subscribe to
          * @param ssc   streaming context the stream is attached to
          * @return the input stream of consumer records
          */
        def getKafkaStream(topic: String, ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
            val subscription = ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParam)
            KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, subscription)
        }
    }
    MyEsUtil
    import io.searchbox.client.config.HttpClientConfig
    import io.searchbox.client.{JestClient, JestClientFactory}
    import io.searchbox.core.{Bulk, BulkResult, Index}
    import java.util.Objects

    /** Elasticsearch helper built on the Jest HTTP client. */
    object MyEsUtil {
        private val ES_HOST = "http://hadoop101"
        private val ES_HTTP_PORT = 9200
        private var factory: JestClientFactory = null

        /**
          * Obtains a client, building the client factory on first use.
          *
          * @return a Jest client produced by the factory
          */
        def getClient: JestClient = {
            if (factory == null) build()
            factory.getObject
        }

        /**
          * Shuts the given client down. A null client is ignored, and an exception raised
          * while shutting down is printed instead of being propagated.
          *
          * @param client the client to shut down; may be null
          */
        def close(client: JestClient): Unit = {
            if (client != null) {
                try client.shutdownClient()
                catch {
                    case e: Exception => e.printStackTrace()
                }
            }
        }

        /** Creates the client factory with the HTTP settings used for every client. */
        private def build(): Unit = {
            factory = new JestClientFactory
            factory.setHttpClientConfig(new HttpClientConfig.Builder(ES_HOST + ":" + ES_HTTP_PORT).multiThreaded(true)
                .maxTotalConnection(20) // total number of connections
                .connTimeout(10000).readTimeout(10000).build)
        }

        /**
          * Indexes all documents with a single bulk request.
          *
          * @param indexName default index for the bulk request
          * @param docList   documents to index
          */
        def insertBulk(indexName: String, docList: List[Any]): Unit = {
            val jest: JestClient = getClient
            // Release the client even when building or executing the bulk request fails.
            try {
                val bulkBuilder = new Bulk.Builder
                bulkBuilder.defaultIndex(indexName).defaultType("_ex")
                println(docList.mkString(" "))
                for (doc <- docList) {
                    val index: Index = new Index.Builder(doc).build()
                    bulkBuilder.addAction(index)
                }
                val result: BulkResult = jest.execute(bulkBuilder.build())
                println(s"保存es= ${result.getItems.size()} 条")
            } finally {
                close(jest)
            }
        }

        // Manual test
        def main(args: Array[String]): Unit = {
            val jest: JestClient = getClient
            try {
                // Triple quotes keep the embedded JSON double quotes from ending the literal.
                val doc = """{ "name":"yiyi", "age": 17 }"""
                val index: Index = new Index.Builder(doc).index("myesutil_test").`type`("_doc").build()
                jest.execute(index)
            } finally {
                close(jest)
            }
        }

    }
     
    ---------------------
    原文:https://blog.csdn.net/qq_31108141/article/details/88367058

  • 相关阅读:
    SysEmailBatch 邮件
    控制数据源中某一列是否允许编辑 FormDataObject allowEdit
    设置表格字段背景色displayOption
    使用ExcelIo类操作读取excel文件
    Edit方法
    Box class
    Expressions in query ranges
    Set Class
    Map Class
    FTP from Axapta
  • 原文地址:https://www.cnblogs.com/Bkxk/p/10563723.html
Copyright © 2011-2022 走看看