1. 依赖HDFS
pom.xml 添加依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>FlinkHdfs</artifactId> <version>1.0-SNAPSHOT</version> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <flink.version>1.11.0</flink.version> <scala.binary.version>2.11</scala.binary.version> <log4j.version>2.12.1</log4j.version> <hive.version>3.1.2</hive.version> <hadoop.version>3.1.3</hadoop.version> </properties> <dependencies> <!-- 运行FLINK必须--> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.56</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> <!-- <scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> <!-- <scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <!-- 读HDFS必须--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-hadoop-compatibility_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> <!-- 写HDFS必须--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.11_2.11</artifactId> <version>${flink.version}</version> <exclusions> <exclusion> <artifactId>slf4j-api</artifactId> <groupId>org.slf4j</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-filesystem_2.11</artifactId> <version>${flink.version}</version> </dependency> </dependencies> </project>
2. 配置 HDFS
将hdfs-site.xml
和core-site.xml
放入到src/main/resources
目录下面
3.写入HDFS
输入参数:
{"deviceType":"0","userNums":0,"newusers":1,"dayActivenums":1,"timeinfoString":"","timeinfo":"2018090704","userId":"1","monthActivenums":1,"weekActivenums":1,"groupByField":"1==0==2018090704","times":1,"hourActivenums":1}
{"deviceType":"1","userNums":0,"newusers":0,"dayActivenums":0,"timeinfoString":"","timeinfo":"2018090705","userId":"2","monthActivenums":0,"weekActivenums":0,"groupByField":"2==1==2018090705","times":1,"hourActivenums":0}
1.主程序
package com.atguigu import java.util.Properties import org.apache.flink.api.common.serialization.SimpleStringEncoder import org.apache.flink.core.fs.Path import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011 import org.apache.flink.streaming.util.serialization.SimpleStringSchema object WriteToHDFS { def main(args: Array[String]): Unit = { val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment bsEnv.setParallelism(1) bsEnv.enableCheckpointing(5000L)//一定要开启checkpoint val properties = new Properties() properties.setProperty("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092") properties.setProperty("group.id", "caimoutest3"); properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("auto.offset.reset", "latest") val stream = bsEnv.addSource(new FlinkKafkaConsumer011( "datainfo4", new SimpleStringSchema(), properties )) val fileSink = StreamingFileSink .forRowFormat(new Path("hdfs://hadoop102:9820/dataanlay/liuliang/"),new SimpleStringEncoder[String]("UTF-8")) .withBucketAssigner(new LiuLiangUserDetailBucketAssigner()) // 自定义分区路径 .withBucketCheckInterval(5*1000) .build() stream.addSink(fileSink) bsEnv.execute("LiuLiangHourUserDetailAnaly") } }
2 LiuLiangUserDetailBucketAssigner
package com.atguigu import java.io.File import com.alibaba.fastjson.JSON import org.apache.flink.core.io.SimpleVersionedSerializer import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner class LiuLiangUserDetailBucketAssigner extends BucketAssigner[String,String] { override def getBucketId(in: String, context: BucketAssigner.Context): String = { System.out.println(in) val dateString = JSON.parseObject(in).getString("timeinfo") val result = dateString.substring(0, 8) + "/" + dateString.substring(8, 10) System.out.println(result) result } override def getSerializer: SimpleVersionedSerializer[String] = new LiuLiangStringSerializer def main(args: Array[String]): Unit = { val dateString = "2018090707" val result = dateString.substring(0, 8) + File.separator + dateString.substring(8, 10) System.out.println(result) } }
3 LiuLiangStringSerializer
package com.atguigu import org.apache.flink.core.io.SimpleVersionedSerializer class LiuLiangStringSerializer extends SimpleVersionedSerializer[String]{ override def getVersion: Int = 0 override def serialize(e: String): Array[Byte] = e.getBytes() override def deserialize(i: Int, bytes: Array[Byte]): String = { if (i != 77){ throw new Exception("version mismatch") }else{ new String(bytes) } } }
TIP
- 请关闭HDFS 权限,不关闭需要把认证copy到resources目录下
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>