import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Spark Streaming integration with Kafka (direct stream, local mode)
 */
object KafkaDirectStream {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("KafkaDirectStream").setMaster("local[2]")
val streamingContext = new StreamingContext(sparkConf, Seconds(5)) // one batch every 5 seconds
val kafkaParams = Map[String, Object](
/*
 * The list of broker addresses. It does not need to contain every broker in the cluster;
 * the client discovers the remaining brokers from the ones given.
 * Providing at least two brokers is recommended for fault tolerance.
 */
"bootstrap.servers" -> "test:9091,test:9092,test:9093",
/* key deserializer */
"key.deserializer" -> classOf[StringDeserializer],
/* value deserializer */
"value.deserializer" -> classOf[StringDeserializer],
/* consumer group ID */
"group.id" -> "spark-streaming-group",
/*
 * What to do when the consumer reads a partition with no committed offset, or the offset is invalid:
 * latest: start from the newest records (those produced after the consumer starts)
 * earliest: start from the beginning of the partition
 */
"auto.offset.reset" -> "latest",
/* whether offsets are committed automatically (a manual alternative is sketched after this program) */
"enable.auto.commit" -> (true: java.lang.Boolean)
)
/* multiple topics can be subscribed at once */
val topics = Array("abc")
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
/* location strategy */
PreferConsistent,
/* subscribe to the topics */
Subscribe[String, String](topics, kafkaParams)
)
//stream.saveAsTextFiles("D:\wps\a") 写入本地文件
//stream.saveAsTextFiles("hdfs://192.168.0.118:9000/root/eight9/") 写入 hdfs
/*打印输入流*/
stream.map(record => (record.key, record.value)).print()
streamingContext.start()
streamingContext.awaitTermination()
}
}
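With "enable.auto.commit" set to true, the Kafka consumer commits offsets on its own schedule, so a failed batch can be silently skipped or replayed. A common alternative is to turn auto-commit off and commit after each batch yourself. Below is a minimal sketch of that pattern, placed before streamingContext.start(); HasOffsetRanges and CanCommitOffsets come from the spark-streaming-kafka-0-10 package already imported above:

// assumes "enable.auto.commit" -> (false: java.lang.Boolean) in kafkaParams
stream.foreachRDD { rdd =>
  // the offset ranges this batch covers
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // process the batch first...
  rdd.map(record => (record.key, record.value)).collect().foreach(println)
  // ...then commit the consumed offsets back to Kafka
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}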
Below is the proper pom.xml; keep the versions consistent with your own cluster.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.produce</groupId>
<artifactId>produce</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.thoughtworks.paranamer</groupId>
<artifactId>paranamer</artifactId>
<version>2.8</version>
</dependency>
<dependency> <!-- Spark -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency> <!-- Spark Streaming -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
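One gap worth noting: this pom has no <build> section, so Maven alone will not compile the Scala source (it works when an IDE handles the Scala compilation). If you build the jar with Maven, something like the scala-maven-plugin is needed inside the <project> element, alongside <dependencies>; the plugin version here is an assumption, pick whatever matches your environment:

<build>
    <plugins>
        <!-- compiles Scala sources during the Maven build; version is assumed -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.4.6</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>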
Now the Kafka side (Kafka 2.2.x; note that Kafka 2.1 does not support specifying brokers with --bootstrap-server in these tools, that arrived in 2.2).
Create a producer:
bin/kafka-console-producer.sh --broker-list test:9091,test:9092,test:9093 --topic abc
List topics:
bin/kafka-topics.sh --list --bootstrap-server test:9091,test:9092,test:9093
Create a topic (partition and replication counts are placeholders, adjust to your cluster):
bin/kafka-topics.sh --create --bootstrap-server test:9091,test:9092,test:9093 --topic abc --partitions 3 --replication-factor 3
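To sanity-check the topic before wiring in Spark, a standard console consumer can read back what the producer sends (same broker list as above):

bin/kafka-console-consumer.sh --bootstrap-server test:9091,test:9092,test:9093 --topic abc --from-beginning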
If the program hangs at the Kafka commit step, it means the Kafka client cannot resolve the broker addresses. Add the virtual machines' IP/hostname mappings to
C:\Windows\System32\drivers\etc\hosts
so that the client can resolve the brokers and find the topic.
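For example, if the three brokers all run on a VM named test (the IP below is the one used earlier for HDFS and is only an assumption; substitute your own), the hosts entry would be:

192.168.0.118  test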
Cluster mode
Code: identical to the local-mode program above, except that the master points at the cluster and the output goes to HDFS. Only the lines that differ are shown:

val sparkConf = new SparkConf().setAppName("KafkaDirectStream").setMaster("spark://test:7077")

//stream.saveAsTextFiles("hdfs://192.168.0.118:9000/root/hello/") // write to HDFS
Maven dependencies: the same pom.xml as shown above.
Submitting to the cluster:
./bin/spark-submit \
  --class KafkaDirectStream \
  --num-executors 4 \
  --driver-memory 1G \
  --executor-memory 1g \
  --executor-cores 1 \
  --conf spark.default.parallelism=1000 \
  produce.jar
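Note that spark-streaming-kafka-0-10 is not bundled with Spark itself, so unless produce.jar is a fat jar that packages that dependency, executors will fail with ClassNotFoundException. One alternative is to let spark-submit fetch it at launch with --packages, using the same coordinates as the pom above:

./bin/spark-submit \
  --class KafkaDirectStream \
  --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0 \
  --num-executors 4 --driver-memory 1G --executor-memory 1g --executor-cores 1 \
  produce.jar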
Component versions:
JDK 8
Scala 2.11.*
Kafka kafka_2.11-2.2.0
Spark spark-2.4.0-bin-hadoop2.7
If the versions do not match, you will get "method not found" (NoSuchMethodError) errors.