zoukankan      html  css  js  c++  java
  • Hive自定义UDAF

    package com.rtmap.udfs;
    
    import org.apache.commons.lang3.StringUtils;
    import org.apache.hadoop.hive.ql.exec.UDAF;
    import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
    
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.*;
    
    /**
     * Hive UDAF that condenses the (floor, store, timestamp) sightings of one group into stay records.
     *
     * <p>Each stay record has the form {@code floor_store_seconds_start_end}; several records are
     * joined with a tab character. Timestamps use the pattern {@code yyyy-MM-dd HH:mm:ss}.
     *
     * <p>Created by doge on 2015/11/17.
     */
    public class StayTime extends UDAF {
        private static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss";
        /**
         * {@link SimpleDateFormat} is not thread-safe, so each thread works on its own instance
         * instead of sharing one static formatter.
         */
        private static final ThreadLocal<SimpleDateFormat> DATE_FORMAT = new ThreadLocal<SimpleDateFormat>() {
            @Override
            protected SimpleDateFormat initialValue() {
                return new SimpleDateFormat(DATE_PATTERN);
            }
        };
        /** Seconds since the region start below which a sighting in another region is ignored. */
        private static final int TRANSITION_TIME = 10;// 10s
        /** Seconds since the region start above which the region is closed at its last sighting. */
        private static final int HALF_HOUR = 60 * 30;
        private static final String FIELD_DELIMITER = "_";
        private static final String DELIMITER_DUMMY = "-";
        private static final String ROW_DELIMITER = "\t";
        private static final String NO_STAY_TIME = "0";
    
        /**
         * UDAF worker: buffers every row as a {@code floor_store_timestamp} tuple and evaluates the
         * whole buffer in {@link #terminate()}.
         */
        public static class StayTimeUDAFEvaluator implements UDAFEvaluator {
            private ArrayList<String> source;
    
            public StayTimeUDAFEvaluator() {
                super();
                init();
            }
    
            /**
             * Resets the evaluator to an empty buffer.
             */
            public void init() {
                source = new ArrayList<>();
            }
    
            /**
             * Buffers one input row.
             *
             * @param objects the row as {floor, store, timestamp}; only the first three entries are read
             * @return {@code true} if the row was buffered; {@code false} if it was skipped because it
             *         has fewer than three columns, contains a null column, or its timestamp cannot be
             *         parsed
             */
            public boolean iterate(Object[] objects) {
                if (objects == null || objects.length < 3) {
                    return false;
                }
                for (int i = 0; i < 3; i++) {
                    if (objects[i] == null) {
                        return false;
                    }
                }
                // Both name fields are sanitized so that splitting the tuple on the delimiter stays unambiguous.
                String floor = sanitize(objects[0]);
                String store = sanitize(objects[1]);
                String timestamp = normalizeTimestamp(objects[2].toString());
                if (timestamp == null) {
                    // A row without a usable timestamp can neither be ordered nor measured.
                    return false;
                }
                source.add(floor + FIELD_DELIMITER + store + FIELD_DELIMITER + timestamp);
                return true;
            }
    
            /**
             * @return the tuples buffered so far (the live internal list, not a copy)
             */
            public ArrayList<String> terminatePartial() {
                return source;
            }
    
            /**
             * Appends the tuples of a partial aggregation to this buffer.
             *
             * @param portion tuples produced by {@link #terminatePartial()}, may be {@code null}
             * @return {@code false} if {@code portion} is {@code null}, otherwise {@code true}
             */
            public boolean merge(ArrayList<String> portion) {
                if (portion == null) {
                    return false;
                }
                source.addAll(portion);
                return true;
            }
    
            /**
             * Orders the buffered sightings by timestamp and folds them into stay records.
             *
             * <p>The walk keeps a current region (floor + store), the time the region started and the
             * time of the previous sighting. All thresholds are measured from the region start.
             *
             * @return tab-separated stay records, or {@code null} when nothing was buffered
             */
            public String terminate() {
                List<String[]> visits = new ArrayList<>(source.size());
                for (String tuple : source) {
                    // Limit 3 keeps the timestamp in one piece and never drops a trailing field.
                    String[] fields = tuple.split(FIELD_DELIMITER, 3);
                    if (fields.length == 3) {
                        visits.add(fields);
                    }
                }
                if (visits.isEmpty()) {
                    return null;
                }
    
                if (visits.size() == 1) {// occur one time
                    String[] single = visits.get(0);
                    return zeroStay(single[0] + FIELD_DELIMITER + single[1], single[2]);
                }
    
                // Normalized timestamps compare chronologically as plain strings; the sort is stable.
                Collections.sort(visits, new Comparator<String[]>() {
                    @Override
                    public int compare(String[] o1, String[] o2) {
                        return o1[2].compareTo(o2[2]);
                    }
                });
    
                String[] first = visits.get(0);
                String regionMark = first[0] + FIELD_DELIMITER + first[1];// floor + name
                String regionTime = first[2];
                String lastTime = first[2];
                List<String> results = new ArrayList<>();
                int lastIndex = visits.size() - 1;
    
                for (int i = 1; i <= lastIndex; i++) {
                    String[] visit = visits.get(i);
                    String key = visit[0] + FIELD_DELIMITER + visit[1];// floor + name
                    String timestamp = visit[2];
                    long elapsed = dateDiff(timestamp, regionTime);
                    boolean sameRegion = key.equals(regionMark);
    
                    if (i == lastIndex) {// last one: always flush the open region
                        boolean extendsRegion = sameRegion ? elapsed <= HALF_HOUR : elapsed <= TRANSITION_TIME;
                        if (extendsRegion) {
                            results.add(stay(regionMark, elapsed, regionTime, timestamp));
                        } else {
                            if (elapsed > HALF_HOUR) {
                                results.add(stay(regionMark, dateDiff(lastTime, regionTime), regionTime, lastTime));
                            } else {
                                results.add(stay(regionMark, elapsed, regionTime, timestamp));
                            }
                            // The final sighting opens a region that has no later sighting to measure against.
                            results.add(zeroStay(key, timestamp));
                        }
                        break;
                    }
    
                    if (sameRegion) {
                        if (elapsed > HALF_HOUR) {
                            // Too long since the region started: close it at the previous sighting and restart.
                            results.add(stay(regionMark, dateDiff(lastTime, regionTime), regionTime, lastTime));
                            regionTime = timestamp;
                        }
                    } else if (elapsed > TRANSITION_TIME) {
                        if (elapsed > HALF_HOUR) {
                            results.add(stay(regionMark, dateDiff(lastTime, regionTime), regionTime, lastTime));
                        } else {
                            results.add(stay(regionMark, elapsed, regionTime, timestamp));
                        }
                        regionMark = key;
                        regionTime = timestamp;
                    }
                    // else: another region within the transition window, do nothing
                    lastTime = timestamp;// last time
                }
    
                StringBuilder joined = new StringBuilder();
                for (String record : results) {
                    if (joined.length() > 0) {
                        joined.append(ROW_DELIMITER);
                    }
                    joined.append(record);
                }
                return joined.length() == 0 ? null : joined.toString();
            }
        }
    
        /**
         * Trims a name field and replaces the field delimiter inside it with the dummy delimiter.
         *
         * @param value non-null column value
         * @return the sanitized text
         */
        private static String sanitize(Object value) {
            return value.toString().trim().replace(FIELD_DELIMITER, DELIMITER_DUMMY);
        }
    
        /**
         * Parses a timestamp and formats it back into the canonical pattern.
         *
         * @param raw timestamp text
         * @return the canonical timestamp, or {@code null} if {@code raw} cannot be parsed
         */
        private static String normalizeTimestamp(String raw) {
            SimpleDateFormat format = DATE_FORMAT.get();
            try {
                return format.format(format.parse(raw));
            } catch (ParseException e) {
                System.err.println("StayTime: skipping row with unparseable timestamp '" + raw + "': " + e.getMessage());
                return null;
            }
        }
    
        /**
         * Builds one stay record {@code key_seconds_from_to}.
         */
        private static String stay(String key, long seconds, String from, String to) {
            return key + FIELD_DELIMITER + seconds + FIELD_DELIMITER + from + FIELD_DELIMITER + to;
        }
    
        /**
         * Builds a stay record of zero seconds that starts and ends at the same timestamp.
         */
        private static String zeroStay(String key, String timestamp) {
            return key + FIELD_DELIMITER + NO_STAY_TIME + FIELD_DELIMITER + timestamp + FIELD_DELIMITER + timestamp;
        }
    
        /**
         * calc the seconds between two timestamp
         *
         * @param left  the later timestamp
         * @param right the earlier timestamp
         * @return {@code left - right} in whole seconds, or 0 if either value cannot be parsed
         */
        private static long dateDiff(String left, String right) {
            SimpleDateFormat format = DATE_FORMAT.get();
            try {
                long bigger = format.parse(left).getTime();
                long smaller = format.parse(right).getTime();
                return (bigger - smaller) / 1000;// return seconds
            } catch (ParseException e) {
                System.err.println("StayTime: cannot diff timestamps '" + left + "' and '" + right + "': " + e.getMessage());
            }
            return 0;
        }
    }
  • 相关阅读:
    JSON总结
    protobuf 编码实现解析(java)
    Java Enum解析【转】
    protobuf 原理
    apache CXF wsdl2java工具的使用
    Web Service中的几个重要术语
    Servlet过滤器
    Java中static关键字的作用和用法详细介绍
    浅析Java中的final关键字
    Java中按值传递与按引用传递的区别
  • 原文地址:https://www.cnblogs.com/mengyao/p/5667456.html
Copyright © 2011-2022 走看看