  • How to Generate Avro Files in Spark

    One of my goals in studying Spark is to replace MapReduce. A typical MR use case at my company is generating Avro files and then loading them into Hive tables, so figuring out how to generate Avro files in Spark was a necessary step.

    Since I am not fluent in Java and know little about the various Hadoop frameworks, it took me five or six days to work this out. There were two difficulties. First, there is very little material on generating Avro from Spark: neither the official docs nor the forums say much, and the few mentions are only a sentence or two, which is rough going for a beginner like me. Second, the code uses classes from both the Avro and Hadoop frameworks, and their own transitive dependencies can conflict with Spark's. I eventually got things working, but I still have not pinned down the root cause of the conflict.

    A seasoned Java developer who knows the Hadoop ecosystem well could probably solve this in a day. At the risk of embarrassing myself, here is the code that currently runs on my local machine; corrections are welcome. It has not yet been submitted to a cluster.

    1. Code snippet

    2. The pom file

    3. Avro schema and test data

    1. Code snippet

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd._
    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.io.NullWritable
    import org.apache.avro.Schema
    import org.apache.avro.mapred.AvroKey
    import org.apache.avro.mapreduce.AvroKeyOutputFormat
    import org.apache.avro.mapreduce.AvroJob
    // the Avro-generated User class (from the schema below) must also be
    // imported, e.g. import example.avro.User
    
    object TextTest extends App {
      // backslashes must be escaped in Scala string literals
      System.setProperty("hadoop.home.dir", "D:\\bd\\software\\winutils")
      val sparkConf = new SparkConf().setAppName("AvroTest").setMaster("local[4]")
      val sc = new SparkContext(sparkConf)
      
      // ********** job 1: generate an Avro file from a built-in type
      val li = List("A", "A", "C", "B")
      val lip = sc.parallelize(li, 4)
      // saveAsNewAPIHadoopFile is only defined on pair RDDs, so map each
      // element to an (AvroKey, NullWritable) pair and wrap in PairRDDFunctions
      val liprdd = lip.map { x => (new AvroKey[String](x), NullWritable.get()) }
      val prdd = new PairRDDFunctions(liprdd)
      // the output schema must be registered on the job through AvroJob
      val schema = Schema.create(Schema.Type.STRING)
      val job1 = Job.getInstance
      AvroJob.setOutputKeySchema(job1, schema)
      prdd.saveAsNewAPIHadoopFile("D:/002", classOf[AvroKey[String]], classOf[NullWritable],
          classOf[AvroKeyOutputFormat[String]], job1.getConfiguration)
      println("job1 done")
      
      // ********** job 2: generate an Avro file from an Avro-generated type
      val av = sc.textFile("D://bdp//NewHuman//Users.txt", 5)
      val job = Job.getInstance
      AvroJob.setOutputKeySchema(job, User.getClassSchema)
      // each line of Users.txt is "name color number"
      val avArray = av.map(x => x.split(" "))
      val userP = avArray.map { x =>
        (new AvroKey[User](User.newBuilder()
            .setName(x(0))
            .setFavoriteColor(x(1))
            .setFavoriteNumber(Integer.parseInt(x(2)))
            .build()),
         NullWritable.get())
      }
      val avP = new PairRDDFunctions(userP)
      avP.saveAsNewAPIHadoopFile("D:/003", classOf[AvroKey[User]], classOf[NullWritable],
          classOf[AvroKeyOutputFormat[User]], job.getConfiguration)
      println("job2 done")
    }
    

      The code demonstrates two scenarios: one builds the file from an in-memory collection, the other from an external text file. Two points deserve attention: the output schema must be set on the job through AvroJob, and only a pair RDD has the saveAsNewAPIHadoopFile method, so any other RDD must first be converted into a PairRDD.
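      The pair requirement can be shown in miniature without Spark at all. The sketch below is plain Java (`PairShape` and `toPairs` are hypothetical names, not part of any Spark API); each record is mapped to a (key, placeholder-value) entry, mirroring how the job pairs each `AvroKey` with `NullWritable.get()`:

```java
import java.util.List;
import java.util.Map;

class PairShape {
    // Each element becomes a (key, value) entry; the value slot is just a
    // placeholder, playing the role of NullWritable in the Spark job.
    static List<Map.Entry<String, String>> toPairs(List<String> records) {
        return records.stream().map(r -> Map.entry(r, "")).toList();
    }
}
```

      In the real job the same mapping happens inside `lip.map { ... }`, after which `new PairRDDFunctions(...)` exposes the Hadoop output methods.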

      Also, the User class in the code above is generated by Avro from the schema and must be imported into the project.

    2. The pom file

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <dependencies>

            <dependency>
                <groupId>jdk.tools</groupId>
                <artifactId>jdk.tools</artifactId>
                <version>1.7.0_67</version>
                <scope>system</scope>
                <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
            </dependency>

            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>16.0.1</version>
            </dependency>

            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.10</artifactId>
                <version>1.2.2</version>
            </dependency>

            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>2.10.4</version>
            </dependency>

            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>3.8.1</version>
                <scope>test</scope>
            </dependency>

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.10</artifactId>
                <version>0.8.1.1</version>
            </dependency>

            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.10</artifactId>
                <version>1.2.2</version>
            </dependency>

            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka_2.10</artifactId>
                <version>1.2.2</version>
            </dependency>

            <dependency>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro</artifactId>
                <version>1.7.4</version>
            </dependency>

            <dependency>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-mapred</artifactId>
                <version>1.7.4</version>
                <classifier>hadoop2</classifier>
            </dependency>

            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.2.0</version>
            </dependency>

            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-core</artifactId>
                <version>1.0.4</version>
            </dependency>

        </dependencies>

        <repositories>
            <repository>
                <id>scala-tools.org</id>
                <name>Scala-tools Maven2 Repository</name>
                <url>http://scala-tools.org/repo-releases</url>
            </repository>
        </repositories>

        <pluginRepositories>
            <pluginRepository>
                <id>scala-tools.org</id>
                <name>Scala-tools Maven2 Repository</name>
                <url>http://scala-tools.org/repo-releases</url>
            </pluginRepository>
        </pluginRepositories>

        <build>
            <sourceDirectory>src</sourceDirectory>
            <pluginManagement>
                <plugins>
                    <plugin>
                        <groupId>net.alchim31.maven</groupId>
                        <artifactId>maven-scala-plugin</artifactId>
                        <version>3.2.0</version>
                    </plugin>
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-compiler-plugin</artifactId>
                        <version>3.1</version>
                    </plugin>
                </plugins>
            </pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.scala-tools</groupId>
                    <artifactId>maven-scala-plugin</artifactId>
                    <executions>
                        <execution>
                            <id>scala-compile-first</id>
                            <phase>process-resources</phase>
                            <goals>
                                <goal>add-source</goal>
                                <goal>compile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>

    Note that the order of the dependencies in this file should not be changed: the transitive dependencies of Spark and Avro can conflict with each other, and this ordering is what made the build work for me.
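    If a conflict does surface, an alternative to relying on ordering is an explicit Maven exclusion. The fragment below is a hypothetical example, not taken from my working build; which artifact actually needs excluding depends on your own dependency tree (run `mvn dependency:tree` to find it):

    <!-- Hypothetical: exclude the Avro copy pulled in transitively by
         spark-core so that the explicitly declared avro 1.7.4 wins. -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.2.2</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro</artifactId>
            </exclusion>
        </exclusions>
    </dependency>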

    3. Avro schema and test data

    The Avro schema is:

    {"namespace": "example.avro",
     "type": "record",
     "name": "User",
     "fields": [
         {"name": "name", "type": "string"},
         {"name": "favorite_number",  "type": ["int", "null"]},
         {"name": "favorite_color", "type": ["string", "null"]}
     ]
    }

    The test input, Users.txt:


    Josh Green 13
    Ken Yellow 6
    Xiao Orange 8
    Gerry Black 12
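    How each line maps onto the schema can be sketched in plain Java, with no Spark or Avro on the classpath. `UserRecord` and `parseUser` below are hypothetical stand-ins, not the Avro-generated `User` class; the point is that `name` is required while `favorite_number` and `favorite_color` are `["type", "null"]` unions and therefore nullable, and that the field order follows Users.txt (name, color, number), just as the map step in the Spark job assumes:

```java
import java.util.Optional;

// Hypothetical mirror of the User schema: name is required; the two
// union-typed fields are represented as Optional.
record UserRecord(String name, Optional<String> favoriteColor, Optional<Integer> favoriteNumber) {}

class UserParser {
    // Parse one whitespace-separated "name color number" line, matching
    // the indices the Spark job uses: x(0)=name, x(1)=color, x(2)=number.
    static UserRecord parseUser(String line) {
        String[] parts = line.trim().split("\\s+");
        return new UserRecord(
            parts[0],
            parts.length > 1 ? Optional.of(parts[1]) : Optional.empty(),
            parts.length > 2 ? Optional.of(Integer.parseInt(parts[2])) : Optional.empty());
    }
}
```

    The real job builds the record with `User.newBuilder()...build()` instead; a missing union-typed field there would simply be left null.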

  • Original article: https://www.cnblogs.com/029zz010buct/p/4721093.html