功能1: 今天到现在为止 实战课程的访问量
yyyyMMdd courseID
使用数据库来进行存储我们的统计结果
Spark Streaming把统计结果写入到数据库里面
可视化前端根据: yyyyMMdd courseId 把数据库里面的统计结果展示出来
选择什么数据库作为统计结果的存储呢?
RDBMS: MySQL、Oracle...
day courseId click_count
20171111 1 10
20171111 2 10
下一个批次数据进来以后:
20171111 + 1 ==> click_count + 下一个批次的统计结果
NoSQL: HBase、Redis...
HBase: 一个API就能搞定,非常方便
20171111 + 1 ==> click_count + 下一个批次的统计结果
本次课程为什么要选择HBase的一个原因所在
前置要求:
启动HDFS
启动ZK
启动HBase
HBase表设计
创建表
create 'imooc_course_clickcount' , 'info'
Rowkey设计
day_courseId
如何使用Scala来操作HBase数据库呢?
定义: case clas CourseClickCount.scala
package com.imooc.domain
/**
* 实战课程点击数
* @param day_course 对应的就是HBase中的rowkey,20171111_1
* @param click_count 对应的20171111_1的访问总数
*/
case class CourseClickCount(day_course: String, click_count: Int)
CourseClickCountDAO.scala
package com.imooc.dao
import com.imooc.domain.CourseClickCount
import scala.collection.mutable.ListBuffer
/**
* 实战课程点击数数据访问层
*/
object CourseClickCountDAO {
// 定义HBase的表名,列族,列名
val tableName = "imooc_course_clickcount"
val cf = "info"
val qualifer = "click_count"
/**
* 保存数据到HBase
* @param list CourseClickCount集合
* 要实现sava这个方法,就需要HBase的工具类,暂时写
*/
def save(list: ListBuffer[CourseClickCount]): Unit = {
}
/**
* 根据rowkey查询值
* @param day_course
* @return
*/
def count(day_course: String):Long = {
0l
}
}
私有模式(单例模式)已构建完毕
HBaseUtils.scala 1.基本的私有构造方法
package com.imooc.utils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import java.io.IOException;
/**
* HBase操作工具类
*/
public class HBase2Utils {
HBaseAdmin admin = null;
Configuration configuration = null;
/**
*私有构造方法
*/
private HBase2Utils() {
// 加载HBase的配置文件 zookeeper rootdir
configuration = new Configuration();
configuration.set("hbase.zookeeper.quorum", "Master:2181");
configuration.set("hbase.rootdir", "hdfs://Master:8020/hbase");
try {
admin = new HBaseAdmin(configuration);
} catch (IOException e) {
e.printStackTrace();
}
}
// Java单例模式
private static HBase2Utils instance = null;
// synchronized 线程同步, 避免线程安全问题
public static synchronized HBase2Utils getInstance() {
if (instance == null) {
instance = new HBase2Utils();
}
return instance;
}
}
2.自定义的getTable方法及测试
// 获取表名,获取后,进行测试
public HTable getTable(String tableName) {
HTable table = null;
try {
table = new HTable(configuration, tableName);
} catch (IOException e) {
e.printStackTrace();
}
return table;
}
// getTable方法的测试
public static void main(String[] args) {
HTable table = HBase2Utils.getInstance().getTable("imooc_course_clickcount");
System.out.println(table.getName().getNameAsString());
}
运行测试代码,看控制台输出,没有输出,取hbase中查看表内数据 list desc scan table rowkey columnfamily column value
3.自定义的put方法及测试
/**
* 添加一条记录到HBase表
* @param tableName 表名
* @param rowkey 表的rowkey
* @param cf 表的列族columnfamily
* @param column 表的列
* @param value 表的值
*/
public void put(String tableName, String rowkey, String cf, String column, String value) {
HTable table = getTable(tableName);
Put put = new Put(rowkey.getBytes());
put.add(cf.getBytes(), column.getBytes(), value.getBytes());
try {
table.put(put);
} catch (IOException e) {
e.printStackTrace();
}
}
// put方法的测试
public static void main(String[] args) {
// put中需要传入的参数: String tableName, String rowkey, String cf, String column, String value
String tableName = "imooc_course_clickcount";
String rowkey = "20171111_88";
String cf = "info";
String column = "click_count"; //访问量的key
String value = "10"; // 访问量value
HBase2Utils.getInstance().put(tableName,rowkey,cf,column,value);
}
运行测试代码,看控制台输出,没有输出,取hbase中查看表内数据 list desc scan table rowkey columnfamily column value scan "imooc_course_clickcount", 看是否有数据添加