1. Config file: config.properties
Kafka and Redis connection settings live in config.properties on the classpath:
# Kafka configuration
kafka.broker.list=hadoop300:9092,hadoop301:9092,hadoop302:9092
# Redis configuration
redis.host=hadoop300
redis.port=6379
2. Loading the properties file
PropertiesUtil reads a properties file from the runtime classpath (in a standard Maven layout, src/main/resources) and returns it as a java.util.Properties:
package com.duoduo.realtime.utils

import java.io.InputStreamReader
import java.util.Properties

/**
 * Author z
 * Date 2020-08-27 10:04:21
 */
object PropertiesUtil {

  def main(args: Array[String]): Unit = {
    val properties: Properties = PropertiesUtil.load("config.properties")
    println(properties.getProperty("kafka.broker.list"))
  }

  // Loads the named properties file from the classpath, decoding it as UTF-8.
  def load(propertiesName: String): Properties = {
    val p = new Properties()
    p.load(new InputStreamReader(
      Thread.currentThread().getContextClassLoader
        .getResourceAsStream(propertiesName),
      "UTF-8"))
    p
  }
}
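The redis.host and redis.port entries are consumed the same way. The helper below is a minimal pooled-connection sketch built on the Jedis dependency declared in the POM (section 3); the class name RedisUtil and all pool settings are illustrative assumptions, not part of the original project:

package com.duoduo.realtime.utils

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object RedisUtil {
  // Lazily created singleton pool; init is not thread-safe, which is
  // acceptable for driver-side use.
  private var jedisPool: JedisPool = _

  def getJedisClient: Jedis = {
    if (jedisPool == null) {
      val config = PropertiesUtil.load("config.properties")
      val host = config.getProperty("redis.host")
      val port = config.getProperty("redis.port")

      val poolConfig = new JedisPoolConfig()
      poolConfig.setMaxTotal(100)       // pool sizing: assumed values
      poolConfig.setMaxIdle(20)
      poolConfig.setMinIdle(20)
      poolConfig.setBlockWhenExhausted(true)
      poolConfig.setMaxWaitMillis(5000) // wait up to 5s for a free connection
      poolConfig.setTestOnBorrow(true)  // ping before handing out a connection

      jedisPool = new JedisPool(poolConfig, host, port.toInt)
    }
    jedisPool.getResource
  }
}

Usage: val jedis = RedisUtil.getJedisClient, then jedis.close() returns the connection to the pool rather than closing it.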
3. POM dependencies
The module inherits from the dw-stream parent; it pins Spark 2.4.0 on Scala 2.11 and adds the Kafka client, Jedis, Phoenix (for HBase), Jest (for Elasticsearch) and fastjson:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>dw-stream</artifactId>
        <groupId>com.duoduo</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>dw-realtime</artifactId>

    <properties>
        <spark.version>2.4.0</spark.version>
        <scala.version>2.11.8</scala.version>
        <kafka.version>1.0.0</kafka.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.56</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-spark</artifactId>
            <version>4.14.2-HBase-1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
            <version>5.3.3</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>net.java.dev.jna</groupId>
            <artifactId>jna</artifactId>
            <version>4.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>commons-compiler</artifactId>
            <version>2.7.8</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Compiles Scala sources to class files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <!-- Bound to Maven's compile phase -->
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Packages the job and its dependencies into one jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
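With this build section, mvn package compiles the Scala sources and, by default, additionally produces a dw-realtime-1.0-SNAPSHOT-jar-with-dependencies.jar that can be handed straight to spark-submit.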
4. Kafka utility class
KafkaUtil wraps the spark-streaming-kafka-0-10 direct API: it builds the consumer parameters once from config.properties and exposes three overloads of getKafkaStream (default group, explicit group, and explicit group with starting offsets):
package com.duoduo.realtime.utils

import java.util.Properties

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, ConsumerStrategy, KafkaUtils, LocationStrategies}

/**
 * Author z
 * Date 2020-08-27 10:02:21
 */
object KafkaUtil {
  private val properties: Properties = PropertiesUtil.load("config.properties")
  val broker_list: String = properties.getProperty("kafka.broker.list")

  // Shared consumer parameters. Note this map is mutable and shared, so the
  // overloads below that set group.id mutate global state.
  var kafkaParam = collection.mutable.Map(
    // Bootstrap addresses for the initial connection to the cluster
    "bootstrap.servers" -> broker_list,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    // Consumer group this consumer belongs to
    "group.id" -> "gmall_consumer_group",
    // Where to start when there is no committed offset, or the committed
    // offset no longer exists on the broker: "latest" resets to the newest offset
    "auto.offset.reset" -> "latest",
    // true: offsets are auto-committed in the background, which can lose data
    // if the job fails between commit and processing
    // false: offsets must be tracked and committed manually
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  // Subscribe with the default consumer group.
  def getKafkaStream(topic: String, ssc: StreamingContext)
  : InputDStream[ConsumerRecord[String, String]] = {
    KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParam)
    )
  }

  // Subscribe with an explicit consumer group.
  def getKafkaStream(topic: String, ssc: StreamingContext, groupId: String)
  : InputDStream[ConsumerRecord[String, String]] = {
    kafkaParam("group.id") = groupId
    KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParam)
    )
  }

  // Subscribe with an explicit consumer group, starting from the given
  // offsets (e.g. offsets saved externally for manual offset management).
  def getKafkaStream(topic: String,
                     ssc: StreamingContext,
                     offsets: Map[TopicPartition, Long],
                     groupId: String)
  : InputDStream[ConsumerRecord[String, String]] = {
    kafkaParam("group.id") = groupId
    val consumerStrategy: ConsumerStrategy[String, String] =
      ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParam, offsets)
    KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      consumerStrategy
    )
  }
}
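For reference, a minimal smoke-test driver showing how the pieces plug together. The package, app name, topic, group id, master and batch interval below are placeholder assumptions, not part of the original project:

package com.duoduo.realtime.app

import com.duoduo.realtime.utils.KafkaUtil
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaStreamDemo {
  def main(args: Array[String]): Unit = {
    // Local master and 5-second batches, assumed for a quick local test
    val conf = new SparkConf().setMaster("local[4]").setAppName("KafkaStreamDemo")
    val ssc = new StreamingContext(conf, Seconds(5))

    // "demo_topic" and "demo_group" are placeholder names
    val stream = KafkaUtil.getKafkaStream("demo_topic", ssc, "demo_group")
    stream.map(_.value()).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Since enable.auto.commit is false, a production job would use the third overload instead, passing offsets recovered from external storage (for example Redis via RedisUtil above) and committing them back after each batch.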