zoukankan      html  css  js  c++  java
  • Spark和Spring整合处理离线数据

    如果你比较熟悉JavaWeb应用开发,那么对Spring框架一定不陌生,并且JavaWeb通常是基于SSM搭起的架构,主要用Java语言开发。但是开发Spark程序,Scala语言往往必不可少。

    众所周知,Scala如同Java一样,都是运行在JVM上的,所以它具有很多Java语言的特性,同时作为函数式编程语言,又具有自己独特的特性,实际应用中除了要结合业务场景,还要对Scala语言的特性有深入了解。

    如果想像使用Java语言一样,使用Scala来利用Spring框架特性、并结合Spark来处理离线数据,应该怎么做呢?

    本篇文章,通过详细的示例代码,介绍上述场景的具体实现,大家如果有类似需求,可以根据实际情况做调整。

    1. 定义一个程序启动入口

    object Bootstrap {
      private val log = LoggerFactory.getLogger(Bootstrap.getClass)
    
      //指定配置文件如log4j的路径
      val ConfFileName = "conf"
      val ConfigurePath = new File("").getAbsolutePath.substring(0, if (new File("").getAbsolutePath.lastIndexOf("lib") == -1) 0
      else new File("").getAbsolutePath.lastIndexOf("lib")) + this.ConfFileName + File.separator
    
      //存放实现了StatsTask的离线程序处理的类
      private val TASK_MAP = Map("WordCount" -> classOf[WordCount])
    
      def main(args: Array[String]): Unit = {
        //传入一些参数,比如要运行的离线处理程序类名、处理哪些时间的数据
        if (args.length < 1) {
          log.warn("args 参数异常!!!" + args.toBuffer)
          System.exit(1)
        }
        init(args)
      }
    
      def init(args: Array[String]) {
        try {
          SpringUtils.init(Array[String]("applicationContext.xml"))
          initLog4j()
    
          val className = args(0)
          // 实例化离线处理类
          val task = SpringUtils.getBean(TASK_MAP(className))
    
          args.length match {
            case 3 =>
              // 处理一段时间的每天离线数据
              val dtStart = DateTimeFormat.forPattern("yyyy-MM-dd").parseDateTime(args(1))
              val dtEnd = DateTimeFormat.forPattern("yyyy-MM-dd").parseDateTime(args(2))
              val days = Days.daysBetween(dtStart, dtEnd).getDays + 1
              for (i <- 0 until days) {
                val etime = dtStart.plusDays(i).toString("yyyy-MM-dd")
                task.runTask(etime)
    
                log.info(s"JOB --> $className 已成功处理: $etime 的数据")
              }
    
            case 2 =>
              // 处理指定的某天离线数据
              val etime = DateTimeFormat.forPattern("yyyy-MM-dd").parseDateTime(args(1)).toString("yyyy-MM-dd")
              task.runTask(etime)
              log.info(s"JOB --> $className 已成功处理: $etime 的数据")
    
            case 1 =>
              // 处理前一天离线数据
              val etime = DateTime.now().minusDays(1).toString("yyyy-MM-dd")
              task.runTask(etime)
              log.info(s"JOB --> $className 已成功处理: $etime 的数据")
    
            case _ => println("执行失败 args参数:" + args.toBuffer)
          }
        } catch {
          case e: Exception =>
            println("执行失败 args参数:" + args.toBuffer)
            e.printStackTrace()
        }
    
        // 初始化log4j
        def initLog4j() {
          val fileName = ConfigurePath + "log4j.properties"
          if (new File(fileName).exists) {
            PropertyConfigurator.configure(fileName)
            log.info("日志log4j已经启动")
          }
        }
      }
    }
    

    2. 加载Spring配置文件工具类

    object SpringUtils {
      private var context: ClassPathXmlApplicationContext = _
    
      def getBean(name: String): Any = context.getBean(name)
    
      def getBean[T](name: String, classObj: Class[T]): T = context.getBean(name, classObj)
    
      def getBean[T](_class: Class[T]): T = context.getBean(_class)
    
      def init(springXml: Array[String]): Unit = {
        if (springXml == null || springXml.isEmpty) {
          try
            throw new Exception("springXml 不可为空")
          catch {
            case e: Exception => e.printStackTrace()
          }
        }
        context = new ClassPathXmlApplicationContext(springXml(0))
        context.start()
      }
    
    }
    

    3. Spring配置文件applicationContext.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
              http://www.springframework.org/schema/context  http://www.springframework.org/schema/context/spring-context-4.0.xsd">
    
        <!-- 配置包扫描 -->
        <context:component-scan base-package="com.bigdata.stats"/>
    
    </beans>
    

    4. 定义一个trait,作为离线程序的公共"父类"

    trait StatsTask extends Serializable {
      //"子类"继承StatsTask重写该方法实现自己的业务处理逻辑 
      def runTask(etime: String)
    }
    

    5. 继承StatsTask的离线处理类

    //不要忘记添加 @Component ,否则无法利用Spring对WordCount进行实例化
    @Component
    class WordCount extends StatsTask {
    
      override def runTask(etime: String): Unit = {
        val sparkSession = SparkSession
          .builder()
          .appName("test")
          .master("local[*]")
          .getOrCreate()
    
        import sparkSession.implicits._
    
        val words = sparkSession.read.textFile("/Users/BigData/Documents/data/wordcount.txt").flatMap(_.split(" "))
          .toDF("word")
    
        words.createOrReplaceTempView("wordcount")
    
        val df = sparkSession.sql("select word, count(*) count from wordcount group by word")
    
        df.show()
      }
    }
    

    推荐文章:

    Spark流式状态管理

    Spark实现推荐系统中的相似度算法

    Scala中的IO操作及ArrayBuffer线程安全问题

    学好Spark必须要掌握的Scala技术点


    关注微信公众号:大数据学习与分享,获取更对技术干货

  • 相关阅读:
    Thread中的join使用
    java.lang.NoClassDefFoundError: Ljavax/enterprise/inject/spi/BeanManager;
    org.hibernate.HibernateException: Could not parse configuration: /hibernate.cfg.xm
    maven 中使用jstl 错误解决
    eclipse 安装maven
    前端 JS事件操作
    前端 JS
    前端 固定位置 与绝对定位
    前端 显示与隐藏
    前端 盒子阴影
  • 原文地址:https://www.cnblogs.com/bigdatalearnshare/p/14259672.html
Copyright © 2011-2022 走看看