Flink in Action (81): Flink SQL Basics (8): Using Flink with Hive (2): Packaging and Running

    Note 1: This guide uses Flink 1.11.0, Hive 3.1.2, and Hadoop 3.1.3.

    Note 2: Put the hive-site.xml file in the resources directory of the Maven project.

    Note 3: If you are not using a wrapper script, run export HADOOP_CLASSPATH=`hadoop classpath` before submitting the job (see Step 4).

    Step 1: POM dependencies

        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <flink.version>1.11.0</flink.version>
            <scala.binary.version>2.11</scala.binary.version>
            <log4j.version>2.12.1</log4j.version>
            <hive.version>3.1.2</hive.version>
            <hadoop.version>3.1.3</hadoop.version>
        </properties>
    <dependencies>
        <!-- Flink Dependency -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Hive Dependency -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
            <scope>provided</scope>
        </dependency>
    
        </dependencies>
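    One thing the list above does not cover: the job also declares a table with 'connector' = 'kafka' and 'format' = 'json', so the Kafka connector and the JSON format must be on the classpath at runtime. A sketch of the extra dependencies, assuming they are bundled into the fat jar rather than already provided by the cluster:

        <!-- Kafka connector and JSON format (assumed to be shipped inside the fat jar) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>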
    Step 2: Write the code
    Create the two tables in advance with the Flink SQL client. With 'sink.partition-commit.trigger' = 'partition-time', a partition of the Hive table is committed once the watermark passes the partition's time plus the one-hour delay; committing registers the partition in the metastore and writes a _SUCCESS file:
    SET table.sql-dialect=hive;
    CREATE TABLE hive_table_2 (
      log_info STRING
    ) PARTITIONED BY (
      dt STRING,
      hr STRING
    ) STORED AS PARQUET 
    LOCATION 'hdfs://localhost:9820/warehouse/gmall/test99'
    TBLPROPERTIES (
      'sink.partition-commit.trigger'='partition-time',
      'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
      'sink.partition-commit.delay'='1 h',
      'sink.rolling-policy.check-interval'='30s',
      'sink.rolling-policy.rollover-interval'='1min',
      'sink.partition-commit.policy.kind'='metastore,success-file'
    );
    SET table.sql-dialect=default;
    CREATE TABLE kafka_table (
      log_info STRING,
      log_ts TIMESTAMP(3),
      WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'ods_event_test',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'flink_hive_test',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json',
      'json.fail-on-missing-field' = 'false',
      'json.ignore-parse-errors' = 'true'
    );
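    For reference, messages on the ods_event_test topic have to match this schema. A hypothetical sample record, using the JSON format's default SQL timestamp style (yyyy-MM-dd HH:mm:ss[.SSS]):

        {"log_info": "some raw event line", "log_ts": "2020-11-11 13:05:01.123"}

    For a quick test it can be pushed with the console producer, e.g. kafka-console-producer.sh --broker-list localhost:9092 --topic ods_event_test.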


    The actual job code is as follows:
    package com.atguigu.flink.hive
    
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api._
    import org.apache.flink.table.api.bridge.scala._
    import org.apache.flink.table.catalog.hive.HiveCatalog
    
    object InsertData {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        // Checkpointing is required: the file sink only commits Hive partitions
        // on completed checkpoints (every 10 s here).
        env.enableCheckpointing(10000)
    
        // The Blink planner is required for the Hive integration in Flink 1.11.
        val settings = EnvironmentSettings
          .newInstance()
          .useBlinkPlanner()
          .inStreamingMode()
          .build()
    
        val tEnv = StreamTableEnvironment.create(env, settings)
    
        // Register the Hive catalog so the tables created via the SQL client are visible.
        val name = "myhive"
        val defaultDatabase = "gmall"
        val hiveConfDir = "/opt/module/hive/conf" // local path containing hive-site.xml
        val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir)
        tEnv.registerCatalog("myhive", hive)
        tEnv.useCatalog("myhive")
    
        // executeSql submits the INSERT job right away; no env.execute() call is needed.
        tEnv.executeSql("INSERT INTO hive_table_2 SELECT log_info, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table")
      }
    }
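    Once a partition has been committed (watermark past the partition time plus the one-hour delay), the result can be checked back in the SQL client; the partition values below are only illustrative:

        SELECT * FROM hive_table_2 WHERE dt = '2020-11-11' AND hr = '13';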
    Step 3: Package the job and upload it to the server

        <build>
            <plugins>
                <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
                <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>3.2.2</version>
                    <executions>
                        <!-- Run shade goal on package phase -->
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <artifactSet>
                                    <excludes>
                                        <exclude>org.apache.flink:force-shading</exclude>
                                        <exclude>com.google.code.findbugs:jsr305</exclude>
                                        <exclude>org.slf4j:*</exclude>
                                        <exclude>log4j:*</exclude>
                                    </excludes>
                                </artifactSet>
                                <filters>
                                    <filter>
                                        <!-- Do not copy the signatures in the META-INF folder.
                                        Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                                <transformers>
                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <mainClass>com.atguigu.flink.hive.InsertData</mainClass>
                                    </transformer>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
    
                <!-- Java Compiler -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.1</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
    
                <!-- Scala Compiler -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                    <configuration>
                        <args>
                            <arg>-nobootcp</arg>
                        </args>
                        <addScalacArgs>-target:jvm-1.8</addScalacArgs>
                    </configuration>
                </plugin>
    
                <!-- Eclipse Scala Integration -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-eclipse-plugin</artifactId>
                    <version>2.8</version>
                    <configuration>
                        <downloadSources>true</downloadSources>
                        <projectnatures>
                            <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
                            <projectnature>org.eclipse.jdt.core.javanature</projectnature>
                        </projectnatures>
                        <buildcommands>
                            <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
                        </buildcommands>
                        <classpathContainers>
                            <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
                            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
                        </classpathContainers>
                        <excludes>
                            <exclude>org.scala-lang:scala-library</exclude>
                            <exclude>org.scala-lang:scala-compiler</exclude>
                        </excludes>
                        <sourceIncludes>
                            <sourceInclude>**/*.scala</sourceInclude>
                            <sourceInclude>**/*.java</sourceInclude>
                        </sourceIncludes>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.codehaus.mojo</groupId>
                    <artifactId>build-helper-maven-plugin</artifactId>
                    <version>1.7</version>
                    <executions>
                        <!-- Add src/main/scala to eclipse build path -->
                        <execution>
                            <id>add-source</id>
                            <phase>generate-sources</phase>
                            <goals>
                                <goal>add-source</goal>
                            </goals>
                            <configuration>
                                <sources>
                                    <source>src/main/scala</source>
                                </sources>
                            </configuration>
                        </execution>
                        <!-- Add src/test/scala to eclipse build path -->
                        <execution>
                            <id>add-test-source</id>
                            <phase>generate-test-sources</phase>
                            <goals>
                                <goal>add-test-source</goal>
                            </goals>
                            <configuration>
                                <sources>
                                    <source>src/test/scala</source>
                                </sources>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
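    With the shade plugin bound to the package phase, a plain Maven build produces the fat jar (here target/TableFlink1113-1.0-SNAPSHOT.jar, matching the name used in Step 4):

        mvn clean package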
    Step 4: Submit
    Upload the jar to the /opt/module/flink/examples directory on the hadoop102 machine and submit it from the shell:
    atguigu@hadoop102:/opt/module/flink$ bin/flink run -c com.atguigu.flink.hive.InsertData examples/TableFlink1113-1.0-SNAPSHOT.jar
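    If nothing in your environment sets HADOOP_CLASSPATH (note 3 above), export it in the same shell before submitting:

        export HADOOP_CLASSPATH=`hadoop classpath`
        bin/flink run -c com.atguigu.flink.hive.InsertData examples/TableFlink1113-1.0-SNAPSHOT.jar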
    Step 5: Hitting the first error

    java.lang.NoClassDefFoundError: org/apache/flink/table/catalog/hive/HiveCatalog
             at com.tal.flink.hive.StreamMain.main(StreamMain.java:50)
             at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
             at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
             at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
             at java.lang.reflect.Method.invoke(Method.java:498)
             at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
             at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
             at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
             at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
             at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
             at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
             at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
             at java.security.AccessController.doPrivileged(Native Method)
             at javax.security.auth.Subject.doAs(Subject.java:422)
             at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
             at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
             at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
    Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.catalog.hive.HiveCatalog
             at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
             at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
             at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
             at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
             at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
             at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
             ... 17 more

    
    
    Step 6: Fix the first error by placing the Hive connector jar in Flink's lib directory

    cd /opt/module/flink/lib
    Download the flink-sql-connector-hive jar into Flink's lib folder, then restart the cluster so the new jar is picked up.
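    For Flink 1.11.0 with Hive 3.1.2 the matching bundled jar on Maven Central should be flink-sql-connector-hive-3.1.2_2.11-1.11.0.jar; a download sketch, assuming direct access to Maven Central:

        wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.2_2.11/1.11.0/flink-sql-connector-hive-3.1.2_2.11-1.11.0.jar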

    Step 7: Resubmit the job; this time the submission succeeds

    
    

    This article is from cnblogs (博客园), author 秋华. When republishing, please cite the original link: https://www.cnblogs.com/qiu-hua/p/13986285.html
