zoukankan      html  css  js  c++  java
  • Spark的dataframe转rdd通用工具类

    需求解决问题

    每次读取hive表或者其他数据源获取数据后,想对其进行rdd操作时,遇到任何类型都需要用df.rdd.map(row => row.getString(0))这样的方式去获取,非常麻烦,所以可以实现一个通用的转换方式

    1.dataframe转为rdd通用方法

      /**
       * Generic conversion of a DataFrame into an RDD of plain value arrays.
       *
       * One extractor function is built per schema column (via `squareRowkey2`)
       * and applied to every row, so each output array holds that row's column
       * values in schema order. A `null` value is replaced by the string "null",
       * which is what `String.valueOf` yields for a null reference.
       *
       * @param frame the DataFrame to convert
       * @return an RDD with one `Array[Any]` per row of `frame`
       */
      def dataFrameToRdd(frame: DataFrame): RDD[Array[Any]] = {
        // The extractors depend only on the schema, so build them once here
        // rather than once per row inside the map closure.
        val extractors: Array[Row => Any] =
          frame.schema.toArray.zipWithIndex.map(column => squareRowkey2(column))
        frame.rdd.map { row =>
          extractors.map { extract =>
            val value = extract(row)
            if (value == null) String.valueOf(value) else value
          }
        }
      }
    

      对类型进行判断

    /**
     * Builds, from one schema column, a function that reads that column out of a Row.
     *
     * The column's Spark SQL data type selects which `Row` getter is used:
     *  - scalar, date/timestamp and binary columns yield `null` for a null cell;
     *  - array columns are returned as a `java.util.List` (empty for a null cell);
     *  - struct columns are returned via `Row.getStruct`;
     *  - every other type is returned as the raw value from `Row.get`.
     *
     * @param dataType the column's `StructField` paired with its position in the row
     * @return a function extracting that column's value from a `Row`
     */
      def squareRowkey2(dataType: (StructField, Int)): (Row) => Any = {
        val (structField, index) = dataType
        structField.dataType match {
          case StringType =>
            (row: Row) => if (row.isNullAt(index)) null else row.getString(index)
          case LongType =>
            (row: Row) => if (row.isNullAt(index)) null else row.getLong(index)
          case FloatType =>
            (row: Row) => if (row.isNullAt(index)) null else row.getFloat(index)
          case DoubleType =>
            (row: Row) => if (row.isNullAt(index)) null else row.getDouble(index)
          case IntegerType =>
            (row: Row) => if (row.isNullAt(index)) null else row.getInt(index)
          case BooleanType =>
            (row: Row) => if (row.isNullAt(index)) null else row.getBoolean(index)
          case DateType =>
            (row: Row) => if (row.isNullAt(index)) null else row.getDate(index)
          case TimestampType =>
            (row: Row) => if (row.isNullAt(index)) null else row.getTimestamp(index)
          case BinaryType =>
            (row: Row) => if (row.isNullAt(index)) null else row.getAs[Array[Byte]](index)
          case _: ArrayType =>
            (row: Row) => {
              // Must be handed back as a Java list so that the later
              // map -> JSON string conversion produces the required output.
              if (row.isNullAt(index)) {
                // Previously this empty list was computed and then discarded,
                // and the null array was dereferenced right after.
                util.Collections.emptyList[AnyRef]()
              } else {
                import scala.collection.JavaConverters._
                row.getSeq[Any](index).toBuffer.asJava
              }
            }
          case _: StructType =>
            // A struct cell is itself a Row; a null cell stays null.
            (row: Row) => if (row.isNullAt(index)) null else row.getStruct(index)
          case _ =>
            // Row.get returns the raw value, so types without a dedicated case
            // are passed through instead of being force-cast to String.
            (row: Row) => if (row.isNullAt(index)) null else row.get(index)
        }
      }

     2.rdd转实体对象

    大多数时候都是将数据封装为case class或者对象

      /**
       * Converts rows held as value arrays into entities of type `U`.
       *
       * Each array is zipped with the schema's column names into a map, the map
       * is serialized to a JSON string with `GsonUtil.toJsonString`, and that
       * string is parsed into `clazz` with `GsonUtil.GsonToBean`.
       *
       * @param frame   DataFrame whose schema supplies the column names
       * @param clazz   runtime class of the target entity
       * @param hiveRdd rows as value arrays (e.g. the output of `dataFrameToRdd`);
       *                values are paired with column names by position
       * @return an RDD holding one `U` per input array
       */
      def  dataFrameToEntity [U: ClassTag] (frame: DataFrame, clazz: Class[U], hiveRdd: RDD[Array[Any]]): RDD[U] = {
        // Column names depend only on the schema; resolve them once up front.
        val names: Array[String] = frame.schema.toArray.map(_.name)
        hiveRdd.map { array =>
          val map = new util.HashMap[String, Any]()
          names.zip(array).foreach { case (k, v) => map.put(k, v) }
          val str = GsonUtil.toJsonString(map)
          // GsonToBean is the utility class's JSON-string-to-object conversion.
          val value: U = GsonUtil.GsonToBean(str, clazz)
          value
        }
      }

     使用:

        lazy val df: DataFrame =spark.read.table("user")
        // Convert df into an RDD of value arrays, then into an RDD of entity objects
        val userRdd: RDD[Array[Any]] = RddUtils.dataFrameToRdd(df)
        val userRDD2: RDD[User] = RddUtils.dataFrameToEntity(df, classOf[User], userRdd)
    

      

    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.alibaba.fastjson.TypeReference;
    import com.alibaba.fastjson.serializer.SerializerFeature;
    import com.google.gson.Gson;
    import com.google.gson.GsonBuilder;
    import com.google.gson.reflect.TypeToken;
    import org.apache.commons.lang3.StringUtils;
    
    import java.lang.reflect.Field;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    
    /**
     * @ClassName: com.xxx.sbc.dw.spark.submit.service.GsonUtil
     * @Description: json工具类
     * @Author: imp
     * @Time: 2020/10/23 10:01
     * @Version: 1.0
     */
    
    public class GsonUtil {
    
        private static Gson gson = null;
    
        //判断gson对象是否存在了,不存在则创建对象
        static {
            if (gson == null) {
                //gson = new Gson();            //当使用GsonBuilder方式时属性为空的时候输出来的json字符串是有键值key的,显示形式是"key":null,而直接new出来的就没有"key":null的
                gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create();
            }
        }
    
        // Private no-arg constructor: prevents instantiation from outside this class.
        private GsonUtil() {
        }
    
        /**
         * Serializes an object to a JSON string with the shared Gson instance.
         *
         * @param object the value to serialize
         * @return the JSON text, or {@code null} when the Gson instance is not set
         */
        public static String GsonString(Object object) {
            if (gson == null) {
                return null;
            }
            return gson.toJson(object);
        }
    
        /**
         * Parses a JSON string into an instance of the given class.
         * The parsing is done by fastjson's {@code JSONObject.parseObject}.
         *
         * @param gsonString the JSON text
         * @param cls        the class to deserialize into
         * @return the parsed object, or {@code null} when {@code gsonString} is null or empty
         */
        public static <T> T GsonToBean(String gsonString, Class<T> cls) {
            if (StringUtils.isEmpty(gsonString)) {
                return null;
            }
            // Hand the JSON text and the target type to the parser.
            return JSONObject.parseObject(gsonString, cls);
        }
    
        /**
         * Parses a JSON array string into a list whose elements are of type {@code cls}.
         *
         * @param gsonString the JSON array text
         * @param cls        the element class; when {@code null}, elements are
         *                   deserialized as plain {@code Object}
         * @return the parsed list, or {@code null} when the Gson instance is not set
         */
        public static <T> List<T> GsonToList(String gsonString, Class<T> cls) {
            if (gson == null) {
                return null;
            }
            // Build the List<cls> type from the runtime class. An anonymous
            // TypeToken<List<T>> cannot carry the element type because T is erased,
            // which left cls unused and the elements not deserialized as cls.
            return gson.fromJson(gsonString,
                    TypeToken.getParameterized(List.class, cls == null ? Object.class : cls).getType());
        }
    
        /**
         * Parses a JSON string into a list of string-keyed maps.
         *
         * NOTE(review): T is a method type variable with no runtime class passed in,
         * so it presumably cannot tell Gson what the map values are - confirm what
         * value types callers actually receive.
         *
         * @param gsonString the JSON text
         * @return the parsed list, or {@code null} when the Gson instance is not set
         */
        public static <T> List<Map<String, T>> GsonToListMaps(String gsonString) {
            if (gson == null) {
                return null;
            }
            return gson.fromJson(gsonString, new TypeToken<List<Map<String, T>>>() {
            }.getType());
        }
    
    
        /**
         * Parses a JSON string into a list of string-keyed maps.
         * Lower-camel-case alias of {@link #GsonToListMaps(String)}; it delegates
         * there so the two entry points cannot drift apart.
         *
         * @param str the JSON text
         * @return the parsed list, or {@code null} when the Gson instance is not set
         */
        public static <T> List<Map<String, T>> gsonToListMaps(String str) {
            return GsonUtil.<T>GsonToListMaps(str);
        }
    
        /**
         * Parses a JSON string into a map with string keys and string values.
         *
         * @param gsonString the JSON text
         * @return the parsed map, or {@code null} when the Gson instance is not set
         */
        public static <T> Map<String, String> GsonToMaps(String gsonString) {
            if (gson == null) {
                return null;
            }
            return gson.fromJson(gsonString, new TypeToken<Map<String, String>>() {
            }.getType());
        }
    
        /**
         * Reports whether the given object can be serialized by the shared Gson instance.
         * Serialization is attempted and the result discarded; only success matters.
         *
         * @param object the value to probe
         * @return {@code true} if serialization succeeds, {@code false} if it throws
         */
        public static Boolean isJson(Object object) {
            try {
                gson.toJson(object);
                return true;
            } catch (Exception e) {
                // System.err.format takes java.util.Formatter syntax, so the placeholder
                // must be %s; "{}" would be printed literally. String.valueOf also
                // avoids a NullPointerException here if object is null.
                System.err.format("%s is not json%n", String.valueOf(object));
                return false;
            }
        }
    
        /**
         * Serializes an object to a JSON string with fastjson, passing the
         * {@code DisableCircularReferenceDetect} serializer feature.
         *
         * @param o the value to serialize
         * @return the JSON text
         */
        public static String toJsonString(Object o) {
            final String json = JSON.toJSONString(o, SerializerFeature.DisableCircularReferenceDetect);
            return json;
        }
    
        /**
         * Parses a JSON string with fastjson into a {@code LinkedHashMap} of
         * string keys to string values.
         *
         * @param json the JSON text
         * @return the parsed map
         */
        public static Map<String, String> jsonToMap(String json) {
            TypeReference<LinkedHashMap<String, String>> mapType =
                    new TypeReference<LinkedHashMap<String, String>>() {
                    };
            return JSON.parseObject(json, mapType);
        }
    
        /**
         * Copies the fields declared by an object's class into a map keyed by field name.
         * Only fields declared directly on the runtime class are read (inherited
         * fields are not), and a {@code null} field value is stored as the empty string.
         *
         * @param obj the object to read; {@code null} yields an empty map
         * @return a {@code LinkedHashMap} from field name to field value
         * @throws IllegalAccessException if a field cannot be read reflectively
         */
        public static Map<String, Object> entityToMap(Object obj) throws IllegalAccessException {
            Map<String, Object> map = new LinkedHashMap<String, Object>();
            if (obj == null) {
                return map;
            }
            for (Field field : obj.getClass().getDeclaredFields()) {
                // Compiler-generated fields (e.g. an inner class's outer reference)
                // are not part of the entity's data.
                if (field.isSynthetic()) {
                    continue;
                }
                field.setAccessible(true);
                Object value = field.get(obj);
                map.put(field.getName(), value == null ? "" : value);
            }
            return map;
        }
    

      

     

  • 相关阅读:
    mysqllog
    清理:db上面的过期的binlog,释放磁盘空间。 (转)
    linux下shell命令trap
    mvc
    uci随笔
    luci 随笔
    shell脚本 整数比较
    lua学习
    OPENWRT make menuconfig错误之一
    openwrt 中make的使用
  • 原文地址:https://www.cnblogs.com/hejunhong/p/13929463.html
Copyright © 2011-2022 走看看