zoukankan      html  css  js  c++  java
  • Spark中如何生成Avro文件

    研究spark的目的之一就是要取代MR,目前我司MR的一个典型应用场景即为生成Avro文件,然后加载到HIVE表里,所以如何在Spark中生成Avro文件,就是必然之路了。

    我本人由于对java不熟,对hadoop的各个框架也知之寥寥,所以耗费了五六天的时间才搞明白怎么在spark中生成avro文件。其中困难有二,一是网上关于spark中生成avro的资料太少,无论官网还是各种论坛,要么没有讯息,要么就是三言两语,对于我这个菜鸟而言,真是要了老命;二是在spark生成avro的代码中,用到了avro框架和hadoop框架的东西,他们自己底层的引用,又有可能和spark的底层引用冲突,虽然最终解决了问题,但是对于问题的直接原因,还么有弄明白。

    对于Java的老手,对于hadoop的生态又比较熟悉的人,估计这个课题一天之内就解决了。这里我不怕大家笑话,将目前在本地能跑成功的代码贴出来,还多请指教。还没有提交到集群中去。

    1.代码片段

    2.pom文件

    3.avro格式和文本

    1.代码片段

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd._
    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.io.NullWritable
    import org.apache.avro.Schema
    import org.apache.avro.mapred.AvroKey
    import org.apache.avro.mapreduce.AvroKeyOutputFormat
    import org.apache.avro.mapreduce._
    
    object TextTest extends App {
      System.setProperty("hadoop.home.dir", "D:\bd\software\winutils")
      val sparkConf = new SparkConf().setAppName("AvroTest").setMaster("local[4]")
      val sc = new SparkContext(sparkConf)
      
      //**************************to generate an avro file based on internal java type
      var li = List("A","A","C","B")
      var lip = sc.parallelize(li, 4)
      var liprdd = lip.map { x => (new AvroKey[String](x),NullWritable.get()) }
      var prdd = new PairRDDFunctions(liprdd)
      val schema = Schema.create(Schema.Type.STRING)
      val job1 = Job.getInstance
      AvroJob.setOutputKeySchema(job1, schema)
      prdd.saveAsNewAPIHadoopFile("D:/002", classOf[AvroKey[String]], classOf[NullWritable],
          classOf[AvroKeyOutputFormat[String]], job1.getConfiguration)
      println("job1 done")
      
      
      //**************************to generate an avro file based on avro type
      var av = sc.textFile("D://bdp//NewHuman//Users.txt",5)
      var job = Job.getInstance
      AvroJob.setOutputKeySchema(job, User.getClassSchema)
      val avArray = av.map(x => x.split(" "))
      
      val userP = avArray.map { x => (new AvroKey[User](User.newBuilder().setFavoriteNumber(Integer.parseInt(x(2))).setName(x(0)).setFavoriteColor(x(1)).build()),NullWritable.get()) }
    
      var avP = new PairRDDFunctions(userP)
      
       avP.saveAsNewAPIHadoopFile("D:/003", classOf[AvroKey[User]], classOf[NullWritable],
          classOf[AvroKeyOutputFormat[User]], job.getConfiguration)
      
      println("job2 done")
    }
    

      代码中演示了两种类型的场景,一种是内存技术的,一种是外部文件。其中需要注意的是,必须要用AvroJob去设定schema,再者就是只有pairRDD才有saveAsNewAPIHadoop方法,所以其他的RDD必须要转成PairRDD。

      另外,上面代码中的User类是利用avro自动生成的,需要引用进来。

    2.pom文件

      1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      2     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      3     <modelVersion>4.0.0</modelVersion>
      4 
      5 
      6     <dependencies>
      7         
      8         <dependency>
      9             <groupId>jdk.tools</groupId>
     10             <artifactId>jdk.tools</artifactId>
     11             <version>1.7.0_67</version>
     12             <scope>system</scope>
     13             <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
     14         </dependency>
     15 
     16         <dependency>
     17         <groupId>com.google.guava</groupId>
     18         <artifactId>guava</artifactId>
     19         <version>16.0.1</version>
     20     </dependency>
     21         
     22 
     23         <dependency>
     24             <groupId>org.apache.spark</groupId>
     25             <artifactId>spark-core_2.10</artifactId>
     26             <version>1.2.2</version>
     27         </dependency>
     28 
     29         <dependency>
     30             <groupId>org.scala-lang</groupId>
     31             <artifactId>scala-library</artifactId>
     32             <version>2.10.4</version>
     33         </dependency>
     34         <dependency>
     35             <groupId>junit</groupId>
     36             <artifactId>junit</artifactId>
     37             <version>3.8.1</version>
     38             <scope>test</scope>
     39         </dependency>
     40 
     41         <dependency>
     42             <groupId>org.apache.kafka</groupId>
     43             <artifactId>kafka_2.10</artifactId>
     44             <version>0.8.1.1</version>
     45         </dependency>
     46 
     47         <dependency>
     48             <groupId>org.apache.spark</groupId>
     49             <artifactId>spark-streaming_2.10</artifactId>
     50             <version>1.2.2</version>
     51         </dependency>
     52         <dependency>
     53             <groupId>org.apache.spark</groupId>
     54             <artifactId>spark-streaming-kafka_2.10</artifactId>
     55             <version>1.2.2</version>
     56         </dependency>
     57 
     58         <dependency>
     59             <groupId>org.apache.avro</groupId>
     60             <artifactId>avro</artifactId>
     61             <version>1.7.4</version>
     62         </dependency>
     63 
     64         <dependency>
     65             <groupId>org.apache.avro</groupId>
     66             <artifactId>avro-mapred</artifactId>
     67             <version>1.7.4</version>
     68             <classifier>hadoop2</classifier>
     69         </dependency>
     70 
     71         <dependency>
     72             <groupId>org.apache.hadoop</groupId>
     73             <artifactId>hadoop-common</artifactId>
     74             <version>2.2.0</version>
     75         </dependency>
     76 
     77         <dependency>
     78             <groupId>org.apache.hadoop</groupId>
     79             <artifactId>hadoop-core</artifactId>
     80             <version>1.0.4</version>
     81         </dependency>
     82 
     83     </dependencies>
     84 
     85     <repositories>
     86         <repository>
     87             <id>scala-tools.org</id>
     88             <name>Scala-tools Maven2 Repository</name>
     89             <url>http://scala-tools.org/repo-releases</url>
     90         </repository>
     91     </repositories>
     92 
     93     <pluginRepositories>
     94         <pluginRepository>
     95             <id>scala-tools.org</id>
     96             <name>Scala-tools Maven2 Repository</name>
     97             <url>http://scala-tools.org/repo-releases</url>
     98         </pluginRepository>
     99     </pluginRepositories>
    100 
    101     <build>
    102         <sourceDirectory>src</sourceDirectory>
    103         <pluginManagement>
    104             <plugins>
    105                 <plugin>
    106                     <groupId>net.alchim31.maven</groupId>
    107                     <artifactId>maven-scala-plugin</artifactId>
    108                     <version>3.2.0</version>
    109                 </plugin>
    110                 <plugin>
    111                     <groupId>org.apache.maven.plugins</groupId>
    112                     <artifactId>maven-compiler-plugin</artifactId>
    113                     <version>3.1</version>
    114                 </plugin>
    115             </plugins>
    116         </pluginManagement>
    117         <plugins>
    118             <plugin>
    119                 <groupId>org.scala-tools</groupId>
    120                 <artifactId>maven-scala-plugin</artifactId>
    121                 <executions>
    122                     <execution>
    123                         <id>scala-compile-first</id>
    124                         <phase>process-resources</phase>
    125                         <goals>
    126                             <goal>add-source</goal>
    127                             <goal>compile</goal>
    128                         </goals>
    129                     </execution>
    130                 </executions>
    131             </plugin>
    132         </plugins>
    133     </build>
    134 </project>

    这个文件中要注意的是dependency的顺序不能变,由于spark和avro的底层的外部引用可能会冲突。

    3.avro格式和文本

    avro格式为

    {"namespace": "example.avro",
     "type": "record",
     "name": "User",
     "fields": [
         {"name": "name", "type": "string"},
         {"name": "favorite_number",  "type": ["int", "null"]},
         {"name": "favorite_color", "type": ["string", "null"]}
     ]
    }

    测试用例的文本Users.txt


    Josh Green 13
    Ken Yellow 6
    Xiao Orange 8
    Gerry Black 12

  • 相关阅读:
    配置多个视图解析器
    在500jsp错误页面获取错误信息
    移动端浏览器监听返回键
    eclipse添加js智能代码提示
    js验证银行卡号 luhn校验规则
    免装版tomcat注册成windows系统服务方法
    微信公众号支付,为什么你找了那么多关于微信支付博客案例依然跑不通......两步带你完成支付,看完还做不出来你找我
    jsp自定义标签
    MyBatis动态传入表名
    配置tomcat允许跨域访问,cors跨域资源共享
  • 原文地址:https://www.cnblogs.com/029zz010buct/p/4721093.html
Copyright © 2011-2022 走看看