zoukankan      html  css  js  c++  java
  • 分享一下spark streaming与flume集成的scala代码。

    文章来自:http://www.cnblogs.com/hark0623/p/4172462.html   转发请注明

    object LogicHandle { def main(args: Array[String]) {
    //添加这个不会报执行错误 val path = new File(".").getCanonicalPath() System.getProperties().put("hadoop.home.dir", path); new File("./bin").mkdirs(); new File("./bin/winutils.exe").createNewFile(); //val sparkConf = new SparkConf().setAppName("SensorRealTime").setMaster("local[2]") val sparkConf = new SparkConf().setAppName("SensorRealTime") val ssc = new StreamingContext(sparkConf, Seconds(20)) val hostname = "localhost" val port = 2345 val storageLevel = StorageLevel.MEMORY_ONLY val flumeStream = FlumeUtils.createStream(ssc, hostname, port, storageLevel) val lhc = new LogicHandleClass(); //日志格式化模板 val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); val sdfHour = new SimpleDateFormat("HH"); val sdfMinute = new SimpleDateFormat("mm") //存储数据的hash对象 key/value存储 根据文档规则,使用各统计指标的key/value var redisMap = new HashMap[String, String]
      
         flumeStream.foreachRDD(rdd
    => { val events = rdd.collect() //println("event count:" + events.length) var i = 1 for (event <- events) { val sensorInfo = new String(event.event.getBody.array()) //单行记录 //单行记录格式化 val arrayFileds = sensorInfo.split(",") if (arrayFileds.length == 6) { val shopId = arrayFileds(0) //店内编号 val floorId = shopId.substring(0, 5) //楼层编号 val mac = arrayFileds(1) val ts = arrayFileds(2).toLong //时间戳 val time = sdf.format(ts * 1000) var hour = sdfHour.format(ts * 1000) var minute = sdfMinute.format(ts * 1000) var allMinute = hour.toInt * 60 + minute.toInt val x = arrayFileds(3) val y = arrayFileds(4) val level = arrayFileds(5) //后边就是我的业务代码了,省略了 } } //存储至redis中 lhc.SetAll(redisMap) }) ssc.start() ssc.awaitTermination() } }
  • 相关阅读:
    使用jQuery和CSS自定义HTML5 Video 控件 简单适用
    在win7系统下使用Windows XP Mode 和 Windows Virtual PC搭建window xp系统
    Runtime 解读
    Reachability实时监控网络变化
    关于AsyncSocket
    关于CoreData的用法
    邓白氏编码申请
    Android 到底是个什么东西?
    听 Fabien Potencier 谈Symfony2 之 《What is Symfony2 ?》
    听 Fabien Potencier 谈Symfony2 之 《What is Dependency Injection ?》
  • 原文地址:https://www.cnblogs.com/hark0623/p/4172462.html
Copyright © 2011-2022 走看看