zoukankan      html  css  js  c++  java
  • spark2.x-java-用spark-sql和spark-streaming 统计网站的访客数(uv)

    环境:
    spark2.2.0 JDK1.8

    感觉网上关于spark2.0的java程序案例太少了,在这里补充一个,大家有好的案例也可以分享啊
    不多说,直接上代码

    /**
    * @author admin
    * @define 统计网站日用户访问量
    * create 2018-01-15 19:55
    */
    public class DailyUvStreaming {

    public static void main(String[] args) throws InterruptedException {

    //设置日志级别
    // Logger.getLogger("org").setLevel(Level.ERROR);

    //初始化spark上下文
    SparkConf conf = new SparkConf()
    .setMaster("local[2]")
    .setAppName("DailyUvStreaming");

    //初始化spark上下文 以及时间间隔
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

    //监听TCP服务
    JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

    //将 DStream 转换成 DataFrame 并且运行sql查询
    lines.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
    @Override
    public void call(JavaRDD<String> rdd, Time time) {
    SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());

    //通过反射将RDD转换为DataFrame
    JavaRDD<UserAccessLog> rowRDD = rdd.map(new Function<String, UserAccessLog>() {
    @Override
    public UserAccessLog call(String line) {
    UserAccessLog userLog = new UserAccessLog();
    String[] cols = line.split(" ");
    userLog.setDate(cols[0]);
    userLog.setUserId(cols[1]);
    return userLog;
    }
    });
    Dataset<Row> dataFrame = spark.createDataFrame(rowRDD, UserAccessLog.class);

    // 创建临时表
    dataFrame.createOrReplaceTempView("log");

    //按日期分组 去重userId,计算访客数
    Dataset<Row> result =
    spark.sql("select date, count(distinct userId) as uv from log group by date");
    System.out.println("========= " + time + "=========");

    //输出前20条数据
    result.show();
    }
    });


    //开始流式计算
    jssc.start();
    // 等待计算终止
    jssc.awaitTermination();
    jssc.stop(true);


    }
    }


    /**
    * 懒汉式单例
    */
    class JavaSparkSessionSingleton {
    private static transient SparkSession instance = null;

    public static SparkSession getInstance(SparkConf sparkConf) {
    if (instance == null) {
    instance = SparkSession
    .builder()
    .config(sparkConf)
    .config("spark.testing.memory", "2147480000")
    .getOrCreate();
    }
    return instance;
    }
    }
    ————————————————
    版权声明:本文为CSDN博主「认真起来的菜鸟」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/qq_16038125/article/details/79074287

  • 相关阅读:
    数据预处理和特征工程
    批量梯度下降,随机梯度下降,小批量梯度下降
    动态规划和贪心算法的区别
    广告计价方式:CPM,CPC,CPA
    隐语义模型LFM
    windows下安装xgboost
    KMP算法
    sklearn中的SGDClassifier
    JS变量和数据类型
    JS的基本使用 使用外部的JS文件
  • 原文地址:https://www.cnblogs.com/javalinux/p/15065038.html
Copyright © 2011-2022 走看看