zoukankan      html  css  js  c++  java
  • Kafka+SparkStreaming+Zookeeper(ZK存储Offset,解决checkpoint问题)

    创建一个topic

    ./kafka-topics.sh --create --zookeeper 192.168.1.244:2181,192.168.1.245:2181,192.168.1.246:2181 --replication-factor 1
    --partitions 1 --topic topic_test_zk_minOffset_zkGroup

    查看topic列表

    ./kafka-topics.sh --list --zookeeper 192.168.1.244:2181,192.168.1.245:2181,192.168.1.246:2181

    producer 代码如下

    package com.kafka.test;
    
    import java.util.Properties;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    /**
    * @author:FengZhen
    * @create:2018年8月9日
    */
    public class Producer_zk {
    
    	public static void main(String[] args) {
    		Properties props = new Properties();
    		props.put("bootstrap.servers", "192.168.1.244:6667,192.168.1.247:6667");
    		//props.put("zookeeper.connect", "192.168.1.244:2181,192.168.1.245:2181,192.168.1.246:2181");
    	    props.put("acks", "all");
    	    props.put("retries", 0);
    	    props.put("batch.size", 16384);
    	    props.put("linger.ms", 1);
    	    props.put("buffer.memory", 33554432);
    	    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    	    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
    	    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
    	    for (int i = 30; i < 40; i++)
    	        producer.send(new ProducerRecord<String, String>("topic_test_zk_minOffset_zkGroup", Integer.toString(i), "中文测试-"+Integer.toString(i)));
    
    	    producer.close();
    	}
    	
    }

    Streaming代码如下

    package streaming
    
    import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo, TopicMetadataRequest}
    import kafka.common.TopicAndPartition
    import kafka.consumer.SimpleConsumer
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
    import org.I0Itec.zkclient.ZkClient
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}
    
    object KafkaLog_local_zk_minOffset_zkGroup {
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("KafkaLog_local_zk_minOffset_zkGroup").setMaster("local[2]")
        val sc = new SparkContext(conf)
        sc.setLogLevel("WARN")
        val ssc = new StreamingContext(sc, Seconds(5))
    
        val broker_servers = "192.168.1.244:6667,192.168.1.247:6667"
        val zk_host = "192.168.1.244:2181,192.168.1.245:2181,192.168.1.246:2181"
        //消费的 topic 名字
        val topic : String = "topic_test_zk_minOffset_zkGroup"
        //创建 stream 时使用的 topic 名字集合
        val topics : Set[String] = Set(topic)
    
        var kafkaParam:Map[String,String] = Map()
        kafkaParam += ("bootstrap.servers" -> broker_servers)
        kafkaParam += ("group.id" -> "test")
        kafkaParam += ("enable.auto.commit" -> "true")
        kafkaParam += ("auto.commit.interval.ms" -> "100")
    
        //创建一个 ZKGroupTopicDirs 对象,对保存
        val topicDirs = new ZKGroupTopicDirs("topic_test_zk_minOffset_zkGroup_group", topic)
    
        //获取 zookeeper 中的路径,这里会变成 /consumers/test_spark_streaming_group/offsets/topic_name
        // /consumers/topic_test_zk_minOffset_zkGroup_group/offsets/topic_test_zk_minOffset_zkGroup/0
        val zkTopicPath = s"${topicDirs.consumerOffsetDir}"
    
        //zookeeper 的host 和 ip,创建一个 client
        val zkClient = new ZkClient(zk_host)
        //查询该路径下是否字节点(默认有字节点为我们自己保存不同 partition 时生成的)
        val children = zkClient.countChildren(zkTopicPath)
    
        var kafkaStream : InputDStream[(String, String)] = null
    
        //如果 zookeeper 中有保存 offset,我们会利用这个 offset 作为 kafkaStream 的起始位置
        var fromOffsets: Map[TopicAndPartition, Long] = Map()
    
        //如果保存过 offset,这里更好的做法,还应该和  kafka 上最小的 offset 做对比,不然会报 OutOfRange 的错误
        if (children > 0) {
          for (i <- 0 until children) {
            val topic2 = List(topic)
            val req = new TopicMetadataRequest(topic2, 0)
            // 第一个参数是 kafka broker 的host,第二个是 port
            val getLeaderConsumer = new SimpleConsumer("192.168.1.244", 6667, 10000, 10000, "OffsetLookup")
            val res = getLeaderConsumer.send(req)
            val topicMetaOption = res.topicsMetadata.headOption
            val partitions = topicMetaOption match {
              // 将结果转化为 partition -> leader 的映射关系
              case Some(tm) =>
                tm.partitionsMetadata.map(pm => (pm.partitionId, pm.leader.get.host)).toMap[Int, String]
              case None =>
                Map[Int, String]()
            }
            //去出分片对应的leader host
            val brokerLeaderHost = partitions.get(i).toString.replace("Some(", "").replace(")","")
    
            val partitionOffset = zkClient.readData[String](s"${zkTopicPath}/${i}")
            val tp = TopicAndPartition(topic, i)
    
            val requestMin = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
            val consumerMin = new SimpleConsumer(brokerLeaderHost, 6667, 10000, 10000, "getMinOffset")
            val curOffsets = consumerMin.getOffsetsBefore(requestMin).partitionErrorAndOffsets(tp).offsets
            var nextOffset = partitionOffset.toLong
            // 通过比较从 kafka 上该 partition 的最小 offset 和 zk 上保存的 offset,进行选择
            if (curOffsets.length > 0 && nextOffset < curOffsets.head) {
              nextOffset = curOffsets.head
            }
            //设置正确的 offset,这里将 nextOffset 设置为 0(0 只是一个特殊值),可以观察到 offset 过期的想想
            fromOffsets += (tp -> nextOffset)
            println("@@@@@@ topic[" + topic + "] partition[" + i + "] offset[" + partitionOffset + "] @@@@@@")
          }
    
          //这个会将 kafka 的消息进行 transform,最终 kafak 的数据都会变成 (topic_name, message) 这样的 tuple
          val messageHandler = (mmd : MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
          kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParam, fromOffsets, messageHandler)
        }
        else {
          //如果未保存,根据 kafkaParam 的配置使用最新或者最旧的 offset
          kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topics)
        }
    
        var offsetRanges = Array[OffsetRange]()
        //得到该 rdd 对应 kafka 的消息的 offset
        kafkaStream.transform{ rdd =>
          offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          rdd
        }.foreachRDD { rdd => //.map(msg => Utils.msgDecode(msg))
          for (o <- offsetRanges) {
            val zkPath = s"${zkTopicPath}/${o.partition}"
            //将该 partition 的 offset 保存到 zookeeper
            ZkUtils.updatePersistentPath(zkClient, zkPath, o.fromOffset.toString)
            println(s"@@@@@@ topic  ${o.topic}  partition ${o.partition}  fromoffset ${o.fromOffset}  untiloffset ${o.untilOffset} #######")
          }
          rdd.foreachPartition(
            message => {
              while(message.hasNext) {
                println(s"@^_^@   [" + message.next() + "] @^_^@")
              }
            }
          )
        }
        //开启流式计算
        ssc.start()
        //一直会阻塞,等待退出
        ssc.awaitTermination()
      }
    }

    出现的问题

    使用simpleConsumer时报错

    Exception in thread "main" java.nio.channels.ClosedChannelException
    	at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
    	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
    	at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127)
    	at streaming.KafkaLog_local_zk_minOffset$$anonfun$main$1.apply$mcVI$sp(KafkaLog_local_zk_minOffset.scala:64)
    	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    	at streaming.KafkaLog_local_zk_minOffset$.main(KafkaLog_local_zk_minOffset.scala:44)
    	at streaming.KafkaLog_local_zk_minOffset.main(KafkaLog_local_zk_minOffset.scala)
    解决将Kafka config下的server.properties的参数修改下
    num.network.threads=3
    zookeeper.connection.timeout.ms=6000
    

     再次尝试即可.

  • 相关阅读:
    ASP.NET 配置log4net启用写错误日志功能
    jQuery formValidator表单验证插件
    jQuery Easy Validate Plugin
    https,https的本地测试环境搭建,asp.net结合https的代码实现,http网站转换成https网站之后遇到的问题
    Request 分别获取具有相同 name 属性表单元素值
    asp.net开源CMS推荐
    客户对账单本月欠款
    AR 客户本月回款
    应收帐款汇总
    采购订单关闭之PL/SQL实现方法
  • 原文地址:https://www.cnblogs.com/EnzoDin/p/9475553.html
Copyright © 2011-2022 走看看