zoukankan      html  css  js  c++  java
  • 【慕课网实战】Spark Streaming实时流处理项目实战笔记十七之铭文升级版

    铭文一级:

    功能1:今天到现在为止 实战课程 的访问量

    yyyyMMdd courseid

    使用数据库来进行存储我们的统计结果
    Spark Streaming把统计结果写入到数据库里面
    可视化前端根据:yyyyMMdd courseid 把数据库里面的统计结果展示出来


    选择什么数据库作为统计结果的存储呢?
    RDBMS: MySQL、Oracle...
    day course_id click_count
    20171111 1 10
    20171111 2 10

    下一个批次数据进来以后:
    20171111 + 1 ==> click_count + 下一个批次的统计结果 ==> 写入到数据库中

    NoSQL: HBase、Redis....
    HBase: 一个API就能搞定,非常方便
    20171111 + 1 ==> click_count + 下一个批次的统计结果
    本次课程为什么要选择HBase的一个原因所在

    前提:
    HDFS
    Zookeeper
    HBase

    HBase表设计
    创建表
    create 'imooc_course_clickcount', 'info'
    Rowkey设计
    day_courseid

    如何使用Scala来操作HBase

    铭文二级:

    启动HBase之前要先启动HDFS、ZooKeeper

    Hadoop的启动,sbin文件夹:

    ./start-dfs.sh

    HBase的启动,bin文件夹:

    ./start-hbase.sh

    1、建表:create 'imooc_course_clickcount','info'

    查看表:list

    查看表详情:desc 'imooc_course_clickcount'  //注意:HBase shell里表名必须加引号,不加引号会报错

    2、Rowkey的设计:day_courseid

    3、建CourseClickCount类(day_course,click_count)

    4、HBaseUtils工具类的实现

    package com.imooc.spark.project.utils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import java.io.IOException;
    /**
     * HBase helper class, exposed as a lazily created singleton
     * (obtain it through {@link #getInstance()}).
     */
    public class HBaseUtils {
        // Created in the constructor; not used by any other method of this class.
        HBaseAdmin admin = null;
        // Connection settings shared by every table opened through getTable().
        Configuration configuration = null;
        /**
         * Private constructor: builds the configuration (ZooKeeper quorum and
         * HBase root directory) and creates the HBaseAdmin handle.
         * A failure to create the admin is printed and leaves {@code admin} null.
         */
        private HBaseUtils(){
            configuration = new Configuration();
            configuration.set("hbase.zookeeper.quorum", "hadoop000:2181");
            configuration.set("hbase.rootdir", "hdfs://hadoop000:8020/hbase");
            try {
                admin = new HBaseAdmin(configuration);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        private static HBaseUtils instance = null;
        /**
         * Returns the single shared instance, creating it on first use.
         * The method is synchronized so concurrent first calls create only one instance.
         */
        public  static synchronized HBaseUtils getInstance() {
            if(null == instance) {
                instance = new HBaseUtils();
            }
            return instance;
        }
        /**
         * Opens an HTable for the given table name.
         * The caller owns the returned table and must close it.
         *
         * @param tableName HBase table name
         * @return the opened table, or null when it could not be opened
         *         (the IOException is printed, not rethrown)
         */
        public HTable getTable(String tableName) {
            HTable table = null;
            try {
                table = new HTable(configuration, tableName);
            } catch (IOException e) {
                e.printStackTrace();
            }
            return table;
        }
        /**
         * Writes a single cell to an HBase table. The value is stored as the
         * bytes of the given string. I/O failures are printed, not rethrown.
         *
         * @param tableName HBase table name
         * @param rowkey    rowkey of the row to write
         * @param cf        column family
         * @param column    column qualifier
         * @param value     value to store
         */
        public void put(String tableName, String rowkey, String cf, String column, String value) {
            HTable table = getTable(tableName);
            if (table == null) {
                // getTable() has already printed the cause; avoid a NullPointerException here.
                System.err.println("HBaseUtils.put skipped: unable to open table " + tableName);
                return;
            }
            try {
                Put put = new Put(Bytes.toBytes(rowkey));
                put.add(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value));
                table.put(put);
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                // Always release the table opened above, even when the write failed.
                try {
                    table.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        /**
         * Manual smoke test: writes one sample cell.
         * NOTE(review): this stores click_count as string bytes, while
         * CourseClickCountDAO treats the same column as a long counter — confirm
         * the two are not mixed on the same row.
         */
        public static void main(String[] args) {
            //HTable table = HBaseUtils.getInstance().getTable("imooc_course_clickcount");
            //System.out.println(table.getName().getNameAsString());
            String tableName = "imooc_course_clickcount" ;
            String rowkey = "20171111_88";
            String cf = "info" ;
            String column = "click_count";
            String value = "2";
            HBaseUtils.getInstance().put(tableName, rowkey, cf, column, value);
        }
    }
    

     关键:

    HBaseAdmin、Configuration
    configuration.set("hbase.zookeeper.quorum", "hadoop000:2181");
    configuration.set("hbase.rootdir", "hdfs://hadoop000:8020/hbase");
    private static HBaseUtils instance = null;
        public  static synchronized HBaseUtils getInstance() {
            if(null == instance) {
                instance = new HBaseUtils();
            }
            return instance;
    }
    HBaseUtils.getInstance().put(tableName, rowkey, cf, column, value);

    5、CourseClickCountDAO类数据访问层的实现

    package com.imooc.spark.project.dao
    import com.imooc.spark.project.domain.CourseClickCount
    import com.imooc.spark.project.utils.HBaseUtils
    import org.apache.hadoop.hbase.client.Get
    import org.apache.hadoop.hbase.util.Bytes
    import scala.collection.mutable.ListBuffer
    /**
      * Data-access layer for course click counts stored in HBase.
      */
    object CourseClickCountDAO {
      val tableName: String = "imooc_course_clickcount"
      val cf: String = "info"
      // Spelling kept as-is because this is a public member that callers may reference.
      val qualifer: String = "click_count"

      /**
        * Opens the click-count table, runs `body` against it and always closes it afterwards.
        * HBaseUtils.getTable returns null when the table cannot be opened; that case is
        * reported with an explicit exception instead of a NullPointerException.
        */
      private def withTable[A](body: org.apache.hadoop.hbase.client.HTable => A): A = {
        val table = Option(HBaseUtils.getInstance().getTable(tableName)).getOrElse(
          throw new IllegalStateException(s"Unable to open HBase table $tableName"))
        try body(table) finally table.close()
      }

      /**
        * Adds each element's click_count to the counter stored under its rowkey.
        * @param list  CourseClickCount elements to accumulate; an empty list is a no-op
        */
      def save(list: ListBuffer[CourseClickCount]): Unit = {
        // Do not open a table at all when there is nothing to write.
        if (list.nonEmpty) withTable { table =>
          list.foreach { ele =>
            table.incrementColumnValue(Bytes.toBytes(ele.day_course),
              Bytes.toBytes(cf),
              Bytes.toBytes(qualifer),
              ele.click_count)
          }
        }
      }

      /**
        * Reads the counter stored under the given rowkey.
        * @param day_course rowkey to look up
        * @return the stored count, or 0 when the cell does not exist
        */
      def count(day_course: String): Long = withTable { table =>
        val get = new Get(Bytes.toBytes(day_course))
        // Encode family/qualifier with Bytes.toBytes, the same encoding save() uses.
        val value = table.get(get).getValue(Bytes.toBytes(cf), Bytes.toBytes(qualifer))
        Option(value).map(bytes => Bytes.toLong(bytes)).getOrElse(0L)
      }

      /** Manual smoke test: increments three sample counters and prints their values. */
      def main(args: Array[String]): Unit = {
        val list = new ListBuffer[CourseClickCount]
        list.append(CourseClickCount("20171111_8",8))
        list.append(CourseClickCount("20171111_9",9))
        list.append(CourseClickCount("20171111_1",100))
        save(list)
        println(count("20171111_8") + " : " + count("20171111_9")+ " : " + count("20171111_1"))
      }
    }
    

     

     关键:

    val tableName = "imooc_course_clickcount"
    val cf = "info"
    val qualifer = "click_count"
    def save(list: ListBuffer[CourseClickCount]): Unit = {
    for(ele <- list) {
          table.incrementColumnValue(Bytes.toBytes(ele.day_course),
            Bytes.toBytes(cf),
            Bytes.toBytes(qualifer),
            ele.click_count)
        }
    def count(day_course: String):Long = {
    val value = table.get(get).getValue(cf.getBytes, qualifer.getBytes)

    HBaseUtils.getInstance().put(tableName, rowkey, cf, column, value);
    Bytes.toBytes(day_course) 与 day_course.getBytes 只有在平台默认字符集为UTF-8时才等价:前者固定使用UTF-8,后者使用平台默认字符集,建议统一使用 Bytes.toBytes
    if(value == null) {}  //Scala里的==是空值安全的:先判断null再调用equals,相当于Java里的equals,所以可以直接和null比较
    list.append(CourseClickCount("20171111_8",8))
    list.append(CourseClickCount("20171111_9",9))


    6、在ImoocStatStreamingApp里原先代码参考:

    // Test step 1: verify that data is being received
        //messages.map(_._2).count().print
        // Test step 2: data cleaning
        val logs = messages.map(_._2)
        val cleanData = logs.map(line => {
          val infos = line.split("	")
          // infos(2) = "GET /class/130.html HTTP/1.1"
          // url = /class/130.html
          val url = infos(2).split(" ")(1)
          // Extract the course id of a hands-on course page. 0 is the sentinel for
          // "not a course page" and is removed by the filter below. A /class URL with
          // a missing or non-numeric id also maps to 0 instead of throwing and
          // failing the whole batch.
          val courseId =
            if (url.startsWith("/class")) {
              scala.util.Try {
                val courseIdHTML = url.split("/")(2)
                courseIdHTML.substring(0, courseIdHTML.lastIndexOf(".")).toInt
              }.getOrElse(0)
            } else {
              0
            }
          ClickLog(infos(0), DateUtils.parseToMinute(infos(1)), courseId, infos(3).toInt, infos(4))
        }).filter(clicklog => clicklog.courseId != 0)
    

      添加代码:

        // Test step 3: count the visits to hands-on courses from the start of today until now
        cleanData
          .map { log =>
            // HBase rowkey layout: day_courseid, e.g. 20171111_88
            (log.time.substring(0, 8) + "_" + log.courseId, 1)
          }
          .reduceByKey(_ + _)
          .foreachRDD { rdd =>
            rdd.foreachPartition { records =>
              // Gather this partition's counts and hand them to the DAO in one call
              val batch = new ListBuffer[CourseClickCount]
              records.foreach { case (dayCourse, clicks) =>
                batch.append(CourseClickCount(dayCourse, clicks))
              }
              CourseClickCountDAO.save(batch)
            }
          }
    

      

      

  • 相关阅读:
    ZOJ 1002 Fire Net (火力网)
    UVa OJ 117 The Postal Worker Rings Once (让邮差只走一圈)
    UVa OJ 118 Mutant Flatworld Explorers (变体扁平世界探索器)
    UVa OJ 103 Stacking Boxes (嵌套盒子)
    UVa OJ 110 MetaLoopless Sorts (无循环元排序)
    第一次遇到使用NSNull的场景
    NSURL使用浅析
    从CNTV下载《小小智慧树》
    NSDictionary and NSMutableDictionary
    Category in static library
  • 原文地址:https://www.cnblogs.com/kkxwz/p/8406508.html
Copyright © 2011-2022 走看看