zoukankan      html  css  js  c++  java
  • Flink实战(108):connector(十七)hdfs 读写(二)写

    1. 依赖HDFS

    pom.xml 添加依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <!--
      Maven build for the FlinkHdfs example: a Flink (Scala API) streaming job that
      reads from Kafka and writes to HDFS.
    -->
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>org.example</groupId>
        <artifactId>FlinkHdfs</artifactId>
        <version>1.0-SNAPSHOT</version>

        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <flink.version>1.11.0</flink.version>
            <!-- Scala suffix shared by every Flink artifact below; change it in one place only. -->
            <scala.binary.version>2.11</scala.binary.version>
            <!-- NOTE(review): log4j.version and hive.version are not referenced in this POM. -->
            <log4j.version>2.12.1</log4j.version>
            <hive.version>3.1.2</hive.version>
            <hadoop.version>3.1.3</hadoop.version>
        </properties>

        <dependencies>
            <!-- JSON parsing, used by the bucket assigner to read the "timeinfo" field. -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.56</version>
            </dependency>

            <!-- Required to run Flink (Scala batch + streaming APIs and the client). -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <!-- <scope>provided</scope> -->
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <!-- <scope>provided</scope> -->
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>

            <!-- Required for HDFS access (Hadoop compatibility layer and Hadoop client). -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>${hadoop.version}</version>
            </dependency>

            <!-- Kafka source (FlinkKafkaConsumer011) used by the HDFS writer job. -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <exclusions>
                    <exclusion>
                        <artifactId>slf4j-api</artifactId>
                        <groupId>org.slf4j</groupId>
                    </exclusion>
                </exclusions>
            </dependency>

            <!--
              Filesystem connector.
              NOTE(review): the job imports StreamingFileSink from
              org.apache.flink.streaming.api.functions.sink.filesystem; confirm whether this
              artifact is still needed before removing it.
            -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-filesystem_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
        </dependencies>
    </project>
    View Code

    2. 配置 HDFS

    将 hdfs-site.xml 和 core-site.xml 两个文件放到 src/main/resources 目录下

    3.写入HDFS

    输入数据(Kafka topic datainfo4 中的消息示例,每条消息是一行 JSON):

    {"deviceType":"0","userNums":0,"newusers":1,"dayActivenums":1,"timeinfoString":"","timeinfo":"2018090704","userId":"1","monthActivenums":1,"weekActivenums":1,"groupByField":"1==0==2018090704","times":1,"hourActivenums":1}

    {"deviceType":"1","userNums":0,"newusers":0,"dayActivenums":0,"timeinfoString":"","timeinfo":"2018090705","userId":"2","monthActivenums":0,"weekActivenums":0,"groupByField":"2==1==2018090705","times":1,"hourActivenums":0}

    1.主程序

    package com.atguigu
    
    import java.util.Properties
    
    import org.apache.flink.api.common.serialization.SimpleStringEncoder
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema
    
    /**
     * Flink streaming job that reads string records from the Kafka topic "datainfo4"
     * and writes them as UTF-8 rows under hdfs://hadoop102:9820/dataanlay/liuliang/.
     * The sub-directory for each record is chosen by LiuLiangUserDetailBucketAssigner.
     */
    object WriteToHDFS {

      /** Builds the Kafka consumer configuration handed to the source. */
      private def kafkaProperties(): Properties = {
        val props = new Properties()
        Seq(
          "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
          "group.id" -> "caimoutest3",
          "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
          "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
          "auto.offset.reset" -> "latest"
        ).foreach { case (key, value) => props.setProperty(key, value) }
        props
      }

      /** Builds the row-format HDFS sink with the custom bucket (partition path) assigner. */
      private def hdfsSink(): StreamingFileSink[String] =
        StreamingFileSink
          .forRowFormat(
            new Path("hdfs://hadoop102:9820/dataanlay/liuliang/"),
            new SimpleStringEncoder[String]("UTF-8"))
          .withBucketAssigner(new LiuLiangUserDetailBucketAssigner()) // custom partition path
          .withBucketCheckInterval(5 * 1000)
          .build()

      /**
       * Wires Kafka source -> HDFS sink and runs the job.
       *
       * @param args command-line arguments (not used)
       */
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)

        // Checkpointing must be enabled (original author's note); the interval is 5000 ms.
        env.enableCheckpointing(5000L)

        val kafkaSource =
          new FlinkKafkaConsumer011[String]("datainfo4", new SimpleStringSchema(), kafkaProperties())

        env.addSource(kafkaSource).addSink(hdfsSink())

        env.execute("LiuLiangHourUserDetailAnaly")
      }
    }

    2 LiuLiangUserDetailBucketAssigner

    package com.atguigu
    
    import java.io.File
    
    import com.alibaba.fastjson.JSON
    import org.apache.flink.core.io.SimpleVersionedSerializer
    import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner
    
    /**
     * Bucket assigner that derives the output sub-directory from the record's
     * "timeinfo" JSON field: the first 8 characters and the next 2 characters are
     * joined with "/" (e.g. "2018090704" becomes "20180907/04").
     *
     * Records whose bucket cannot be derived (unparseable JSON, missing or too-short
     * "timeinfo") go to the "invalid" bucket instead of failing the job.
     */
    class LiuLiangUserDetailBucketAssigner extends BucketAssigner[String,String] {

      /** Bucket used for records from which no "timeinfo"-based path can be derived. */
      private val invalidBucketId: String = "invalid"

      /**
       * Splits a "timeinfo" value into "<first 8 chars>/<next 2 chars>".
       *
       * @param timeinfo raw field value; may be null
       * @return the bucket path, or None when the value is null or shorter than 10 characters
       */
      private def bucketFor(timeinfo: String): Option[String] =
        Option(timeinfo)
          .filter(_.length >= 10)
          .map(t => s"${t.substring(0, 8)}/${t.substring(8, 10)}")

      /**
       * Computes the bucket id for one record.
       *
       * @param in      the record, expected to be a JSON object with a "timeinfo" field
       * @param context sink-provided context (not used)
       * @return "<first 8 chars>/<next 2 chars>" of "timeinfo", or "invalid" when that
       *         cannot be derived
       */
      override def getBucketId(in: String, context: BucketAssigner.Context): String = {
        // No per-record printing here: this method runs for every element written by the sink.
        val bucket = for {
          // Try absorbs parse failures; Option(_) absorbs a null parse result.
          json   <- scala.util.Try(JSON.parseObject(in)).toOption.flatMap(Option(_))
          bucket <- bucketFor(json.getString("timeinfo"))
        } yield bucket
        bucket.getOrElse(invalidBucketId)
      }

      /** Serializer the sink uses to persist bucket ids. */
      override def getSerializer: SimpleVersionedSerializer[String] = new LiuLiangStringSerializer

      /**
       * Ad-hoc check that prints the bucket for the sample value "2018090707".
       * This is an instance method of a class, so it is not a JVM entry point; it is
       * kept only so existing references keep compiling. It uses the same "/"-joined
       * format as getBucketId rather than the platform file separator.
       *
       * @param args not used
       */
      def main(args: Array[String]): Unit = {
        System.out.println(bucketFor("2018090707").getOrElse(invalidBucketId))
      }
    }

    3 LiuLiangStringSerializer

    package com.atguigu
    
    import org.apache.flink.core.io.SimpleVersionedSerializer
    
    /**
     * Versioned serializer for String bucket ids: the payload is the UTF-8 encoding
     * of the string, written under version 0.
     */
    class LiuLiangStringSerializer extends SimpleVersionedSerializer[String]{

      /** Version stored alongside every serialized value. */
      override def getVersion: Int = 0

      /**
       * Encodes the string as UTF-8. The charset is explicit so bytes written on one
       * JVM decode identically on another, regardless of the platform default.
       *
       * @param e the value to serialize
       * @return the UTF-8 bytes of `e`
       */
      override def serialize(e: String): Array[Byte] =
        e.getBytes(java.nio.charset.StandardCharsets.UTF_8)

      /**
       * Decodes a value previously produced by `serialize`.
       *
       * @param i     version the bytes were written with
       * @param bytes serialized payload
       * @return the decoded string
       * @throws java.io.IOException if `i` is not the version this serializer writes
       */
      override def deserialize(i: Int, bytes: Array[Byte]): String = {
        // Compare against the version this class writes. The previous hard-coded 77
        // never matched getVersion (0), so every value this class wrote was rejected.
        if (i != getVersion) {
          throw new java.io.IOException("version mismatch")
        }
        new String(bytes, java.nio.charset.StandardCharsets.UTF_8)
      }

    }

    TIP

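    1. 请关闭 HDFS 的权限检查:在 hdfs-site.xml 中加入下面的配置(Hadoop 3.x 中该配置项的新名称是 dfs.permissions.enabled,dfs.permissions 为已弃用的旧名称)。如果不关闭权限检查,则需要把认证文件复制到 resources 目录下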
    <property>
            <name>dfs.permissions</name>
            <value>false</value>
        </property>

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/14161978.html

  • 相关阅读:
    如何将网站升级为HTTPS协议?
    hashmap:cr:csdn
    HashMap的底层原理 cr:csdn:zhangshixi
    servlet
    泛型,反射
    线程

    集合
    java基础题
    我的博客网址
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/14161978.html
Copyright © 2011-2022 走看看