代码调试过程中遇到的错误总结:
KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
在代码编写的过程中,IDEA并没有自动识别方法Subscribe需要导入的jar包,但是该方法的jar已经有maven下载,在这个过程中,需要去查看jar类中的所有方法,手动去导入实现。
在import org.apache.spark.streaming.kafka010.ConsumerStrategies 中找到需要使用的方法Subscribe,然后对方法进行引用
spark2.3+kafka0.11 direct模式
用代码实现对kafka数据的生产和消费
package com.bjsxt.scalaspark.streaming.streamingOnKafka import java.text.SimpleDateFormat import java.util.{Date, Properties} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import scala.util.Random /** * 向 kafka 中生产数据 */ object ProduceDataToKafka { def main(args: Array[String]): Unit = { val props = new Properties() props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092") props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") val producer = new KafkaProducer[String,String](props) var counter = 0 var keyFlag = 0 while(true){ counter +=1 keyFlag +=1 val content: String = userlogs() producer.send(new ProducerRecord[String, String]("mytopic", s"key-$keyFlag", content)) if(0 == counter%100){ counter = 0 Thread.sleep(5000) } } producer.close() } def userlogs()={ val userLogBuffer = new StringBuffer("") val timestamp = new Date().getTime(); var userID = 0L var pageID = 0L //随机生成的用户ID userID = Random.nextInt(2000) //随机生成的页面ID pageID = Random.nextInt(2000); //随机生成Channel val channelNames = Array[String]("Spark","Scala","Kafka","Flink","Hadoop","Storm","Hive","Impala","HBase","ML") val channel = channelNames(Random.nextInt(10)) val actionNames = Array[String]("View", "Register") //随机生成action行为 val action = actionNames(Random.nextInt(2)) val dateToday = new SimpleDateFormat("yyyy-MM-dd").format(new Date()) userLogBuffer.append(dateToday) .append(" ") .append(timestamp) .append(" ") .append(userID) .append(" ") .append(pageID) .append(" ") .append(channel) .append(" ") .append(action) System.out.println(userLogBuffer.toString()) userLogBuffer.toString() } }
从kafka 消费数据,并实现手动设置offset
package com.bjsxt.scalaspark.streaming.streamingOnKafka import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils, OffsetRange} import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent import org.apache.spark.streaming.{Durations, StreamingContext} /** * SparkStreaming2.3版本 读取kafka 中数据 : * 1.采用了新的消费者api实现,类似于1.6中SparkStreaming 读取 kafka Direct模式。并行度 一样。 * 2.因为采用了新的消费者api实现,所有相对于1.6的Direct模式【simple api实现】 ,api使用上有很大差别。未来这种api有可能继续变化 * 3.kafka中有两个参数: * heartbeat.interval.ms:这个值代表 kafka集群与消费者之间的心跳间隔时间,kafka 集群确保消费者保持连接的心跳通信时间间隔。这个时间默认是3s. * 这个值必须设置的比session.timeout.ms 小,一般设置不大于 session.timeout.ms 的1/3 * session.timeout.ms : * 这个值代表消费者与kafka之间的session 会话超时时间,如果在这个时间内,kafka 没有接收到消费者的心跳【heartbeat.interval.ms 控制】, * 那么kafka将移除当前的消费者。这个时间默认是30s。 * 这个时间位于配置 group.min.session.timeout.ms【6s】 和 group.max.session.timeout.ms【300s】之间的一个参数,如果SparkSteaming 批次间隔时间大于5分钟, * 也就是大于300s,那么就要相应的调大group.max.session.timeout.ms 这个值。 * 4.大多数情况下,SparkStreaming读取数据使用 LocationStrategies.PreferConsistent 这种策略,这种策略会将分区均匀的分布在集群的Executor之间。 * 如果Executor在kafka 集群中的某些节点上,可以使用 LocationStrategies.PreferBrokers 这种策略,那么当前这个Executor 中的数据会来自当前broker节点。 * 如果节点之间的分区有明显的分布不均,可以使用 LocationStrategies.PreferFixed 这种策略,可以通过一个map 指定将topic分区分布在哪些节点中。 * * 5.新的消费者api 可以将kafka 中的消息预读取到缓存区中,默认大小为64k。默认缓存区在 Executor 中,加快处理数据速度。 * 可以通过参数 spark.streaming.kafka.consumer.cache.maxCapacity 来增大,也可以通过spark.streaming.kafka.consumer.cache.enabled 设置成false 关闭缓存机制。 * "注意:官网中描述这里建议关闭,在读取kafka时如果开启会有重复读取同一个topic partition 消息的问题,报错:KafkaConsumer is not safe for multi-threaded access" * * 6.关于消费者offset * 1).如果设置了checkpoint ,那么offset 将会存储在checkpoint中。 * 这种有缺点: 第一,当从checkpoint中恢复数据时,有可能造成重复的消费,需要我们写代码来保证数据的输出幂等。 * 第二,当代码逻辑改变时,无法从checkpoint中来恢复offset. * 2).依靠kafka 来存储消费者offset,kafka 中有一个特殊的topic 来存储消费者offset。新的消费者api中,会定期自动提交offset。这种情况有可能也不是我们想要的, * 因为有可能消费者自动提交了offset,但是后期SparkStreaming 没有将接收来的数据及时处理保存。这里也就是为什么会在配置中将enable.auto.commit 设置成false的原因。 * 这种消费模式也称最多消费一次,默认sparkStreaming 拉取到数据之后就可以更新offset,无论是否消费成功。自动提交offset的频率由参数auto.commit.interval.ms 决定,默认5s。 * *如果我们能保证完全处理完业务之后,可以后期异步的手动提交消费者offset. 注意:kafka集群重启后会清空所有消费者组保存的信息,这也是这种模式的弊端 * * 3).自己存储offset,这样在处理逻辑时,保证数据处理的事务,如果处理数据失败,就不保存offset,处理数据成功则保存offset.这样可以做到精准的处理一次处理数据。 * */ object SparkStreamingOnKafkaDirect { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setMaster("local") conf.setAppName("SparkStreamingOnKafkaDirect") val ssc = new StreamingContext(conf,Durations.seconds(5)) //设置日志级别 // ssc.sparkContext.setLogLevel("ERROR") val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "node1:9092,node2:9092,node3:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "MyGroupId",// /** * * earliest :当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始 * latest:自动重置偏移量为最大偏移量【默认】* * none:没有找到以前的offset,抛出异常 */ "auto.offset.reset" -> "earliest", /** * 当设置 enable.auto.commit为false时,不会自动向kafka中保存消费者offset.需要异步的处理完数据之后手动提交 */ "enable.auto.commit" -> (false: java.lang.Boolean)//默认是true ) val topics = Array[String]("t0218") val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent,//消费策略 Subscribe[String, String](topics, kafkaParams) ) val transStrem: DStream[String] = stream.map(record => { val key_value = (record.key, record.value) println("receive message key = "+key_value._1) println("receive message value = "+key_value._2) key_value._2 }) val wordsDS: DStream[String] = transStrem.flatMap(line=>{line.split(" ")}) val result: DStream[(String, Int)] = wordsDS.map((_,1)).reduceByKey(_+_) result.print() /** * 以上业务处理完成之后,异步的提交消费者offset,这里将 enable.auto.commit 设置成false,就是使用kafka 自己来管理消费者offset * 注意这里,获取 offsetRanges: Array[OffsetRange] 每一批次topic 中的offset时,必须从 源头读取过来的 stream中获取,不能从经过stream转换之后的DStream中获取。 */ stream.foreachRDD { rdd => val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // some time later, after outputs have completed for(or <- offsetRanges){ println(s"current topic = ${or.topic},partition = ${or.partition},fromoffset = ${or.fromOffset},untiloffset=${or.untilOffset}") } stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) } ssc.start() ssc.awaitTermination() ssc.stop() } }
代码实现流程:
1、启动zookeeper
2、启动各个节点的kafka
3、启动代码开始生成数据,并将生成的数据传输到消息队列
4、查看生成的数据: ./kafka-consumer-groups.sh --bootstrap-server mynode1:9092,mynode2:9092,mynode3:9092 --list (将生成的offset存储在自己在kafka中创建的组MyGroupID中)
5、查看数据组的结构: ./kafka-consumer-groups.sh --bootstrap-server mynode1:9092,mynode2:9092,mynode3:9092 --list
查看消息队列连接是否成功:
1、启动zookeeper
2、启动所有节点的kafkla
3、将启动完成以后的kafka
4、在bin目录下启动生产数据的脚本: ./kafka-console-producer.sh --broker-list mynode1:9092,mynode2:9092,mynode3:9092 --topic t
5、在bin目录下启动查看消费数据的脚本:./kafka-console-consumer.sh --zookeeper mynode1:2181,mynode2:2181,mynode3:2181 --topic topic0703
手动维护消费者offset:
将sparkStreaming和kafka以及redis整合之后,sparkstreaming先去kafka中读取数据,读取完之后,对数据进行处理,处理完之后,将offset存储到redis中
当将sparkstreaming停止之后,下一次重启的时候,先去redis中去查询,最新的offset ,查询过来之后设置给kafka
代码演示过程:
1、首先启动redis service redisd start
package com.bjsxt.scalaspark.streaming.streamingOnKafka import java.util import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.{SparkConf, TaskContext} import org.apache.spark.streaming.{Durations, StreamingContext} import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, OffsetRange} import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent import scala.collection.mutable /** * 利用redis 来维护消费者偏移量 */ object ManageOffsetUseRedis { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setMaster("local") conf.setAppName("manageoffsetuseredis") //设置每个分区每秒读取多少条数据 conf.set("spark.streaming.kafka.maxRatePerPartition","10") val ssc = new StreamingContext(conf,Durations.seconds(5)) //设置日志级别 ssc.sparkContext.setLogLevel("Error") val topic = "mytopic" /** * 从Redis 中获取消费者offset */ val dbIndex = 3 val currentTopicOffset: mutable.Map[String, String] = getOffSetFromRedis(dbIndex,topic) //初始读取到的topic offset: currentTopicOffset.foreach(x=>{println(s" 初始读取到的offset: $x")}) //转换成需要的类型 val fromOffsets: Predef.Map[TopicPartition, Long] = currentTopicOffset.map { resultSet => new TopicPartition(topic, resultSet._1.toInt) -> resultSet._2.toLong }.toMap val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "node1:9092,node2:9092,node3:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "MyGroupId-1", "auto.offset.reset" -> "latest", "enable.auto.commit" -> (false: java.lang.Boolean)//默认是true ) /** * 将获取到的消费者offset 传递给SparkStreaming */ val stream = KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent, ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets) ) stream.foreachRDD { rdd => println("**** 业务处理完成 ****") val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd.foreachPartition { iter => val o: OffsetRange = offsetRanges(TaskContext.get.partitionId) println(s"topic:${o.topic} partition:${o.partition} fromOffset:${o.fromOffset} untilOffset: ${o.untilOffset}") } //将当前批次最后的所有分区offsets 保存到 Redis中 saveOffsetToRedis(dbIndex,offsetRanges) } ssc.start() ssc.awaitTermination() ssc.stop() } /** * 将消费者offset 保存到 Redis中 * */ def saveOffsetToRedis(db:Int,offsetRanges:Array[OffsetRange]) = { val jedis = RedisClient.pool.getResource jedis.select(db) offsetRanges.foreach(one=>{ jedis.hset(one.topic, one.partition.toString,one.untilOffset.toString) }) RedisClient.pool.returnResource(jedis) } /** * 从Redis中获取保存的消费者offset * @param db * @param topic * @return */ def getOffSetFromRedis(db:Int,topic:String) ={ val jedis = RedisClient.pool.getResource jedis.select(db) val result: util.Map[String, String] = jedis.hgetAll(topic) RedisClient.pool.returnResource(jedis) if(result.size()==0){ result.put("0","0") result.put("1","0") result.put("2","0") } import scala.collection.JavaConversions.mapAsScalaMap val offsetMap: scala.collection.mutable.Map[String, String] = result offsetMap } }