1. log4j(具体见log4j文档)
log4j是一个java系统中用于输出日志信息的工具。log4j可以将日志定义成多种级别:ERROR / WARN / INFO / DEBUG
log4j通过获取到一个logger对象来输出日志:
val logger = Logger.getLogger("logger名称"); logger.info("日志内容")
所拿到的这些logger对象之间是有“父子”关系的,所有logger都是rootLogger的子!
"org.apache" 这个名字的logger是 "org"这个名字的logger的子!
log4j的日志输出格式和目的地,都是可以通过参数配置的;
-
目的地的控制用Appender输出组件
常用的Appender组件:
log4j.appender.xx=org.apache.log4j.ConsoleAppender
log4j.appender.rollingFile=org.apache.log4j.RollingFileAppender
-
格式的控制用LayOut布局组件
log4j.appender.xx.layout=org.apache.log4j.PatternLayout
log4j.appender.xx.layout.ConversionPattern=[%-5p] %d(%r) --> [%t] %l: %m %x %n
2. 父子maven工程
(1)创建一个父工程(如平常创建一样),父工程中不写代码,所以最好将src文件夹删除(比如公司新手会将代码误写入该文件夹)
(2)创建子工程
得到如下图
接着如下所示
到此,一个子maven项目dataware即建立成功,子项目的pom文件如下所示
若是子工程中的父工程配置删除后,子工程不认识父工程,但是父工程认识子工程
(3)说明
A. 父工程pom文件中引入公共的依赖和插件(会被子工程pom继承),此处有几处规范
- 依赖定义的管理(不是真正引入依赖) 标签:<dependencyManagement><dependencyManagement>
作用:父项目中某个子项目需要用到某个依赖,这个时候若是在子项目的pom文件中定义这个依赖的版本,当另外一个子项目也要这个依赖时,由于需要统一依赖的版本,这时另外一个子项目中也需要定义相同版本的依赖。这样就比较麻烦,这个时候就可以使用依赖定义的管理(在父工程中定义子项目需要依赖的版本,子项目中就不需要写依赖的版本),如下
父工程pom文件(部分)
<dependencyManagement> <dependency> <groupId>ch.hsr</groupId> <artifactId>geohash</artifactId> <version>1.3.0</version> </dependency> </dependencies> </dependencyManagement>
子工程pom文件
- 属性定义(标签:<properties><properties>)
- 依赖排除(标签:<exclusions><exclusions>): 解决jar包的版本冲突
比如下面的spark使用的hadoop版本就出现依赖的冲突
解决办法(排除依赖)
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>${spark.version}</version> <exclusions> <exclusion> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> </exclusion> </exclusions> </dependency>
- 当在idea删除某个项目时,再创建一个同名的项目时,会出错(Idea中记录的东西会冲突)
解决办法:
直接到项目的目录中将idea的相关文件删除掉,如下图所示
spring子项目的创建
3.项目开发(埋点日志预处理-json数据解析、清洗过滤、数据集成实现、uid回补)
3.1 json数据格式如下:
3.2 需求说明
3.2.1 清洗过滤
此处为了记录数据方便,定义一个AppLogBean,该类中定义了两个方法(1.解析json 返回一个case class, 2. 判断一个bean是否有效),并在该类中定义一个case class AppLogBean
AppLogBean代码

