需求解决问题
每次读取 Hive 表或者其他数据源获取数据后,想对其进行 RDD 操作时,无论遇到什么类型的字段,都需要通过 df.rdd.map(row => row.getString(0)) 这样的方式逐个取值,非常麻烦,所以可以实现一个通用的转换方式
一、DataFrame 转为 RDD 的通用方法
/**
 * Generic DataFrame-to-RDD conversion.
 *
 * Every row becomes an `Array[Any]` holding one value per schema column, in
 * schema order. Each value is read through the extractor that
 * `squareRowkey2` selects for the column's data type.
 *
 * A null value returned by an extractor is emitted as the string "null"
 * (the result of `String.valueOf`), not as a null reference.
 *
 * @param frame the DataFrame to convert
 * @return an RDD with one `Array[Any]` per input row
 */
def dataFrameToRdd(frame: DataFrame): RDD[Array[Any]] = {
  val indexedFields: Array[(StructField, Int)] = frame.schema.toArray.zipWithIndex

  frame.rdd.mapPartitions { rows =>
    // The extractors depend only on the schema, so build them once per
    // partition instead of once per row.
    val extractors: Array[Row => Any] = indexedFields.map(field => squareRowkey2(field))

    rows.map { row =>
      extractors.map { extract =>
        val value = extract(row)
        // Keep the established contract: nulls are surfaced as "null".
        if (value == null) String.valueOf(value) else value
      }
    }
  }
}
对类型进行判断
/**
 * Builds a reader for a single column, chosen from the column's schema type.
 *
 * The returned function yields:
 *  - for the explicitly listed scalar types: the typed value, or null when
 *    the cell is null;
 *  - for array columns: a `java.util.List` with the elements (an empty list
 *    when the cell is null), so that downstream JSON serialization sees a
 *    Java list;
 *  - for every other type: whatever `Row.get` returns for that cell.
 *
 * @param dataType the column's `StructField` paired with its position in the row
 * @return a function that reads that column from a `Row`
 */
def squareRowkey2(dataType: (StructField, Int)): (Row) => Any = {
  val (structField, index) = dataType

  // Wraps a typed getter so that a null cell yields null instead of being
  // passed to the primitive getters (getLong, getInt, ...).
  def nullSafe(read: Row => Any): Row => Any =
    (row: Row) => if (row.isNullAt(index)) null else read(row)

  structField.dataType match {
    case StringType    => nullSafe(_.getString(index))
    case LongType      => nullSafe(_.getLong(index))
    case FloatType     => nullSafe(_.getFloat(index))
    case DoubleType    => nullSafe(_.getDouble(index))
    case IntegerType   => nullSafe(_.getInt(index))
    case BooleanType   => nullSafe(_.getBoolean(index))
    case DateType      => nullSafe(_.getDate(index))
    case TimestampType => nullSafe(_.getTimestamp(index))
    case BinaryType    => nullSafe(_.getAs[Array[Byte]](index))

    case _: ArrayType =>
      (row: Row) =>
        if (row.isNullAt(index)) {
          // Previously the empty list was computed but discarded, and the
          // following conversion threw a NullPointerException.
          util.Collections.emptyList[Any]()
        } else {
          // The result must be a Java list so that the later map-to-JSON
          // step serializes it as a JSON array.
          val elements = row.getSeq[Any](index)
          val javaList = new util.ArrayList[Any](elements.size)
          elements.foreach(element => javaList.add(element))
          javaList
        }

    // Structs and all remaining types: return the raw cell value.
    // `getString` here would throw ClassCastException for any non-string
    // value, and `get` already returns null for a null cell.
    case _ => (row: Row) => row.get(index)
  }
}
二、rdd转实体对象
大多数情况下都是将数据封装为 case class 或者对象
/**
 * Converts rows that were already flattened to `Array[Any]` into instances
 * of `clazz`.
 *
 * Each value array is paired positionally with the column names of `frame`,
 * collected into a `java.util.HashMap`, serialized with
 * `GsonUtil.toJsonString` and parsed into `clazz` with `GsonUtil.GsonToBean`.
 * If an array and the schema differ in length, the extra entries of the
 * longer one are ignored (zip semantics).
 *
 * @param frame   DataFrame whose schema supplies the column names
 * @param clazz   target entity class
 * @param hiveRdd rows as value arrays, expected in the column order of `frame`
 * @tparam U entity type
 * @return an RDD with one entity per input array
 */
