  • Extracting the Top N per Group in Hive

    Today I was testing how well association-rule recommendation works for 摄影部落.

    This kind of recommendation doesn't really count as personalized: once the antecedent item is fixed, the recommendation list is fixed as well, so the results don't follow the individual user.

    The end result is a rules table:

    Table(Item_a, Item_b, Rel)

    where Rel denotes the strength of the relationship between the two items.

    To generate a recommendation list for every item, a group-wise Top-N operation ordered by relationship strength is required.

    Hive's built-in SQL can't express this, so I had to write my own UDAF and UDTF.
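    To make the target concrete, here is a minimal sketch of the input and the goal. The table and column names below are illustrative, not from the original post:

    -- Hypothetical rules table: one row per ordered item pair.
    create table rules (
        item_a string,   -- antecedent item
        item_b string,   -- candidate recommendation
        rel    double    -- association strength between the two items
    );
    -- Goal: for each item_a, keep only the N rows with the largest rel.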

    package Item;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import org.apache.hadoop.hive.ql.exec.UDAF;
    import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
    import org.apache.hadoop.io.IntWritable;
    
    public class UDAFTopN extends UDAF {
        // Holder object that accumulates the data for one group
        public static class State {
            public static class Target{
                // Oddly, results came out wrong when Text was used here, so plain String it is
                String target;
                Double score;
                public Target(){
                    target = null;
                    score = null;
                }
            }
            public ArrayList<Target> list = new ArrayList<Target>();
            private int limit;
        }
    
        /**
         * Accumulate one row: append the target and its score to the state's list.
         * @param s accumulation state
         * @param o the item to be ranked
         * @param i the item's score
         */
        private static void increment(State s, String o, Double i) {
            if (s.list == null) {
                s.list = new ArrayList<State.Target>();
            }
            State.Target tmpTarget = new State.Target();
            tmpTarget.target = o;
            tmpTarget.score = i;
            s.list.add(tmpTarget);
        }
        public static class UDAFTopNEvaluator implements UDAFEvaluator {
            private final State state;
            public UDAFTopNEvaluator() {
                state = new State();
            }
            @Override
            public void init() {
                if (state.list != null) {
                    state.list.clear();
                }
                if (state.limit == 0) {
                    state.limit = 100;
                }
            }
            // Called once per row: record the requested N and accumulate the (target, score) pair.
            public boolean iterate(String target, Double score, IntWritable limits) {
                if (target == null || score == null || limits == null) {
                    return false;
                } else {
                    state.limit = limits.get();
                    increment(state, target, score);
                }
                return true;
            }
            public State terminatePartial() {
                return state;
            }
            public boolean merge(State other) {
                if (state == null || other == null) {
                    return false;
                }
                state.limit = other.limit;
                state.list.addAll(other.list);
                return true;
            }
            public String terminate() {
                if (state == null || state.list.size() == 0) {
                    return null;
                }
                // Sort by score descending, then emit the top N as "target,score;" segments.
                Collections.sort(state.list, new ScoreComparator());
                int topN = state.limit;
                int size = state.list.size();
                StringBuilder str = new StringBuilder();
                for (int i = 0; i < topN && i < size; i++) {
                    str.append(state.list.get(i).target).append(",").append(state.list.get(i).score).append(";");
                }
                return str.toString();
            }
            /*
             * Orders Target entries by score, highest first.
             */
            public static class ScoreComparator implements Comparator<State.Target> {
                public int compare(State.Target s1, State.Target s2) {
                    return s2.score.compareTo(s1.score);
                }
            }
        }
    }
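    A minimal usage sketch, assuming the function is registered as udaftopn (as in the SQL further down) and the hypothetical rules table sketched above: for each item_a it packs the top 30 neighbours into a single delimited string.

    select item_a, udaftopn(item_b, rel, 30) as ranks
    from rules
    group by item_a;

    Note that the evaluator buffers the whole group in memory and only sorts in terminate(), so very large groups can put memory pressure on the reducers.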

    Next comes the UDTF, which splits a row back into multiple rows and columns on the agreed delimiters (';' between entries, ',' within an entry).

    package Item;
    import java.util.ArrayList;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    
    public class UDTFTopN extends GenericUDTF {
        @Override
        public void close() throws HiveException {
        }
        @Override
        public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        if (args.length != 1) {
            throw new UDFArgumentLengthException("UDTFTopN takes only one argument");
        }
        if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException("UDTFTopN takes a string as a parameter");
        }
            ArrayList<String> fieldNames = new ArrayList<String>();
            ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
            fieldNames.add("col1");
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
            fieldNames.add("col2");
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
            fieldNames.add("col3");
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
            return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
        }
        @Override
    public void process(Object[] args) throws HiveException {
        // Input looks like "target1,score1;target2,score2;..." as produced by the UDAF.
        String input = args[0].toString();
        String[] test = input.split(";");
        int size = test.length;
        for (int i = 0; i < test.length; i++) {
            try {
                String[] result = new String[3];
                String[] sp = test[i].split(",");
                result[0] = sp[0];
                result[1] = sp[1];
                // Descending position value: the first (best) segment gets the largest number.
                result[2] = String.valueOf(size - i);
                forward(result);
            } catch (Exception e) {
                // Skip malformed segments rather than failing the whole row.
                continue;
            }
        }
    }
        }
    }
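    A minimal usage sketch, assuming a hypothetical table topn_packed(item_a, ranks) holding the packed output of the UDAF above: each "target,score" segment becomes one output row, plus a third column with the descending position value.

    select t.item_a, r.item_b, r.sim, r.point
    from topn_packed t
    lateral view udtftopn(t.ranks) r as item_b, sim, point;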

    Now the custom functions can be registered in Hive and combined with regular SQL.

    add jar /tmp/juefan/function/Photo.jar;
    create temporary function udaftopn as 'Item.UDAFTopN';
    create temporary function udtftopn as 'Item.UDTFTopN';
    SET hive.exec.reducers.bytes.per.reducer=32000000;
    drop table dm_fan_photo_prob_rank;
    create table dm_fan_photo_prob_rank as
    select r.item_a, r.item_b, c.co, r.point, r.sim
    from (
        select a.item_a, b.item_b, b.sim, b.point
        from (
            select item_a, udaftopn(item_b, sim, 30) as ranks
            from dm_fan_photo_prob_sim
            where item_a <> item_b
            group by item_a
        ) a
        lateral view udtftopn(a.ranks) b as item_b, sim, point
    ) r
    join dm_fan_photo_prob_co c
    on r.item_a = c.item_a and r.item_b = c.item_b;

    The above was written a few months ago, using a UDAF plus a UDTF. There is actually a simpler way: use a plain UDF, combined with row_number.

    Steps:

    1) First distribute the rows by user ID with DISTRIBUTE BY, then sort within each group by click time, and use row_number to generate sequence numbers (see the sketch below).

    ... to be continued
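    As a hedged sketch of step 1, assuming a hypothetical click log clicks(user_id, item_id, click_time) and Hive 0.11+, where row_number is a built-in windowing function (on earlier versions the same effect needs a hand-written row_number UDF whose counter resets whenever the DISTRIBUTE BY key changes, which is what the approach described above relies on):

    select user_id, item_id, click_time, rn
    from (
        select user_id, item_id, click_time,
               row_number() over (partition by user_id order by click_time) as rn
        from clicks
    ) t
    where rn <= 30;    -- keep at most the first 30 clicks per user

    Once every row carries an in-group sequence number, the Top-N filter is just a WHERE clause, with no custom UDAF or UDTF needed.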

    Original post: https://www.cnblogs.com/juefan/p/3028999.html