zoukankan      html  css  js  c++  java
  • AreaTopN

    package com.bjsxt.spark.others.pvuv;

    import java.util.List;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;

    import scala.Tuple2;

    /**
     * Counts log records per (column 1, column 5) pair of a space-delimited log file,
     * converts each aggregated pair into a {@code WebSiteInfo} key, sorts by that key and
     * prints every record on the driver.
     *
     * <p>Despite the class name, no top-N truncation is applied: every aggregated pair is
     * printed. The sort order is whatever {@code WebSiteInfo}'s natural ordering defines
     * ({@code sortByKey()} without a comparator requires it to be {@code Comparable}) —
     * NOTE(review): that class is defined elsewhere; confirm its ordering is the intended one.
     */
    public class AreaTopN {

        /**
         * Zero-based index of the space-separated column used as the first key component.
         * Presumably the area/region (inferred from the class name) — confirm against the
         * pvuvdata layout.
         */
        private static final int AREA_COLUMN = 1;

        /**
         * Zero-based index of the space-separated column used as the second key component.
         * Presumably the website (it is passed into {@code WebSiteInfo}) — confirm against
         * the pvuvdata layout.
         */
        private static final int SITE_COLUMN = 5;

        /**
         * Runs the job against {@code ./pvuvdata} on a local Spark master.
         *
         * @param args command-line arguments (ignored)
         * @throws IllegalArgumentException (surfaced through the Spark job failure) if an
         *     input line has too few space-separated columns
         */
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            conf.setMaster("local").setAppName("PV");
            JavaSparkContext sc = new JavaSparkContext(conf);
            // Stop the context even when the job fails, so its resources are released.
            try {
                JavaRDD<String> lines = sc.textFile("./pvuvdata");

                // Key on an (area, site) tuple rather than an "area_site" string: the old
                // string key was split back on "_", which picked the wrong pieces whenever
                // either field itself contained an underscore.
                JavaPairRDD<WebSiteInfo, String> mapToPair = lines
                        .mapToPair(new PairFunction<String, Tuple2<String, String>, Integer>() {

                            private static final long serialVersionUID = 1L;

                            /** Emits ((area, site), 1) for one log line. */
                            @Override
                            public Tuple2<Tuple2<String, String>, Integer> call(String line)
                                    throws Exception {
                                // Split once instead of once per extracted column.
                                String[] fields = line.split(" ");
                                if (fields.length <= SITE_COLUMN) {
                                    throw new IllegalArgumentException(
                                            "Malformed log line (expected at least "
                                                    + (SITE_COLUMN + 1)
                                                    + " space-separated fields): " + line);
                                }
                                Tuple2<String, String> areaAndSite = new Tuple2<String, String>(
                                        fields[AREA_COLUMN], fields[SITE_COLUMN]);
                                return new Tuple2<Tuple2<String, String>, Integer>(areaAndSite, 1);
                            }
                        })
                        .reduceByKey(new Function2<Integer, Integer, Integer>() {

                            private static final long serialVersionUID = 1L;

                            /** Sums the per-line counts for one (area, site) key. */
                            @Override
                            public Integer call(Integer v1, Integer v2) throws Exception {
                                return v1 + v2;
                            }
                        })
                        .mapToPair(new PairFunction<Tuple2<Tuple2<String, String>, Integer>,
                                WebSiteInfo, String>() {

                            private static final long serialVersionUID = 1L;

                            /**
                             * Re-keys an aggregated count by {@code WebSiteInfo} and attaches
                             * a "site,area=count" display string.
                             */
                            @Override
                            public Tuple2<WebSiteInfo, String> call(
                                    Tuple2<Tuple2<String, String>, Integer> t) throws Exception {
                                String area = t._1._1;
                                String site = t._1._2;
                                Integer count = t._2;
                                // Argument order (site, area, count) preserved from the original.
                                WebSiteInfo webSiteInfo = new WebSiteInfo(site, area, count);
                                String s = site + "," + area + "=" + count;
                                return new Tuple2<WebSiteInfo, String>(webSiteInfo, s);
                            }
                        });

                // Collect the sorted result so it is printed on the driver in sort order.
                // foreach(println) would print on executors, whose output is scattered and
                // unordered on a non-local master. The collected data is already aggregated
                // per (area, site), so it is expected to be small.
                List<Tuple2<WebSiteInfo, String>> sorted = mapToPair.sortByKey().collect();
                for (Tuple2<WebSiteInfo, String> t : sorted) {
                    System.out.println(t);
                }
            } finally {
                sc.stop();
            }
        }
    }

  • 相关阅读:
    重写(Overriding)与重载(Overloading)的区别
    A Guide to setup SQL Server Reporting Services (SSRS) with Dynamics AX
    date2Str Function in Dynamics AX 2009
    浅谈程序员加薪问题(转)
    消息队列设计精要
    Redis集群模式原理探究
    SpringBoot内置tomcat原理分析
    Mybatis整体设计探究
    MapStruct 使用详解
    Zookeeper快速领导者选举原理
  • 原文地址:https://www.cnblogs.com/huiandong/p/9194632.html
Copyright © 2011-2022 走看看