package com._51doit.tian.dw.pre import com.alibaba.fastjson.{JSON, JSONObject} import org.apache.commons.lang3.StringUtils import scala.collection.mutable case class AppLogBean( eventid :String , timestamp :Double , event :Map[String,String] , uid :String , phoneNbr :String , sessionId :String , imei :String , mac :String , imsi :String , osName :String , osVer :String , androidId :String , resolution :String , deviceType :String , deviceId :String , uuid :String , appid :String , appVer :String , release_ch :String , promotion_ch :String , longtitude :Double , latitude :Double , carrier :String , netType :String , cid_sn :String , ip :String, var province:String = "", var city:String = "", var district:String = "", var dateStr:String = "", var timeStr:String = "" ) object AppLogBean { /** * 解析app埋点json日志,返回一个case class */ def parseJson2Bean(line:String): AppLogBean ={ try { val obj: JSONObject = JSON.parseObject(line) val eventid: String = obj.getString("eventid") val timestamp = obj.getString("timestamp").toDouble val event: JSONObject = obj.getJSONObject("event") val eventMap: mutable.HashMap[String, String] = new mutable.HashMap[String, String]() import scala.collection.JavaConversions._ for(ent <- event.entrySet()){ eventMap.put(ent.getKey,ent.getValue.toString) } val user = obj.getJSONObject("user") val uid = user.getString("uid") val phoneNbr = user.getString("phoneNbr") val sessionId = user.getString("sessionId") val phone = user.getJSONObject("phone") val imei = phone.getString("imei") val mac = phone.getString("mac") val imsi = phone.getString("imsi") val osName = phone.getString("osName") val osVer = phone.getString("osVer") val androidId = phone.getString("androidId") val resolution = phone.getString("resolution") val deviceType = phone.getString("deviceType") val deviceId = phone.getString("deviceId") val uuid = phone.getString("uuid") val app = user.getJSONObject("app") val appid = app.getString("appid") val appVer = app.getString("appVer") val release_ch = app.getString("release_ch") val promotion_ch = app.getString("promotion_ch") val loc = user.getJSONObject("loc") val longtitude = loc.getDouble("longtitude") val latitude = loc.getDouble("latitude") val carrier = loc.getString("carrier") val netType = loc.getString("netType") val cid_sn = loc.getString("cid_sn") val ip = loc.getString("ip") AppLogBean( eventid , timestamp, eventMap.toMap, uid , phoneNbr , sessionId , imei , mac , imsi , osName , osVer , androidId , resolution , deviceType , deviceId , uuid , appid , appVer , release_ch , promotion_ch , longtitude , latitude , carrier , netType , cid_sn , ip ) } catch { case e: Exception => null case _: Throwable => null } } /** * 判断一条bean是否有效 */ def isValidBean(bean:AppLogBean): Boolean ={ val uid: String = bean.uid val imei: String = bean.imei val uuid: String = bean.uuid val mac: String = bean.mac val androidId: String = bean.androidId val ip: String = bean.ip // 以上参数不能全为空 var flag1 = StringUtils.isNotBlank((uid + imei + uuid + mac + androidId + ip).replaceAll("null", "")) val event: Map[String, String] = bean.event val eventid: String = bean.eventid val sessionId = bean.sessionId var flag2 = (event != null) && (StringUtils.isNotBlank(eventid) ) && (StringUtils.isNotBlank(sessionId)) flag1 && flag2 } }
3.2.2 数据解析
此处event数据不用扁平化的原因是,event内的数据类型也不一样
3.2.3 数据集成
3.2.4 数据修正
思路图
注意:此处手机标识 比如imei为空时,作为join on相等的条件时会出错,一定要判断非空,由于sql语句很麻烦(如下),所以开发一个自定义函数,用来判断两个字符串在非空情况下是否相等
每一个手机识别方式都要这样写,很麻烦,以下是自定义的函数
// 开发一个自定义函数,用来判断两个字符串在非空情况下是否相等 val is_equal = (x: String, y: String) => { if (x != y || StringUtils.isBlank(x) || StringUtils.isBlank(y)) false else true } spark.udf.register("is_equal",is_equal)
业务代码
AppEventLogPreprocess

package com._51doit.tian.dw.pre import java.text.SimpleDateFormat import ch.hsr.geohash.GeoHash import com._51doit.tian.commons.utils.{DictLoadUtil, SparkUtils} import org.apache.commons.lang3.StringUtils import org.apache.log4j.{Level, Logger} import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} object AppEventLogPreprocess { def main(args: Array[String]): Unit = { Logger.getLogger("org").setLevel(Level.WARN) val spark: SparkSession = SparkUtils.getSpark(this.getClass.getSimpleName) import spark.implicits._ // 加载原始日志文件 val ds: Dataset[String] = spark.read.textFile("E:\javafile\dataware\2019-10-29") // 解析json val beans: Dataset[AppLogBean] = ds.map(AppLogBean.parseJson2Bean) /** * 清洗过滤 */ val validBeans: Dataset[AppLogBean] = beans .filter(_ != null) .filter(AppLogBean.isValidBean(_)) /** * 数据集成 */ val dictDF: DataFrame = spark.read.parquet("E:/javafile/spark/out11") val dictMap: collection.Map[String, (String, String, String)] = DictLoadUtil.loadAreaDict(dictDF) val bc_dict = spark.sparkContext.broadcast(dictMap) // 然后进行集成 val integrated: Dataset[AppLogBean] = validBeans.mapPartitions(iter => { // 取广播变量 val dict: collection.Map[String, (String, String, String)] = bc_dict.value iter.map(bean => { // 处理GPS坐标 val longtitude: Double = bean.longtitude val latitude: Double = bean.latitude // 如果经纬度坐标在中国的经纬度范围之内,才去转geohash编码并从字典中查找省市区 if (longtitude > 0 && longtitude < 120 && latitude > 0 && latitude < 70) { val geo = GeoHash.withCharacterPrecision(latitude, longtitude, 5).toBase32 val area = dict.getOrElse(geo, ("", "", "")) bean.province = area._1 bean.city = area._2 bean.district = area._3 } // 处理时间戳 val sdf: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") val str: Array[String] = sdf.format(bean.timestamp).split(" ") bean.dateStr = str(0) bean.timeStr = str(1) // 返回集成完成的bean bean }) }) // integrated.where("trim(province) != '' ").show(10,false) /** * 数据修正 */ val haveUid = integrated.where("uid is not null and trim(uid) !='' ") val noUid = integrated.where(" uid is null or trim(uid) ='' ") import org.apache.spark.sql.functions._ val uids = haveUid .groupBy($"uid") .agg( max("imei").as("imei"), max("imsi").as("imsi"), max("mac").as("mac"), max("uuid").as("uuid"), max("androidId").as("androidId"), max("deviceId").as("deviceId") ) noUid.createTempView("nouid") uids.createTempView("uids") // 开发一个自定义函数,用来判断两个字符串在非空情况下是否相等 val is_equal = (x: String, y: String) => { if (x != y || StringUtils.isBlank(x) || StringUtils.isBlank(y)) false else true } spark.udf.register("is_equal",is_equal) // 对没有uid的数据进行回补操作 val part1: DataFrame = spark.sql( """ | |select | |a.eventid , |a.timestamp , |a.event , |if(b.uid is not null,b.uid,a.uid) as uid, |a.phoneNbr , |a.sessionId , |a.imei , |a.mac , |a.imsi , |a.osName , |a.osVer , |a.androidId , |a.resolution , |a.deviceType , |a.deviceId , |a.uuid , |a.appid , |a.appVer , |a.release_ch , |a.promotion_ch , |a.longtitude , |a.latitude , |a.carrier , |a.netType , |a.cid_sn , |a.ip , |a.province, |a.city, |a.district, |a.dateStr, |a.timeStr | |from | |nouid a left join uids b | on is_equal(a.imei,b.imei) | or is_equal(a.imsi,b.imsi) | or is_equal(a.mac,b.mac) | or is_equal(a.uuid,b.uuid) | or is_equal(a.androidId,b.androidId) | or is_equal(a.deviceId,b.deviceId) | """.stripMargin) // 将回补好的数据 union 原来就有uid的数据 val result = haveUid.toDF.union(part1) // 输出结果 result.write.parquet("E:\javafile\dataware1\2019-10-29") spark.close() } }