  • scala_spark practice 1

    /**
      *  In a Scala job, main(args: Array[String]) is the entry point of the business logic.
      *  import org.apache.spark.{SparkConf, SparkContext}
      *  val sparkConf = new SparkConf().setAppName(appName)
      *  val ssc = new StreamingContext(sparkConf, Seconds(batchNum))
      *  val sc = ssc.sparkContext  // if the job does not need a StreamingContext, create a SparkContext directly: val sc = new SparkContext(sparkConf)
      *
      *  val sqlContext = new HiveContext(sc)  // HiveContext is an extension of SQLContext; the plain version is val sqlContext = new SQLContext(sc)
      *  val result: DataFrame = sqlContext.sql(sql)
      *  // Since Spark 2.0, HiveContext and SQLContext can be replaced by SparkSession:
      *  // val result = SparkSession.builder().appName("test").config("key", "value").getOrCreate().sql(sql)
      *
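      *  A minimal skeleton tying the notes above together (a hedged sketch, not the original
      *  project's code; appName, batchNum and sql are placeholder values, and HiveContext
      *  assumes a pre-2.0 Spark build):
          import org.apache.spark.{SparkConf, SparkContext}
          import org.apache.spark.sql.DataFrame
          import org.apache.spark.sql.hive.HiveContext
          import org.apache.spark.streaming.{Seconds, StreamingContext}

          object SparkJob {
            def main(args: Array[String]): Unit = {
              val appName  = "test"                 // placeholder application name
              val batchNum = 10L                    // placeholder batch interval in seconds
              val sql      = "select * from demo_t" // placeholder query

              val sparkConf = new SparkConf().setAppName(appName)
              val ssc = new StreamingContext(sparkConf, Seconds(batchNum))
              val sc  = ssc.sparkContext            // reuse the underlying SparkContext

              val sqlContext = new HiveContext(sc)
              val result: DataFrame = sqlContext.sql(sql)
              result.show()
            }
          }
      *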
      *  In real projects the result is usually handled as JSON, e.g. for sending to Kafka or for format conversion and filtering:
      *   val resultRdd = result.toJSON.rdd.map(x => {
              val json = new JSONObject(x)
              val computerIp = json.optString("ip", "")
              val rowKey = json.optString("name", "")
              ......
              val dataMap = new util.HashMap[String, String]()
              dataMap.put("computerip", computerIp)
              (rowKey, dataMap)
          })
       val hbaseRdd = resultRdd.filter(r => {
         r._1 != null && r._1.nonEmpty              // drop records without a valid rowKey
       }).map(line => {
         val put = new Put(Bytes.toBytes(line._1))  // build a Put keyed by the rowKey
         val keys = line._2.keySet().iterator()     // iterate over the column map
         while (keys.hasNext) {
           val k = keys.next().toString
           put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(k), Bytes.toBytes(line._2.get(k)))
         }
         (new ImmutableBytesWritable(), put)
       })
    
       val hadoopconf = sc.hadoopConfiguration
       val jobconf = new JobConf(hadoopconf)
       jobconf.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
       jobconf.setOutputValueClass(classOf[Mutation])  // a Put is a Mutation
       jobconf.setClass("mapreduce.job.outputformat.class", classOf[TableOutputFormat[ImmutableBytesWritable]], classOf[OutputFormat[ImmutableBytesWritable, Mutation]])
       jobconf.set(TableOutputFormat.OUTPUT_TABLE, table)

       hbaseRdd.saveAsNewAPIHadoopDataset(jobconf) // write the Puts to HBase
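
       // An equivalent way to build the same output configuration with the new-API Job
       // (a sketch under the assumption that org.apache.hadoop.mapreduce.Job and the HBase
       // TableOutputFormat used above are on the classpath):
       import org.apache.hadoop.mapreduce.Job

       val job = Job.getInstance(sc.hadoopConfiguration)
       job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
       job.setMapOutputValueClass(classOf[Put])
       job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
       job.getConfiguration.set(TableOutputFormat.OUTPUT_TABLE, table)
       hbaseRdd.saveAsNewAPIHadoopDataset(job.getConfiguration)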
      *-----------------------------------------------------------------------------------------------------------
      * // KafkaSink wraps a lazily created producer so that only the factory function is
      * // serialized; each executor creates its own KafkaProducer on first use.
      * class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
          lazy val producer = createProducer()
          def send(topic: String, value: String): Unit = {
            producer.send(new ProducerRecord(topic, value))
          }
        }
    
      object KafkaSink {
        def apply(config: java.util.Map[String, Object]): KafkaSink = {
          val f = () => {
            val producer = new KafkaProducer[String, String](config)
            producer
          }
          new KafkaSink(f)
        }
      }
      *val kafka = sc.broadcast(KafkaSink(Configs.kafka_props))
      *selectDatas.toJSON.rdd.foreach(x => {
          val json = new JSONObject(x)
          kafka.value.send(topic, json.toString)
      })
      * // send each JSON record to the Kafka topic
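      *  The Configs.kafka_props map above is just a plain Kafka producer config; a minimal
      *  hedged example of building one (the broker address is an assumption):
          val kafkaProps = new java.util.HashMap[String, Object]()
          kafkaProps.put("bootstrap.servers", "broker1:9092") // assumed broker address
          kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
          kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
          val kafka = sc.broadcast(KafkaSink(kafkaProps))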
      *-------------------------------------------------------------------
    * val kafkaStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafka_param, topic, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
    * kafkaStream.foreachRDD(rdd => {
    *   rdd.foreach(data => {
    *     // consume each Kafka record here
    *   })
    * })
    */
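
    A fuller consumer sketch based on the fragment above (hedged: the ZooKeeper address,
    group id and topic name are assumptions, and createStream is the receiver-based
    spark-streaming-kafka 0.8 API):

      import kafka.serializer.StringDecoder
      import org.apache.spark.storage.StorageLevel
      import org.apache.spark.streaming.kafka.KafkaUtils

      // Receiver-based consumer parameters (placeholder values).
      val kafka_param = Map(
        "zookeeper.connect" -> "zk1:2181",
        "group.id"          -> "test-group"
      )
      val topic = Map("demo_topic" -> 1) // topic name -> number of receiver threads

      val kafkaStream = KafkaUtils
        .createStream[String, String, StringDecoder, StringDecoder](
          ssc, kafka_param, topic, StorageLevel.MEMORY_AND_DISK_SER)
        .map(_._2) // keep only the message value

      kafkaStream.foreachRDD(rdd => rdd.foreach(data => println(data)))

      ssc.start()            // start the streaming job
      ssc.awaitTermination() // block until it is stopped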
    
  • Original post: https://www.cnblogs.com/shaozhiqi/p/12171904.html