zoukankan      html  css  js  c++  java
  • AreaTopN

    package com.bjsxt.spark.others.pvuv;

    import java.util.List;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;

    import scala.Tuple2;

    public class AreaTopN {
    public static void main(String[] args) {
    SparkConf conf = new SparkConf();
    conf.setMaster("local").setAppName("PV");
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<String> lines = sc.textFile("./pvuvdata");
    JavaPairRDD<WebSiteInfo, String> mapToPair = lines.mapToPair(new PairFunction<String, String, Integer>() {

    /**
    *
    */
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<String, Integer> call(String line) throws Exception {

    return new Tuple2<String, Integer>(line.split(" ")[1]+"_"+line.split(" ")[5],1);
    }
    }).reduceByKey(new Function2<Integer, Integer, Integer>() {

    /**
    *
    */
    private static final long serialVersionUID = 1L;

    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
    return v1+v2;
    }
    }).mapToPair(new PairFunction<Tuple2<String,Integer>, WebSiteInfo, String>() {

    /**
    *
    */
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<WebSiteInfo, String> call(Tuple2<String,Integer> t) throws Exception {
    WebSiteInfo webSiteInfo = new WebSiteInfo(t._1.split("_")[1],t._1.split("_")[0],t._2);
    String s = t._1.split("_")[1]+","+t._1.split("_")[0]+"="+t._2;
    return new Tuple2<WebSiteInfo, String>(webSiteInfo,s);
    }
    });
    mapToPair.sortByKey().foreach(new VoidFunction<Tuple2<WebSiteInfo,String>>() {

    /**
    *
    */
    private static final long serialVersionUID = 1L;

    @Override
    public void call(Tuple2<WebSiteInfo, String> t)
    throws Exception {
    System.out.println(t);
    }
    });
    // List<Tuple2<String, Integer>> take = mapToPair.take(5);
    // for(Tuple2<String, Integer> t:take){
    // System.out.println(t);
    // }
    sc.stop();
    }
    }

  • 相关阅读:
    软件开发人员的简历项目经验怎么写?
    mapreduce 多种输入
    lnmp如何实现伪静态,默认目录伪静态
    LNMP环境中WordPress程序伪静态解决方案
    wordpress必装的插件 wp最常用的十个插件
    debian系统下改语言设置
    Centos7 开启端口
    EventProcessor与WorkPool用法--可处理多消费者
    Disruptor入门
    Disruptor初级入门
  • 原文地址:https://www.cnblogs.com/huiandong/p/9194632.html
Copyright © 2011-2022 走看看