def dataFrameToEntity[U: ClassTag](frame: DataFrame, clazz: Class[U], hiveRdd: RDD[Array[Any]]): RDD[U] = {
  // Resolve the names once on the driver: only the name array is captured by
  // the closure, and nothing schema-related is recomputed per row.
  val columnNames: Array[String] = frame.schema.fieldNames

  hiveRdd.map { values =>
    val byName = new util.HashMap[String, Any]()
    columnNames.zip(values).foreach { case (name, value) => byName.put(name, value) }

    // Round-trip through JSON: map -> JSON string -> entity object.
    val json = GsonUtil.toJsonString(byName)
    val entity: U = GsonUtil.GsonToBean(json, clazz)
    entity
  }
}
使用:
// Source table; being a lazy val, `df` is only initialised on first access.
lazy val df: DataFrame = spark.read.table("user")

// Step 1: flatten every row of the DataFrame into an Array[Any].
val userRdd: RDD[Array[Any]] = RddUtils.dataFrameToRdd(df)

// Step 2: map those value arrays onto the User entity class.
val userRDD2: RDD[User] =
  RddUtils.dataFrameToEntity(frame = df, clazz = classOf[User], hiveRdd = userRdd)
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; import com.alibaba.fastjson.serializer.SerializerFeature; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.reflect.TypeToken; import org.apache.commons.lang3.StringUtils; import java.lang.reflect.Field; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; /** * @ClassName: com.xxx.sbc.dw.spark.submit.service.GsonUtil * @Description: json工具类 * @Author: imp * @Time: 2020/10/23 10:01 * @Version: 1.0 */ public class GsonUtil { private static Gson gson = null; //判断gson对象是否存在了,不存在则创建对象 static { if (gson == null) { //gson = new Gson(); //当使用GsonBuilder方式时属性为空的时候输出来的json字符串是有键值key的,显示形式是"key":null,而直接new出来的就没有"key":null的 gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create(); } } //无参的私有构造方法 private GsonUtil() { } /** * 将对象转成json格式 * * @param object * @return String */ public static String GsonString(Object object) { String gsonString = null; if (gson != null) { gsonString = gson.toJson(object); } return gsonString; } /** * 将json转成特定的cls的对象 * * @param gsonString * @param cls * @return */ public static <T> T GsonToBean(String gsonString, Class<T> cls) { T t = null; if (StringUtils.isNotEmpty(gsonString)) { //传入json对象和对象类型,将json转成对象 t = JSONObject.parseObject(gsonString, cls); } return t; } /** * json字符串转成list * * @param gsonString * @param cls * @return */ public static <T> List<T> GsonToList(String gsonString, Class<T> cls) { List<T> list = null; if (gson != null) { //根据泛型返回解析指定的类型,TypeToken<List<T>>{}.getType()获取返回类型 list = gson.fromJson(gsonString, new TypeToken<List<T>>() { }.getType()); } return list; } /** * json字符串转成list中有map的 * * @param gsonString * @return */ public static <T> List<Map<String, T>> GsonToListMaps(String gsonString) { List<Map<String, T>> list = null; if (gson != null) { list = gson.fromJson(gsonString, new TypeToken<List<Map<String, T>>>() { }.getType()); } 
return list; } public static <T> List<Map<String, T>> gsonToListMaps(String str) { List<Map<String, T>> list = null; if (gson != null) { list = gson.fromJson(str, new TypeToken<List<Map<String, T>>>() { }.getType()); } return list; } /** * json字符串转成map的 * * @param gsonString * @return */ public static <T> Map<String, String> GsonToMaps(String gsonString) { Map<String, String> map = null; if (gson != null) { map = gson.fromJson(gsonString, new TypeToken<Map<String, String>>() { }.getType()); } return map; } /** * 判断是否是json * * @param object * @return */ public static Boolean isJson(Object object) { try { gson.toJson(object); return true; } catch (Exception e) { System.err.format("{} is not json", object.toString()); return false; } } /** * 对象转为json字符串 * * @param o * @return */ public static String toJsonString(Object o) { return JSON.toJSONString(o, SerializerFeature.DisableCircularReferenceDetect); } /** * json转为map * * @param json * @return */ public static Map<String, String> jsonToMap(String json) { return JSON.parseObject(json, new TypeReference<LinkedHashMap<String, String>>() { }); } public static Map<String, Object> entityToMap(Object obj) throws IllegalAccessException { Map<String, Object> map = new LinkedHashMap<String, Object>(); Class<?> clazz = obj.getClass(); System.out.println(clazz); for (Field field : clazz.getDeclaredFields()) { field.setAccessible(true); String fieldName = field.getName(); Object value = field.get(obj); if (value == null) { value = ""; } map.put(fieldName, value); } return map; }