zoukankan      html  css  js  c++  java
  • pig自定义UDF

    下面是一个 Pig 自定义 UDF 的例子。由此可见,Pig 还是很值得一用的;此外,它也支持嵌入 Python 代码来编写 UDF。

    package com.zhangdan.ykt;
    
    import java.io.IOException;
    import java.text.DateFormat;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.Iterator;
    
    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;
    
    /**
     * Pig UDF that generates a pair record for every two students whose records lie close
     * together in time, i.e. students who may be related.
     *
     * <p>{@link #exec} reads a bag of records from field 0 of its input tuple. For each record,
     * field 0 is the account and fields 2, 3, 4 and 5 are concatenated as
     * {@code f2 + f3 + f4 + " " + f5} and parsed with the pattern {@code yyyyMMdd HH:mm:ss}
     * (presumably year, month, day and time of day; confirm against the loader schema).
     * Each record is compared against a sliding window of at most {@link #maxP} recent records
     * that are less than {@link #dis} seconds older than it. NOTE(review): the window logic
     * assumes the bag is ordered by time; confirm the caller sorts it.
     *
     * @author xunying
     */
    public class GetConnection extends EvalFunc<DataBag> {
        TupleFactory tupleFactory = TupleFactory.getInstance();
        BagFactory bagFactory = BagFactory.getInstance();
        /** Maximum gap in seconds between two records for them to be paired (3 minutes). */
        static long dis = 3 * 60;
        /** Capacity of the sliding-window ring buffer. */
        int maxP = 20;
        /** Ring buffer of recent records; live entries run from indexS (inclusive) to indexE (exclusive). */
        Person[] line = new Person[maxP];
        /** Index of the oldest live record in {@link #line}. */
        int indexS = 0;
        /** Index of the slot that receives the next record. */
        int indexE = 0;
        /** True once a record has been stored, so indexS == indexE then means "full", not "empty". */
        boolean notice = false;

        /** One record held in the sliding window. */
        public class Person {
            String account;
            String jntime;
        }

        /**
         * Returns whether {@code jn2} is less than {@link #dis} seconds after {@code jn1}.
         *
         * @param jn1 timestamp of the older record, pattern {@code yyyyMMdd HH:mm:ss}
         * @param jn2 timestamp of the newer record, same pattern
         * @return true when {@code jn2 - jn1} is less than {@link #dis} seconds; note that this
         *     is also true when {@code jn2} precedes {@code jn1}
         * @throws IllegalArgumentException if either timestamp cannot be parsed
         */
        public static boolean isFriendM(String jn1, String jn2) {
            // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
            DateFormat format = new SimpleDateFormat("yyyyMMdd HH:mm:ss");
            Date date1;
            Date date2;
            try {
                date1 = format.parse(jn1);
                date2 = format.parse(jn2);
            } catch (ParseException e) {
                // Previously this was printed and followed by an NPE on the null Date.
                throw new IllegalArgumentException(
                        "Unparseable timestamp: '" + jn1 + "' or '" + jn2 + "'", e);
            }
            // Keep the difference as a long; the old (int) cast could truncate it.
            long diffSeconds = (date2.getTime() - date1.getTime()) / 1000;
            return diffSeconds < dis;
        }

        /**
         * Emits one tuple {@code (account1, account2, olderTime, newerTime)} for each pair of
         * records in the input bag that fall inside the sliding window.
         *
         * <p>The two accounts are ordered numerically (smaller first) when both parse as numbers;
         * otherwise the older record's account comes first. Pairs where either account is null
         * or blank are skipped. NOTE(review): the time columns are always (older, newer) and are
         * not swapped together with the accounts; confirm downstream expects this.
         *
         * @param input tuple whose field 0 is the bag of records
         * @return bag of pair tuples; null when {@code input} is null or empty or its field 0 is null
         * @throws IOException if reading a tuple field fails
         */
        @Override
        public DataBag exec(Tuple input) throws IOException {
            // Guard before touching the tuple (the original read field 0 first).
            if (input == null || input.size() == 0) {
                return null;
            }
            DataBag values = (DataBag) input.get(0);
            if (values == null) {
                return null;
            }
            // Each call processes an unrelated bag, so the window must start empty every time.
            resetWindow();

            ArrayList<Tuple> tuples = new ArrayList<Tuple>();
            for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
                Tuple t = it.next();
                String account = (String) t.get(0);
                String jntime = (String) t.get(2) + (String) t.get(3) + (String) t.get(4) + " "
                        + (String) t.get(5);

                // After the first record, indexS == indexE means the buffer is full: drop the oldest.
                if (indexE == indexS && notice) {
                    indexS = (indexS + 1) % maxP;
                }
                line[indexE].account = account;
                line[indexE].jntime = jntime;

                advanceWindowStart();
                for (int i = indexS; i != indexE; i = (i + 1) % maxP) {
                    Tuple pair = makePair(line[i], line[indexE]);
                    if (pair != null) {
                        tuples.add(pair);
                    }
                }

                indexE = (indexE + 1) % maxP;
                notice = true;
            }
            return bagFactory.newDefaultBag(tuples);
        }

        /** Clears the ring buffer and its cursors so no state leaks from a previous exec call. */
        private void resetWindow() {
            // Re-allocate if maxP was changed after construction, keeping line.length == maxP.
            if (line == null || line.length != maxP) {
                line = new Person[maxP];
            }
            for (int i = 0; i < maxP; i++) {
                line[i] = new Person();
            }
            indexS = 0;
            indexE = 0;
            notice = false;
        }

        /** Advances indexS past records that are not within {@link #dis} seconds of the newest record. */
        private void advanceWindowStart() {
            try {
                while (indexS != indexE && !isFriendM(line[indexS].jntime, line[indexE].jntime)) {
                    indexS = (indexS + 1) % maxP;
                }
            } catch (IllegalArgumentException e) {
                // Best effort, as before: an unparseable timestamp leaves the window start where it is.
                e.printStackTrace();
            }
        }

        /**
         * Builds the output tuple for one pair of records.
         *
         * @param older a record earlier in the window
         * @param current the record just added
         * @return the pair tuple, or null when either account is null or blank
         */
        private Tuple makePair(Person older, Person current) {
            String first = older.account;
            String second = current.account;
            // Content check; the original `trim() != ""` compared references and never filtered.
            if (first == null || second == null || first.trim().isEmpty() || second.trim().isEmpty()) {
                return null;
            }
            try {
                // Long instead of Integer so that account numbers beyond the int range still order.
                if (Long.parseLong(first) > Long.parseLong(second)) {
                    first = current.account;
                    second = older.account;
                }
            } catch (NumberFormatException ignored) {
                // Non-numeric accounts: deliberately keep window order.
            }
            Tuple tuple = tupleFactory.newTuple();
            tuple.append(first);
            tuple.append(second);
            tuple.append(older.jntime);
            tuple.append(current.jntime);
            return tuple;
        }
    }
  • 相关阅读:
    C#日期加减
    c#的预编译指令
    IHttpModule与IHttpHandler的区别整理
    ASP.NET提供程序
    在所有页面共享通用行为
    5.Oracle中的数据表
    asp.net跳转页面的三种方法比较(转)
    C#实现经典排序算法
    ASP.NET用户登录模块代码
    http错误锦集
  • 原文地址:https://www.cnblogs.com/xunyingFree/p/5282263.html
Copyright © 2011-2022 走看看