zoukankan      html  css  js  c++  java
  • AreaTopN

    package com.bjsxt.spark.others.pvuv;

    import java.util.List;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;

    import scala.Tuple2;

    /**
     * Local Spark job that reads the text file {@code ./pvuvdata}, counts how many
     * records share the same pair of values found at space-separated field
     * positions 1 and 5, wraps each pair and its count in a {@code WebSiteInfo},
     * sorts by that object and prints every resulting tuple to standard output.
     *
     * <p>NOTE(review): {@code WebSiteInfo} is declared elsewhere in the project; the
     * no-argument {@code sortByKey()} call presumably relies on it being
     * {@code Comparable} and serializable - confirm against its definition.
     */
    public class AreaTopN {

        /** Index of the first field that forms the grouping key. */
        private static final int FIRST_KEY_FIELD = 1;

        /** Index of the second field that forms the grouping key. */
        private static final int SECOND_KEY_FIELD = 5;

        /** A record needs at least this many fields so that both key fields exist. */
        private static final int MIN_FIELDS = SECOND_KEY_FIELD + 1;

        /**
         * Runs the job in local mode.
         *
         * @param args command-line arguments; not used
         */
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            conf.setMaster("local").setAppName("PV");
            JavaSparkContext sc = new JavaSparkContext(conf);

            // Stop the context even if an action fails, so it is always released.
            try {
                JavaRDD<String> lines = sc.textFile("./pvuvdata");

                // Split every line exactly once and drop records that are too short
                // to contain both key fields (e.g. blank lines); indexing them would
                // otherwise abort the whole job with ArrayIndexOutOfBoundsException.
                JavaRDD<String[]> records = lines.map(new Function<String, String[]>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public String[] call(String line) throws Exception {
                        return line.split(" ");
                    }
                }).filter(new Function<String[], Boolean>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Boolean call(String[] fields) throws Exception {
                        return fields.length >= MIN_FIELDS;
                    }
                });

                // Key on the (field 1, field 5) pair as a tuple rather than a
                // "_"-joined string, so values that themselves contain "_" are
                // not split in the wrong place later on.
                JavaPairRDD<Tuple2<String, String>, Integer> counts = records.mapToPair(
                        new PairFunction<String[], Tuple2<String, String>, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<Tuple2<String, String>, Integer> call(String[] fields) throws Exception {
                        Tuple2<String, String> key =
                                new Tuple2<String, String>(fields[FIRST_KEY_FIELD], fields[SECOND_KEY_FIELD]);
                        return new Tuple2<Tuple2<String, String>, Integer>(key, 1);
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }
                });

                // Build the sortable key object plus a printable "field5,field1=count" label.
                JavaPairRDD<WebSiteInfo, String> mapToPair = counts.mapToPair(
                        new PairFunction<Tuple2<Tuple2<String, String>, Integer>, WebSiteInfo, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<WebSiteInfo, String> call(Tuple2<Tuple2<String, String>, Integer> t)
                            throws Exception {
                        String first = t._1()._1();
                        String second = t._1()._2();
                        Integer count = t._2();
                        WebSiteInfo webSiteInfo = new WebSiteInfo(second, first, count);
                        String s = second + "," + first + "=" + count;
                        return new Tuple2<WebSiteInfo, String>(webSiteInfo, s);
                    }
                });

                mapToPair.sortByKey().foreach(new VoidFunction<Tuple2<WebSiteInfo, String>>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public void call(Tuple2<WebSiteInfo, String> t) throws Exception {
                        System.out.println(t);
                    }
                });
            } finally {
                sc.stop();
            }
        }
    }

  • 相关阅读:
    C#判断网络链接状态
    C# 创建临时文件(转帖)
    C# 很久以前几个常用类
    正则附表
    如何判断WebBrowser浏览器网页加载完成
    控件阴影
    C# 使用WM_COPYDATA传输数据(两个窗体间通信)
    C# 调用POST请求
    改变无边框窗体的尺寸大小和移动无边框窗体
    IT学习网站
  • 原文地址:https://www.cnblogs.com/huiandong/p/9194632.html
Copyright © 2011-2022 走看看