  • Flink in Action (81): Flink SQL Basics (8) / Flink with Hive (2): Packaging and Running

    Note 1: This post uses Flink 1.11.0, Hive 3.1.2, and Hadoop 3.1.3.

    Note 2: Place the hive-site.xml file in the Maven project's resources directory.

    Note 3: If you are not submitting through a wrapper script, run export HADOOP_CLASSPATH=`hadoop classpath` first.
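
    Without a wrapper script, a submission session looks roughly like this (a sketch; the paths and jar name are the ones used in Step 4 below):

        export HADOOP_CLASSPATH=`hadoop classpath`
        cd /opt/module/flink
        bin/flink run -c com.atguigu.flink.hive.InsertData examples/TableFlink1113-1.0-SNAPSHOT.jar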

    Step 1: POM dependencies

        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <flink.version>1.11.0</flink.version>
            <scala.binary.version>2.11</scala.binary.version>
            <log4j.version>2.12.1</log4j.version>
            <hive.version>3.1.2</hive.version>
            <hadoop.version>3.1.3</hadoop.version>
        </properties>
        <dependencies>
            <!-- Flink dependencies -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <!-- Hive dependency -->
            <dependency>
                <groupId>org.apache.hive</groupId>
                <artifactId>hive-exec</artifactId>
                <version>${hive.version}</version>
                <scope>provided</scope>
            </dependency>
        </dependencies>
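    Everything above is marked provided, so these classes must already be on the cluster's classpath when the job is submitted; this is exactly what causes the NoClassDefFoundError in Step 5, fixed in Step 6 by dropping the bundled connector into Flink's lib folder. Alternatively, you could ship the connector inside the fat jar; a sketch (the lib-folder route is the one this post takes):

        <!-- Alternative sketch: bundle the Hive SQL connector into the fat jar
             instead of placing it under Flink's lib directory (see Step 6). -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-hive-3.1.2_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>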
    Step 2: Write the code
    First, create the tables ahead of time with the Flink SQL client:
    SET table.sql-dialect=hive;
    CREATE TABLE hive_table_2 (
      log_info STRING
    ) PARTITIONED BY (
      dt STRING,
      hr STRING
    ) STORED AS PARQUET
    LOCATION 'hdfs://localhost:9820/warehouse/gmall/test99'
    TBLPROPERTIES (
      'sink.partition-commit.trigger'='partition-time',
      'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
      'sink.partition-commit.delay'='1 h',
      'sink.rolling-policy.check-interval'='30s',
      'sink.rolling-policy.rollover-interval'='1min',
      'sink.partition-commit.policy.kind'='metastore,success-file'
    );

    With these properties, a partition is committed (registered in the Hive metastore and marked with a _SUCCESS file) once the job's watermark passes the time extracted from the partition values plus the one-hour delay; in-progress files roll over at most once a minute.

    SET table.sql-dialect=default;
    CREATE TABLE kafka_table (
      log_info STRING,
      log_ts TIMESTAMP(3),
      WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'ods_event_test',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'flink_hive_test',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json',
      'json.fail-on-missing-field' = 'false',
      'json.ignore-parse-errors' = 'true'
    );
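
    For reference, here is a hypothetical test record that the kafka_table schema accepts, sent with Kafka's console producer (made-up payload; with the JSON format, log_ts is expected in the default SQL-style timestamp format):

        bin/kafka-console-producer.sh --broker-list localhost:9092 --topic ods_event_test
        {"log_info": "page_view uid=42", "log_ts": "2020-11-11 12:01:30.000"}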


    The actual job code:
    package com.atguigu.flink.hive

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api._
    import org.apache.flink.table.api.bridge.scala._
    import org.apache.flink.table.catalog.hive.HiveCatalog

    object InsertData {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        // Checkpointing is required: the streaming file sink only finalizes
        // files and commits partitions when a checkpoint completes.
        env.enableCheckpointing(10000)

        val settings = EnvironmentSettings
          .newInstance()
          .useBlinkPlanner() // the Blink planner is required for Hive integration
          .inStreamingMode()
          .build()

        val tEnv = StreamTableEnvironment.create(env, settings)

        // Register a HiveCatalog so Flink can see the tables created earlier
        // through the SQL client; hiveConfDir must contain hive-site.xml.
        val name = "myhive"
        val defaultDatabase = "gmall"
        val hiveConfDir = "/opt/module/hive/conf" // a local path
        val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir)
        tEnv.registerCatalog("myhive", hive)
        tEnv.useCatalog("myhive")

        // Continuously stream Kafka records into the partitioned Hive table,
        // deriving the dt/hr partition columns from the event timestamp.
        tEnv.executeSql("INSERT INTO hive_table_2 SELECT log_info, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table")
      }
    }
    Step 3: Package and deploy to the server

        <build>
            <plugins>
                <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
                <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>3.2.2</version>
                    <executions>
                        <!-- Run shade goal on package phase -->
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <artifactSet>
                                    <excludes>
                                        <exclude>org.apache.flink:force-shading</exclude>
                                        <exclude>com.google.code.findbugs:jsr305</exclude>
                                        <exclude>org.slf4j:*</exclude>
                                        <exclude>log4j:*</exclude>
                                    </excludes>
                                </artifactSet>
                                <filters>
                                    <filter>
                                        <!-- Do not copy the signatures in the META-INF folder.
                                        Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                                <transformers>
                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <mainClass>com.atguigu.flink.hive.InsertData</mainClass>
                                    </transformer>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
    
                <!-- Java Compiler -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.1</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
    
                <!-- Scala Compiler -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                    <configuration>
                        <args>
                            <arg>-nobootcp</arg>
                        </args>
                        <addScalacArgs>-target:jvm-1.8</addScalacArgs>
                    </configuration>
                </plugin>
    
                <!-- Eclipse Scala Integration -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-eclipse-plugin</artifactId>
                    <version>2.8</version>
                    <configuration>
                        <downloadSources>true</downloadSources>
                        <projectnatures>
                            <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
                            <projectnature>org.eclipse.jdt.core.javanature</projectnature>
                        </projectnatures>
                        <buildcommands>
                            <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
                        </buildcommands>
                        <classpathContainers>
                            <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
                            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
                        </classpathContainers>
                        <excludes>
                            <exclude>org.scala-lang:scala-library</exclude>
                            <exclude>org.scala-lang:scala-compiler</exclude>
                        </excludes>
                        <sourceIncludes>
                            <sourceInclude>**/*.scala</sourceInclude>
                            <sourceInclude>**/*.java</sourceInclude>
                        </sourceIncludes>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.codehaus.mojo</groupId>
                    <artifactId>build-helper-maven-plugin</artifactId>
                    <version>1.7</version>
                    <executions>
                        <!-- Add src/main/scala to eclipse build path -->
                        <execution>
                            <id>add-source</id>
                            <phase>generate-sources</phase>
                            <goals>
                                <goal>add-source</goal>
                            </goals>
                            <configuration>
                                <sources>
                                    <source>src/main/scala</source>
                                </sources>
                            </configuration>
                        </execution>
                        <!-- Add src/test/scala to eclipse build path -->
                        <execution>
                            <id>add-test-source</id>
                            <phase>generate-test-sources</phase>
                            <goals>
                                <goal>add-test-source</goal>
                            </goals>
                            <configuration>
                                <sources>
                                    <source>src/test/scala</source>
                                </sources>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
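    Build the shaded fat jar with the standard Maven invocation; the artifact name matches the one submitted in Step 4:

        mvn clean package -DskipTests
        # produces target/TableFlink1113-1.0-SNAPSHOT.jar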
    Step 4: Submit
    Upload the jar to the /opt/module/flink/examples folder on hadoop102 and submit it from the shell:
    atguigu@hadoop102:/opt/module/flink$ bin/flink run -c com.atguigu.flink.hive.InsertData examples/TableFlink1113-1.0-SNAPSHOT.jar
    Step 5: Hitting the first error
    The submission fails with a NoClassDefFoundError for HiveCatalog: the Hive connector was marked provided in the POM, so it is in neither the fat jar nor Flink's lib directory.

    java.lang.NoClassDefFoundError: org/apache/flink/table/catalog/hive/HiveCatalog
             at com.tal.flink.hive.StreamMain.main(StreamMain.java:50)
             at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
             at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
             at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
             at java.lang.reflect.Method.invoke(Method.java:498)
             at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
             at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
             at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
             at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
             at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
             at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
             at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
             at java.security.AccessController.doPrivileged(Native Method)
             at javax.security.auth.Subject.doAs(Subject.java:422)
             at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
             at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
             at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
    Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.catalog.hive.HiveCatalog
             at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
             at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
             at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
             at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
             at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
             at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
             ... 17 more

    
    
    Step 6: Fix the error by downloading the connector jar into Flink's lib directory

    Download the flink-sql-connector-hive bundle into Flink's lib folder (/opt/module/flink/lib); for Hive 3.1.2 and Flink 1.11.0 that is the flink-sql-connector-hive-3.1.2_2.11 jar. (This setup assumes the Kafka connector jar is already in lib, since the Kafka table was usable from the SQL client.)
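
    A sketch of the download, assuming the bundled connector from Maven Central (restart the cluster or session afterwards so the new jar is picked up):

        cd /opt/module/flink/lib
        wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.2_2.11/1.11.0/flink-sql-connector-hive-3.1.2_2.11-1.11.0.jar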

    Step 7: Resubmit the job; this time the submission succeeds.
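
    Once the job has run long enough for a partition to commit (the watermark must pass the partition hour plus the one-hour delay), the sink can be verified on HDFS; the partition values below are placeholders:

        # a committed partition holds finalized parquet files plus a _SUCCESS marker
        hdfs dfs -ls /warehouse/gmall/test99/dt=2020-11-11/hr=12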

    
    

    This article is from 博客园 (cnblogs), by 秋华. Please credit the original link when reposting: https://www.cnblogs.com/qiu-hua/p/13986285.html
