研究spark的目的之一就是要取代MR,目前我司MR的一个典型应用场景即为生成Avro文件,然后加载到HIVE表里,所以如何在Spark中生成Avro文件,就是必然之路了。
我本人由于对java不熟,对hadoop的各个框架也知之寥寥,所以耗费了五六天的时间才搞明白怎么在spark中生成avro文件。其中困难有二,一是网上关于spark中生成avro的资料太少,无论官网还是各种论坛,要么没有讯息,要么就是三言两语,对于我这个菜鸟而言,真是要了老命;二是在spark生成avro的代码中,用到了avro框架和hadoop框架的东西,他们自己底层的引用,又有可能和spark的底层引用冲突,虽然最终解决了问题,但是对于问题的直接原因,还么有弄明白。
对于Java的老手,对于hadoop的生态又比较熟悉的人,估计这个课题一天之内就解决了。这里我不怕大家笑话,将目前在本地能跑成功的代码贴出来,还多请指教。还没有提交到集群中去。
1.代码片段
2.pom文件
3.avro格式和文本
1.代码片段
import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.rdd._ import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.io.NullWritable import org.apache.avro.Schema import org.apache.avro.mapred.AvroKey import org.apache.avro.mapreduce.AvroKeyOutputFormat import org.apache.avro.mapreduce._ object TextTest extends App { System.setProperty("hadoop.home.dir", "D:\bd\software\winutils") val sparkConf = new SparkConf().setAppName("AvroTest").setMaster("local[4]") val sc = new SparkContext(sparkConf) //**************************to generate an avro file based on internal java type var li = List("A","A","C","B") var lip = sc.parallelize(li, 4) var liprdd = lip.map { x => (new AvroKey[String](x),NullWritable.get()) } var prdd = new PairRDDFunctions(liprdd) val schema = Schema.create(Schema.Type.STRING) val job1 = Job.getInstance AvroJob.setOutputKeySchema(job1, schema) prdd.saveAsNewAPIHadoopFile("D:/002", classOf[AvroKey[String]], classOf[NullWritable], classOf[AvroKeyOutputFormat[String]], job1.getConfiguration) println("job1 done") //**************************to generate an avro file based on avro type var av = sc.textFile("D://bdp//NewHuman//Users.txt",5) var job = Job.getInstance AvroJob.setOutputKeySchema(job, User.getClassSchema) val avArray = av.map(x => x.split(" ")) val userP = avArray.map { x => (new AvroKey[User](User.newBuilder().setFavoriteNumber(Integer.parseInt(x(2))).setName(x(0)).setFavoriteColor(x(1)).build()),NullWritable.get()) } var avP = new PairRDDFunctions(userP) avP.saveAsNewAPIHadoopFile("D:/003", classOf[AvroKey[User]], classOf[NullWritable], classOf[AvroKeyOutputFormat[User]], job.getConfiguration) println("job2 done") }
代码中演示了两种类型的场景,一种是内存技术的,一种是外部文件。其中需要注意的是,必须要用AvroJob去设定schema,再者就是只有pairRDD才有saveAsNewAPIHadoop方法,所以其他的RDD必须要转成PairRDD。
另外,上面代码中的User类是利用avro自动生成的,需要引用进来。
2.pom文件
1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 2 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 3 <modelVersion>4.0.0</modelVersion> 4 5 6 <dependencies> 7 8 <dependency> 9 <groupId>jdk.tools</groupId> 10 <artifactId>jdk.tools</artifactId> 11 <version>1.7.0_67</version> 12 <scope>system</scope> 13 <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath> 14 </dependency> 15 16 <dependency> 17 <groupId>com.google.guava</groupId> 18 <artifactId>guava</artifactId> 19 <version>16.0.1</version> 20 </dependency> 21 22 23 <dependency> 24 <groupId>org.apache.spark</groupId> 25 <artifactId>spark-core_2.10</artifactId> 26 <version>1.2.2</version> 27 </dependency> 28 29 <dependency> 30 <groupId>org.scala-lang</groupId> 31 <artifactId>scala-library</artifactId> 32 <version>2.10.4</version> 33 </dependency> 34 <dependency> 35 <groupId>junit</groupId> 36 <artifactId>junit</artifactId> 37 <version>3.8.1</version> 38 <scope>test</scope> 39 </dependency> 40 41 <dependency> 42 <groupId>org.apache.kafka</groupId> 43 <artifactId>kafka_2.10</artifactId> 44 <version>0.8.1.1</version> 45 </dependency> 46 47 <dependency> 48 <groupId>org.apache.spark</groupId> 49 <artifactId>spark-streaming_2.10</artifactId> 50 <version>1.2.2</version> 51 </dependency> 52 <dependency> 53 <groupId>org.apache.spark</groupId> 54 <artifactId>spark-streaming-kafka_2.10</artifactId> 55 <version>1.2.2</version> 56 </dependency> 57 58 <dependency> 59 <groupId>org.apache.avro</groupId> 60 <artifactId>avro</artifactId> 61 <version>1.7.4</version> 62 </dependency> 63 64 <dependency> 65 <groupId>org.apache.avro</groupId> 66 <artifactId>avro-mapred</artifactId> 67 <version>1.7.4</version> 68 <classifier>hadoop2</classifier> 69 </dependency> 70 71 <dependency> 72 <groupId>org.apache.hadoop</groupId> 73 <artifactId>hadoop-common</artifactId> 74 <version>2.2.0</version> 75 </dependency> 76 77 <dependency> 78 <groupId>org.apache.hadoop</groupId> 79 <artifactId>hadoop-core</artifactId> 80 <version>1.0.4</version> 81 </dependency> 82 83 </dependencies> 84 85 <repositories> 86 <repository> 87 <id>scala-tools.org</id> 88 <name>Scala-tools Maven2 Repository</name> 89 <url>http://scala-tools.org/repo-releases</url> 90 </repository> 91 </repositories> 92 93 <pluginRepositories> 94 <pluginRepository> 95 <id>scala-tools.org</id> 96 <name>Scala-tools Maven2 Repository</name> 97 <url>http://scala-tools.org/repo-releases</url> 98 </pluginRepository> 99 </pluginRepositories> 100 101 <build> 102 <sourceDirectory>src</sourceDirectory> 103 <pluginManagement> 104 <plugins> 105 <plugin> 106 <groupId>net.alchim31.maven</groupId> 107 <artifactId>maven-scala-plugin</artifactId> 108 <version>3.2.0</version> 109 </plugin> 110 <plugin> 111 <groupId>org.apache.maven.plugins</groupId> 112 <artifactId>maven-compiler-plugin</artifactId> 113 <version>3.1</version> 114 </plugin> 115 </plugins> 116 </pluginManagement> 117 <plugins> 118 <plugin> 119 <groupId>org.scala-tools</groupId> 120 <artifactId>maven-scala-plugin</artifactId> 121 <executions> 122 <execution> 123 <id>scala-compile-first</id> 124 <phase>process-resources</phase> 125 <goals> 126 <goal>add-source</goal> 127 <goal>compile</goal> 128 </goals> 129 </execution> 130 </executions> 131 </plugin> 132 </plugins> 133 </build> 134 </project>
这个文件中要注意的是dependency的顺序不能变,由于spark和avro的底层的外部引用可能会冲突。
3.avro格式和文本
avro格式为
{"namespace": "example.avro", "type": "record", "name": "User", "fields": [ {"name": "name", "type": "string"}, {"name": "favorite_number", "type": ["int", "null"]}, {"name": "favorite_color", "type": ["string", "null"]} ] }
测试用例的文本Users.txt
Josh Green 13
Ken Yellow 6
Xiao Orange 8
Gerry Black 12