  • Spark Java API data analysis in practice

    1 Key Spark dependencies (Maven)

    <!-- Spark -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.5.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.5.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>1.5.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.10</artifactId>
      <version>1.5.1</version>
    </dependency>

    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.10.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>2.7.2</version>
    </dependency>

    <dependency>
      <groupId>com.esotericsoftware.kryo</groupId>
      <artifactId>kryo</artifactId>
      <version>2.21</version>
    </dependency>

    <dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.5</version>
    </dependency>

    2 Analysis model "昼伏夜出" (active at night, dormant by day), in Spark Java

    The job below scans the HBase table recordinfo, whose rowkeys are a single digit prefix (0-9) followed by a yyyyMMddHHmmss timestamp. For each pending task it counts records per tenementid inside the night window (STARTHOUR to ENDHOUR) and outside it, left-joins the two counts, and keeps only the tenementids seen more than 3 times at night and never during the day; for example, an id with 5 night records and 0 day records is flagged, while one with 5 night and 2 day records is not. The surviving ids are written to the Oracle table recordinfo_result and the task status is updated.

    package com.xinyi.spark.analysis.tasks;

    import com.google.common.base.Optional;
    import com.xinyi.spark.analysis.utils.dbhelper.DBHelper;
    import com.xinyi.xframe.base.utils.StringUtils;
    import com.xinyi.xframe.base.utils.UUIDGenerator;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.CompareFilter;
    import org.apache.hadoop.hbase.filter.Filter;
    import org.apache.hadoop.hbase.filter.FilterList;
    import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
    import org.apache.hadoop.hbase.util.Base64;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;

    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.*;

    public class RecordInfoSparkAnalsis {

    // query the pending task list (status '0')
    private static DBHelper dbHelper = new DBHelper("xinyidb");
    private final static String endNum = "9";
    public static void main(String[] args) {


    String sql ="select id,to_char(starttime,'yyyymmddhh24miss') starttime," +
    "to_char(endtime,'yyyymmddhh24miss') endtime,starthour,endhour," +
    "to_char(createtime,'yyyymmddhh24miss') createtime from recordinfo_task where status='0'";
    List<Map<String,Object>> taskList = dbHelper.query(sql);
    System.out.println(taskList);
    if(taskList.isEmpty()){
    System.out.println("Task list is empty!");
    return;
    }
    // mark each task as picked up (status '2') before processing
    for(Map<String,Object> task :taskList){
    String taskid = String.valueOf(task.get("ID"));
    updateRecordTask(taskid,"2");
    }


    // initialize the Spark environment with Kryo serialization
    SparkConf conf = new SparkConf().setAppName("RecordInfoSparkAnalsis");
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    conf.set("spark.kryo.registrator", "com.xinyi.spark.analysis.utils.MyRegistrator");
    conf.set("spark.kryoserializer.buffer.max", "256");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    // build the Spark-HBase configuration
    Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());
    // rowkeys carry a digit prefix 0-9, so each prefix range is scanned in the loop below
    int endInt = Integer.valueOf(endNum);

    for(Map<String,Object> task :taskList){
    Object startObj = task.get("STARTTIME");
    Object endObj = task.get("ENDTIME");
    if(!StringUtils.isEmpty(startObj)&&!StringUtils.isEmpty(endObj)){
    long s = System.currentTimeMillis();
    String startTime = String.valueOf(startObj);
    String endTime = String.valueOf(endObj);
    String blackStartHour = String.valueOf(task.get("STARTHOUR"));
    String blackEndHour = String.valueOf(task.get("ENDHOUR"));
    System.out.println(blackStartHour+"---"+blackEndHour);
    // RDDs accumulated across all rowkey prefixes
    JavaPairRDD<String, Long> white = null;
    JavaPairRDD<String, Long> black = null;
    for (int i = 0; i <= endInt; i++) {
    // build start/end rowkeys: digit prefix + timestamp
    String startkey = String.valueOf(i) + startTime;
    String endkey = String.valueOf(i) +endTime;
    System.out.println(startkey);
    System.out.println(endkey);
    // RDD of records whose hourlong falls inside the night window (blackStartHour to blackEndHour)
    JavaPairRDD<String, Long> reduceRdd2 = getStringLongJavaPairRDD(jsc, hbConf, startkey, endkey,blackStartHour,blackEndHour);
    if(black==null){
    black = reduceRdd2;
    }else {
    black = black.union(reduceRdd2);
    }
    // RDD of records outside the night window (blackEndHour to 235959)
    JavaPairRDD<String, Long> whiteReduceRdd = getStringLongJavaPairRDD(jsc, hbConf, startkey, endkey,blackEndHour,"235959");
    if(white==null){
    white = whiteReduceRdd;
    }else {
    white = white.union(whiteReduceRdd);
    }
    }
    System.out.println(black.collectAsMap()); // debug output: collects the whole RDD to the driver
    // sum the counts per tenementid
    black = black.reduceByKey(new Function2<Long, Long, Long>() {
    public Long call(Long a1, Long a2) throws Exception {
    return a1 + a2;
    }
    });
    white = white.reduceByKey(new Function2<Long, Long, Long>() {
    public Long call(Long a1, Long a2) throws Exception {
    return a1 + a2;
    }
    });
    // left-join night counts with day counts by tenementid
    JavaPairRDD<String,Tuple2<Long,Optional<Long>>> joinRdd = black.leftOuterJoin(white);
    joinRdd = joinRdd.filter(new Function<Tuple2<String, Tuple2<Long, Optional<Long>>>, Boolean>() {
    @Override
    public Boolean call(Tuple2<String, Tuple2<Long, Optional<Long>>> stringTuple2Tuple2) throws Exception {
    Long val1 = stringTuple2Tuple2._2._1; // night count
    Long val2 = 0L; // day count, 0 when absent from the left join
    Set valSet = stringTuple2Tuple2._2._2.asSet();
    for(Object val:valSet){
    val2= Long.valueOf(val.toString());
    }
    //System.out.println(val1+"--"+val2);
    // keep tenementids seen more than 3 times at night and never during the day
    if(valSet.isEmpty()&&val1>3){
    return true;
    }else if(val2<1&&val1>3){
    return true;
    }
    return false;
    }
    });

    Map<String,Tuple2<Long,Optional<Long>>> collectMap = joinRdd.collectAsMap();
    System.out.println(collectMap);

    String taskid = String.valueOf(task.get("ID"));
    // persist the results and mark the task as finished (status '1')
    insert2RecordResult(taskid,collectMap);
    updateRecordTask(taskid,"1");
    long se = System.currentTimeMillis();
    System.out.println("Elapsed time: "+(se-s)+" ms");
    }

    }
    jsc.stop();
    }

    private static void updateRecordTask(String taskid,String status){
    String sql = "update recordinfo_task set status='"+status+"' where id='"+taskid+"'";
    dbHelper.update(sql);
    System.out.println("任务表状态已更新!");
    }
    /**
    * Insert the result set into the Oracle result table recordinfo_result
    * @param taskid  id of the analysis task
    * @param results map of tenementid -> (night count, optional day count)
    */
    private static void insert2RecordResult(String taskid, Map<String, Tuple2<Long, Optional<Long>>> results){
    Set<String> keySet = results.keySet();
    for(String key :keySet){
    Tuple2<Long, Optional<Long>> vals = results.get(key);
    String id= UUIDGenerator.generateOriginnalUUID();
    String sql = "insert into recordinfo_result (id,taskid,tenementid,num) values ('"+id+"','"+taskid+"','"+key+"','"+vals._1+"')";
    dbHelper.update(sql);
    }
    System.out.println("结果集已插入数据库");
    }


    /**
    * Sum the values of identical keys, then keep only entries whose value is above or below a threshold.
    * Note: this overload is not called from main in this post.
    * @param black   input pair RDD
    * @param val     threshold to compare against
    * @param compare comparison operator, ">" or "<"
    * @return the reduced and filtered RDD
    */
    private static JavaPairRDD<String, Long> getStringLongJavaPairRDD(JavaPairRDD<String, Long> black,final int val,final String compare) {
    black = black.reduceByKey(new Function2<Long, Long, Long>() {
    @Override
    public Long call(Long a1, Long a2) throws Exception {
    return a1 + a2;
    }
    });
    black = black.filter(new Function<Tuple2<String, Long>, Boolean>() {
    @Override
    public Boolean call(Tuple2<String, Long> stringLongTuple2) throws Exception {
    if(">".equals(compare)){
    if(stringLongTuple2._2>val){
    //System.out.println(stringLongTuple2._1+"---"+stringLongTuple2._2);
    return true;
    }
    }else if("<".equals(compare)){
    if(stringLongTuple2._2<val){
    //System.out.println(stringLongTuple2._1+"==="+stringLongTuple2._2);
    return true;
    }
    }
    return false;
    }
    });
    return black;
    }

    /**
    * Query HBase by rowkey range and hourlong range, returning a (tenementid, 1L) pair RDD
    * @param jsc       the Spark context
    * @param hbConf    the HBase configuration
    * @param startkey  scan start rowkey
    * @param endkey    scan end rowkey
    * @param startHour lower bound (exclusive) for the hourlong column
    * @param endHour   upper bound (exclusive) for the hourlong column
    * @return one (tenementid, 1L) pair per matching row
    */
    private static JavaPairRDD<String, Long> getStringLongJavaPairRDD(JavaSparkContext jsc, Configuration hbConf, String startkey, String endkey,String startHour,String endHour) {
    Scan scan = new Scan(Bytes.toBytes(startkey), Bytes.toBytes(endkey));
    // Scan scan = new Scan();
    scan.setCacheBlocks(true);
    scan.setCaching(10000);
    scan.setStartRow(Bytes.toBytes(startkey)); // redundant: the constructor above already sets the start row
    scan.addFamily(Bytes.toBytes("info")); // column family

    // keep only rows whose hourlong column lies strictly between startHour and endHour
    FilterList filterList = new FilterList();
    Filter gtfilter = new SingleColumnValueFilter(Bytes.toBytes("info"),Bytes.toBytes("hourlong"), CompareFilter.CompareOp.GREATER,Bytes.toBytes(startHour));
    filterList.addFilter(gtfilter);
    Filter ltfilter = new SingleColumnValueFilter(Bytes.toBytes("info"),Bytes.toBytes("hourlong"), CompareFilter.CompareOp.LESS,Bytes.toBytes(endHour));
    filterList.addFilter(ltfilter);
    scan.setFilter(filterList);

    org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan proto = null;
    try {
    proto = ProtobufUtil.toScan(scan);
    } catch (IOException e) {
    e.printStackTrace();
    }

    String scanToString = Base64.encodeBytes(proto.toByteArray());
    hbConf.set(TableInputFormat.INPUT_TABLE, "recordinfo");//table name
    hbConf.set(TableInputFormat.SCAN, scanToString);

    JavaPairRDD<ImmutableBytesWritable, Result> rdd = jsc.newAPIHadoopRDD(hbConf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
    // map each HBase row to (tenementid, 1L): keep only the tenementid column value as the key
    JavaPairRDD<String, Long> rddmap = rdd.mapToPair(new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, Long>() {
    public Tuple2<String, Long> call(Tuple2<ImmutableBytesWritable, Result> item) throws Exception {
    Iterator<Cell> it = item._2().listCells().iterator();
    String tenementid = "";
    while (it.hasNext()) {
    Cell c = it.next();
    String qualifier = Bytes.toString(CellUtil.cloneQualifier(c));
    if (qualifier.equals("tenementid")) {
    tenementid = Bytes.toString(CellUtil.cloneValue(c)).trim();
    }
    }
    return new Tuple2<String, Long>(tenementid, 1L);
    }
    });
    // values are summed per key by the caller (reduceByKey in main)
    return rddmap;
    }
    }
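
    The Kryo registrator configured above (com.xinyi.spark.analysis.utils.MyRegistrator) and the DBHelper class are project-internal and are not included in the post. From its usage, DBHelper appears to expose query(String) returning List<Map<String,Object>> and update(String) against a named data source. For the registrator, a minimal hypothetical sketch could look like the following; the class body and the registered classes are assumptions, not the original implementation:

    package com.xinyi.spark.analysis.utils;

    import com.esotericsoftware.kryo.Kryo;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.spark.serializer.KryoRegistrator;

    // Hypothetical sketch: register the HBase types that flow through the RDDs.
    public class MyRegistrator implements KryoRegistrator {
        public void registerClasses(Kryo kryo) {
            kryo.register(ImmutableBytesWritable.class); // key type returned by newAPIHadoopRDD
            kryo.register(Result.class);                 // value type returned by newAPIHadoopRDD
        }
    }

    Spark instantiates the registrator by the class name set in spark.kryo.registrator, so it only needs to be on the driver and executor classpath.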

  • Original post: https://www.cnblogs.com/yzlsthl/p/9099276.html