1. pom.xml
<!-- spark -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.3.4</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.3.4</version>
</dependency>
<!-- log4j 1.2.17 -->
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>14.0.1</version>
</dependency>
Because the Scala sources need to be compiled and the project packaged as a Maven fat jar, add the following build section:
<build>
    <!-- shorten the jar name -->
    <finalName>myspark</finalName>
    <plugins>
        <!-- scala -->
        <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <version>2.15.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <!-- fat jar -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.5.5</version>
            <configuration>
                <archive>
                    <manifest>
                        <mainClass>com.njbdqn.MySpark</mainClass>
                    </manifest>
                </archive>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
2. Scala configuration
In Settings, set the Java compiler to 1.8; in Project Structure, set the project language level to 8; under Libraries, add the scala-sdk.
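These versions should line up with the dependencies above: spark-core_2.11 implies Scala 2.11, and the compiler level is Java 1.8. As a purely illustrative check (the VersionCheck object is not part of the original project), you can print what the project actually runs against:

import scala.util.Properties

object VersionCheck extends App {
  // Scala library version on the classpath, expected to be 2.11.x to match spark-core_2.11
  println(Properties.versionString)
  // JVM version, expected to be 1.8.x given the compiler setting above
  println(System.getProperty("java.version"))
}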
3. Directory structure
The log configuration file is log4j.properties:
# Define the output level: only DEBUG output, sent to Console and File
#log4j.rootLogger=debug,Console,File
log4j.rootLogger=WARN,File
# Log output target: console (currently commented out)
#log4j.appender.Console=org.apache.log4j.ConsoleAppender
#log4j.appender.Console.Target=System.out
# The layout pattern can be specified flexibly
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
log4j.appender.Console.layout.ConversionPattern = %d{yyyy-MM-dd HH:mm} -%p (%F:%L)- %m%n
### Output to a file, rolling to a new file every day ###
log4j.appender.File = org.apache.log4j.DailyRollingFileAppender
### On Windows, use a full path: d:/logs/myspark.log
### On Linux, use a path relative to the current directory: logs/myspark.log
log4j.appender.File.File = d:/logs/myspark.log
#### DatePattern controls how often a new file is created; the default is daily ####
log4j.appender.File.DatePattern='_'yyyy-MM-dd
log4j.appender.File.Append = true
log4j.appender.File.Threshold = DEBUG
log4j.appender.File.layout = org.apache.log4j.PatternLayout
log4j.appender.File.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n
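For reference, a minimal sketch (not from the original post) of how the driver could obtain a log4j 1.x logger that picks up this configuration, assuming log4j.properties is on the classpath; the object name matches the mainClass declared in the pom, but the log messages are illustrative:

package com.njbdqn

import org.apache.log4j.Logger

object MySpark {
  // Logger named after this class; configured by log4j.properties on the classpath
  private val logger: Logger = Logger.getLogger(getClass)

  def main(args: Array[String]): Unit = {
    logger.warn("job starting")   // WARN and above reach the File appender (rootLogger=WARN,File)
    // ... Spark job ...
    logger.warn("job finished")
  }
}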
4. Spark code
(1) Introduction
1) Spark RDD
// 1. SparkConf: local mode with 2 cores ("*" means all available cores)
val conf = new SparkConf().setMaster("local[2]").setAppName("myjob")
// 2. SparkContext: the entry point for submitting a Spark application
val sc = new SparkContext(conf)
// 3. Use sc to create an RDD and run the transformations and the action
sc.textFile("file:////root/sparkdata/wordcount.txt")
// sc.textFile("file:////D:/idea/ideaProjects/spark_projects/myspark8/src/main/scala/com/njbdqn/wordcount.txt")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .foreach(println(_))
// 4. Stop sc to end the job
sc.stop()
2) Spark SQL
val spark = SparkSession.builder().master("local[*]").appName("tb").getOrCreate()
// read the file from HDFS
val df = spark.read.format("csv")
  .load("hdfs://192.168.56.111:9000/quizz/data/userbehaviour/UserBehavior.csv")
df.show(5)
spark.stop()
(2) Reading files of various formats
① Converting to a DataFrame
1) CSV
Reading a CSV with sc (SparkContext):
// toDF requires a SparkSession in scope plus: import spark.implicits._
val rdd1 = sc.textFile("hdfs://192.168.56.111:9000/party/data/locale.csv")
val df1 = rdd1.map(x => {
  val strs = x.split(",")
  (strs(0), strs(1))
}).toDF("id", "locale")
Reading a CSV with SparkSession:
val spark = SparkSession.builder().master("local[*]").appName("tb").getOrCreate()
// read the file from HDFS, treating the first line as a header
val df = spark.read.format("csv")
  .option("header", "true")
  .load("hdfs://192.168.56.111:9000/quizz/data/userbehaviour/UserBehavior.csv")
df.show(5)
spark.stop()
2) TXT
[Commonly used] Reading a TXT file with sc:
val rdd1 = sc.textFile("hdfs://192.168.56.111:9000/party/data/locale.txt")
val df1 = rdd1.map(x => {
  val strs = x.split(" ")
  (strs(0), strs(1))
}).toDF("id", "locale")
Reading a TXT file with SparkSession:
spark.read.option("sep", " ").csv("path")
  .withColumnRenamed("_c0", "name1")
  .withColumnRenamed("_c1", "name2")
  .show
② Working directly with the RDD
Use sc to read the CSV file, convert it to an RDD[Array[String]], and drop the header row:
val rddpre = sc.textFile("hdfs://192.168.56.111:9000/party/data/event_attendees.csv")
  .map(x => x.split(",", -1))
val head = rddpre.first()
// Arrays compare by reference in Scala, so compare contents with sameElements to drop the header reliably
val rdd = rddpre.filter(x => !x.sameElements(head)).cache
The difference between str.split(",", -1) and str.split(","): the former keeps the empty fields between commas (including trailing ones), while the latter discards trailing empty fields.
Because this is a CSV file, if 1,2,3,,5 were collapsed to 1,2,3,5, the columns of the resulting two-dimensional data would no longer line up.
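A quick, self-contained illustration of the difference (the sample line is made up for demonstration):

object SplitDemo extends App {
  val line = "1,2,3,,5,,"                     // hypothetical CSV line with empty fields

  // default split: trailing empty fields are dropped -> 1|2|3||5   (5 fields)
  println(line.split(",").mkString("|"))

  // split with limit -1: every empty field is kept  -> 1|2|3||5||  (7 fields)
  println(line.split(",", -1).mkString("|"))
}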
5. Packaging
After packaging with Maven (the package phase), take the fat jar from the target directory (the one whose name ends in with-dependencies), copy it to the Linux machine, and run it with java -jar <path>, or submit it via spark-submit:
./spark-submit --class com.njbdqn.MySpark /root/myspark-jar-with-dependencies.jar