
     Consuming Kafka data from a specified offset in Spark Streaming
    2017-06-13 15:19
     Category: spark

    Original article: http://blog.csdn.net/high2011/article/details/53706446

          First of all, many thanks to the original author; this article saved me many detours. I am reposting it here to keep a copy for review. Please support the original author and read the post at the link above.
    
    
    I. Scenario: When a Spark Streaming program exits unexpectedly, data keeps being pushed into Kafka. Since consumption defaults to the latest offset, the records produced while the program was down are skipped and lost. To avoid losing data, we need to record the offsets of everything we consume, so that on the next start we can check them and resume reading from the recorded offsets.
    II. Environment: kafka-0.9.0, Spark-1.6.0, JDK 1.7, Scala 2.10.5, IntelliJ IDEA 16
    III. Implementation:
          1. Add the Spark and Kafka dependencies
    <?xml version="1.0" encoding="UTF-8"?>  
    <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
             xmlns="http://maven.apache.org/POM/4.0.0"  
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
        <modelVersion>4.0.0</modelVersion>  
      
        <groupId>com.ngaa</groupId>  
        <artifactId>test-my</artifactId>  
        <version>1.0-SNAPSHOT</version>  
        <inceptionYear>2008</inceptionYear>  
        <properties>  
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>  
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>  
            <!--add  maven release-->  
            <maven.compiler.source>1.7</maven.compiler.source>  
            <maven.compiler.target>1.7</maven.compiler.target>  
            <encoding>UTF-8</encoding>  
            <!--Scala version-->  
            <scala.version>2.10.5</scala.version>  
            <!--Scala version on the test machine-->  
            <test.scala.version>2.11.7</test.scala.version>  
      
            <jackson.version>2.3.0</jackson.version>  
            <!--slf4j version-->  
            <slf4j-version>1.7.20</slf4j-version>  
            <!--cdh-spark-->  
            <spark.cdh.version>1.6.0-cdh5.8.0</spark.cdh.version>  
            <spark.streaming.cdh.version>1.6.0-cdh5.8.0</spark.streaming.cdh.version>  
            <kafka.spark.cdh.version>1.6.0-cdh5.8.0</kafka.spark.cdh.version>  
            <!--cdh-hadoop-->  
            <hadoop.cdh.version>2.6.0-cdh5.8.0</hadoop.cdh.version>  
            <!--httpclient must be compatible with the Hadoop version shipped with CDH (see /opt/cloudera/parcels/CDH/lib/hadoop/lib)-->  
            <httpclient.version>4.2.5</httpclient.version>  
      
            <!--httpcore-->  
            <httpcore.version>4.2.5</httpcore.version>  
            <!--fastjson-->  
            <fastjson.version>1.1.39</fastjson.version>  
      
        </properties>  
      
        <repositories>  
            <repository>  
                <id>scala-tools.org</id>  
                <name>Scala-Tools Maven2 Repository</name>  
                <url>http://scala-tools.org/repo-releases</url>  
            </repository>  
            <!--Repository used to resolve the CDH-specific jars-->  
            <repository>  
                <id>cloudera</id>  
                <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>  
            </repository>  
        </repositories>  
      
        <pluginRepositories>  
            <pluginRepository>  
                <id>scala-tools.org</id>  
                <name>Scala-Tools Maven2 Repository</name>  
                <url>http://scala-tools.org/repo-releases</url>  
            </pluginRepository>  
        </pluginRepositories>  
      
        <dependencies>  
      
            <!--fastjson-->  
            <dependency>  
                <groupId>com.alibaba</groupId>  
                <artifactId>fastjson</artifactId>  
                <version>${fastjson.version}</version>  
            </dependency>  
            <!--httpclient-->  
            <dependency>  
                <groupId>org.apache.httpcomponents</groupId>  
                <artifactId>httpclient</artifactId>  
                <version>${httpclient.version}</version>  
            </dependency>  
      
            <!--http core-->  
            <dependency>  
                <groupId>org.apache.httpcomponents</groupId>  
                <artifactId>httpcore</artifactId>  
                <version>${httpcore.version}</version>  
            </dependency>  
      
            <!--slf4j-->  
            <dependency>  
                <groupId>org.slf4j</groupId>  
                <artifactId>slf4j-log4j12</artifactId>  
                <version>${slf4j-version}</version>  
            </dependency>  
            <!--hadoop-->  
            <dependency>  
                <groupId>org.apache.hadoop</groupId>  
                <artifactId>hadoop-client</artifactId>  
                <version>${hadoop.cdh.version}</version>  
                <exclusions>  
                    <exclusion>  
                        <groupId>javax.servlet</groupId>  
                        <artifactId>*</artifactId>  
                    </exclusion>  
                </exclusions>  
            </dependency>  
            <dependency>  
                <groupId>org.apache.hadoop</groupId>  
                <artifactId>hadoop-common</artifactId>  
                <version>${hadoop.cdh.version}</version>  
                <exclusions>  
                    <exclusion>  
                        <groupId>javax.servlet</groupId>  
                        <artifactId>*</artifactId>  
                    </exclusion>  
                </exclusions>  
            </dependency>  
            <dependency>  
                <groupId>org.apache.hadoop</groupId>  
                <artifactId>hadoop-hdfs</artifactId>  
                <version>${hadoop.cdh.version}</version>  
                <exclusions>  
                    <exclusion>  
                        <groupId>javax.servlet</groupId>  
                        <artifactId>*</artifactId>  
                    </exclusion>  
                </exclusions>  
            </dependency>  
            <!--spark scala-->  
            <dependency>  
                <groupId>org.scala-lang</groupId>  
                <artifactId>scala-library</artifactId>  
                <version>${scala.version}</version>  
            </dependency>  
            <dependency>  
                <groupId>com.fasterxml.jackson.core</groupId>  
                <artifactId>jackson-databind</artifactId>  
                <version>${jackson.version}</version>  
            </dependency>  
      
            <!--Spark Streaming and Kafka integration packages-->  
            <dependency>  
                <groupId>org.apache.spark</groupId>  
                <artifactId>spark-streaming_2.10</artifactId>  
                <version>${spark.streaming.cdh.version}</version>  
            </dependency>  
            <dependency>  
                <groupId>org.apache.spark</groupId>  
                <artifactId>spark-streaming-kafka_2.10</artifactId>  
                <version>${kafka.spark.cdh.version}</version>  
            </dependency>  
            <dependency>  
                <groupId>junit</groupId>  
                <artifactId>junit</artifactId>  
                <version>4.12</version>  
                <scope>test</scope>  
            </dependency>  
      
            <!--Spark assembly jar loaded from a local Windows path-->  
            <dependency>  
            <groupId>org.apache.spark</groupId>  
            <artifactId>spark-assembly_2.10</artifactId>  
            <version>${spark.cdh.version}</version>  
            <scope>system</scope>  
            <systemPath>D:/crt_send_document/spark-assembly-1.6.0-cdh5.8.0-hadoop2.6.0-cdh5.8.0.jar</systemPath>  
            </dependency>  
      
            <!--Spark assembly jar loaded from a local path on the Linux test machine-->  
            <!--<dependency>-->  
                <!--<groupId>org.apache.spark</groupId>-->  
                <!--<artifactId>spark-assembly_2.10</artifactId>-->  
                <!--<version>${spark.cdh.version}</version>-->  
                <!--<scope>system</scope>-->  
                <!--<systemPath>/opt/cloudera/parcels/CDH/lib/spark/lib/spark-examples-1.6.0-cdh5.8.0-hadoop2.6.0-cdh5.8.0.jar-->  
                <!--</systemPath>-->  
            <!--</dependency>-->  
      
            <!--Spark assembly from the central repository-->  
            <!--<dependency>-->  
            <!--<groupId>org.apache.spark</groupId>-->  
            <!--<artifactId>spark-assembly_2.10</artifactId>-->  
            <!--<version>${spark.cdh.version}</version>-->  
            <!--</dependency>-->  
      
            <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-yarn-server-web-proxy -->  
            <dependency>  
                <groupId>org.apache.hadoop</groupId>  
                <artifactId>hadoop-yarn-server-web-proxy</artifactId>  
                <version>2.6.0-cdh5.8.0</version>  
            </dependency>  
      
        </dependencies>  
      
        <!--Maven packaging-->  
        <build>  
            <finalName>test-my</finalName>  
            <sourceDirectory>src/main/scala</sourceDirectory>  
            <testSourceDirectory>src/test/scala</testSourceDirectory>  
            <plugins>  
                <plugin>  
                    <groupId>org.scala-tools</groupId>  
                    <artifactId>maven-scala-plugin</artifactId>  
                    <version>2.15.2</version>  
                    <executions>  
                        <execution>  
                            <goals>  
                                <goal>compile</goal>  
                                <goal>testCompile</goal>  
                            </goals>  
                        </execution>  
                    </executions>  
                    <configuration>  
                        <scalaVersion>${scala.version}</scalaVersion>  
                        <args>  
                            <arg>-target:jvm-1.7</arg>  
                        </args>  
                    </configuration>  
                </plugin>  
                <plugin>  
                    <groupId>org.apache.maven.plugins</groupId>  
                    <artifactId>maven-eclipse-plugin</artifactId>  
                    <configuration>  
                        <downloadSources>true</downloadSources>  
                        <buildcommands>  
                            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>  
                        </buildcommands>  
                        <additionalProjectnatures>  
                            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>  
                        </additionalProjectnatures>  
                        <classpathContainers>  
                            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>  
                            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>  
                        </classpathContainers>  
                    </configuration>  
                </plugin>  
                <plugin>  
                    <artifactId>maven-assembly-plugin</artifactId>  
                    <configuration>  
                        <descriptorRefs>  
                            <descriptorRef>jar-with-dependencies</descriptorRef>  
                        </descriptorRefs>  
                        <archive>  
                            <manifest>  
                                <mainClass></mainClass>  
                            </manifest>  
                        </archive>  
                    </configuration>  
                    <executions>  
                        <execution>  
                            <id>make-assembly</id>  
                            <phase>package</phase>  
                            <goals>  
                                <goal>single</goal>  
                            </goals>  
                        </execution>  
                    </executions>  
                </plugin>  
            </plugins>  
        </build>  
        <reporting>  
            <plugins>  
                <plugin>  
                    <groupId>org.scala-tools</groupId>  
                    <artifactId>maven-scala-plugin</artifactId>  
                    <configuration>  
                        <scalaVersion>${scala.version}</scalaVersion>  
                    </configuration>  
                </plugin>  
            </plugins>  
        </reporting>  
      
    </project>  
    
     2. Create the test class
    import kafka.common.TopicAndPartition  
    import kafka.message.MessageAndMetadata  
    import kafka.serializer.StringDecoder  
    import org.apache.log4j.{Level, Logger}  
    import org.apache.spark.{SparkConf, TaskContext}  
    import org.apache.spark.streaming.dstream.InputDStream  
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}  
    import org.apache.spark.streaming.{Seconds, StreamingContext}  
    import org.slf4j.LoggerFactory  
      
    /** 
      * Created by yangjf on 2016/12/18 
      * Update date: 
      * Time: 11:10 
      * Description: read Kafka data starting from specified offsets 
      * Result of Test: 
      * Command: 
      * Email: jifei.yang@ngaa.com.cn 
      */  
    object ReadBySureOffsetTest {  
      val logger = LoggerFactory.getLogger(ReadBySureOffsetTest.getClass)  
      
      def main(args: Array[String]) {  
        //Set log levels  
        Logger.getLogger("org.apache.kafka").setLevel(Level.ERROR)  
        Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR)  
        Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)  
        logger.info("Starting the main program that consumes Kafka from specified offsets")  
        if (args.length < 1) {  
          System.err.println("Your arguments were " + args.mkString(","))  
          logger.info("Exiting: the checkpoint directory argument is missing")  
          System.exit(1)  
        }  
        //e.g. hdfs://hadoop1:8020/user/root/spark/checkpoint  
        val Array(checkpointDirectory) = args  
        logger.info("Checkpoint directory: " + checkpointDirectory)  
        val ssc = StreamingContext.getOrCreate(checkpointDirectory,  
          () => {  
            createContext(checkpointDirectory)  
          })  
        logger.info("streaming开始启动")  
        ssc.start()  
        ssc.awaitTermination()  
      }  
      
      def createContext(checkpointDirectory: String): StreamingContext = {  
        //Configuration  
        val brokers = "hadoop3:9092,hadoop4:9092"  
        val topics = "20161218a"  
      
        //Batch interval in seconds  
        val split_rdd_time = 8  
        // Create the streaming context  
        val sparkConf = new SparkConf()  
          .setAppName("SendSampleKafkaDataToApple").setMaster("local[2]")  
          .set("spark.app.id", "streaming_kafka")  
      
        val ssc = new StreamingContext(sparkConf, Seconds(split_rdd_time))  
      
        ssc.checkpoint(checkpointDirectory)  
      
        // Create a direct Kafka stream against the given brokers and topics  
        val topicsSet: Set[String] = topics.split(",").toSet  
        //Kafka configuration parameters  
        val kafkaParams: Map[String, String] = Map[String, String](  
          "metadata.broker.list" -> brokers,  
          "group.id" -> "apple_sample",  
          "serializer.class" -> "kafka.serializer.StringEncoder"  
    //      "auto.offset.reset" -> "largest"   //自动将偏移重置为最新偏移(默认)  
    //      "auto.offset.reset" -> "earliest"  //自动将偏移重置为最早的偏移  
    //      "auto.offset.reset" -> "none"      //如果没有为消费者组找到以前的偏移,则向消费者抛出异常  
        )  
        /** 
          * Read Kafka data starting from the specified offsets. 
          * Note: with the direct approach the offsets are tracked by Spark itself, so each record is read only once by the stream; 
          *      once the starting offsets are specified, reading resumes from where the previous run of the Streaming program stopped. 
          */  
        val offsetList = List((topics, 0, 22753623L),(topics, 1, 327041L))                          //specify (topic, partition number, offset)  
        val fromOffsets = setFromOffsets(offsetList)     //build the fromOffsets map  
        val messageHandler = (mam: MessageAndMetadata[String, String]) => (mam.topic, mam.message()) //build the MessageAndMetadata handler  
        //Consume from the specified offsets with the direct API; for details see  
        //"http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$"  
        val messages: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)  
      
        //Process the data  
        messages.foreachRDD(mess => {  
          //Get the offset ranges of this batch  
          val offsetsList = mess.asInstanceOf[HasOffsetRanges].offsetRanges  
          mess.foreachPartition(lines => {  
            lines.foreach(line => {  
              val o: OffsetRange = offsetsList(TaskContext.get.partitionId)  
              logger.info("++++++++++++++++++++++++++++++ offset of this partition ++++++++++++++++++++++++++++++")  
              logger.info(s"${o.topic}  ${o.partition}  ${o.fromOffset}  ${o.untilOffset}")  
              logger.info("+++++++++++++++++++++++++++++++ process the record here +++++++++++++++++++++++++++++")  
              logger.info("The kafka  line is " + line)  
            })  
          })  
        })  
        ssc  
      }  
      
      //Build the Map[TopicAndPartition, Long]  
      def setFromOffsets(list: List[(String, Int, Long)]): Map[TopicAndPartition, Long] = {  
        var fromOffsets: Map[TopicAndPartition, Long] = Map()  
        for (offset <- list) {  
          val tp = TopicAndPartition(offset._1, offset._2)//topic and partition number  
          fromOffsets += (tp -> offset._3)           // starting offset  
        }  
        fromOffsets  
      }  
    }  
    
    IV. References:
        1. Spark API: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$
        2. Kafka configuration documentation: http://kafka.apache.org/documentation.html#configuration
        3. Kafka SimpleConsumer example: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
        4. Spark Streaming + Kafka integration guide (offset handling): http://spark.apache.org/docs/1.6.0/streaming-kafka-integration.html
        5. Kafka consumer API docs: http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
    Note: the code above has been tested and works; adjust it as needed. If you have questions, please leave a comment!

    Notes on reproducing this experiment

    1. First, know your topic name, the number of partitions, and the checkpoint directory.

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic test

    Then set the starting position of each partition in the code below:

    val offsetList = List((topics, 0, 0L),(topics, 1, 0L),(topics, 2, 0L))
    After every run, the offsets consumed so far are recorded in the checkpoint. For example, if the first run starts consuming from offset 0 and the program is then stopped, the next start reads the previous position from the checkpoint and continues consuming from there.
    Note that the checkpoint directory must correspond one-to-one with the topic, otherwise you will get errors. Also, the offset list must contain exactly one entry per partition, with partitions numbered from 0; for the 3-partition topic created above the list has three entries (see the sketch below).
    If you switch to a different topic, remember to switch to a different checkpoint directory as well.
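
    As a small illustration of the one-entry-per-partition rule, the list can also be built programmatically. This is only a minimal sketch; the helper name initialOffsets is hypothetical and not part of the original code:

    // Hypothetical helper: one (topic, partition, startingOffset) entry per partition, partitions numbered from 0.
    def initialOffsets(topic: String, numPartitions: Int, startAt: Long = 0L): List[(String, Int, Long)] =
      (0 until numPartitions).map(p => (topic, p, startAt)).toList

    // For the 3-partition topic created above:
    // initialOffsets("test", 3)  ==  List(("test", 0, 0L), ("test", 1, 0L), ("test", 2, 0L))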

    With the current code, every start is guaranteed to resume consumption from where the previous run stopped.
    To consume from a specified position instead, switch to a new checkpoint directory and specify, in
    val offsetList = List((topics, 0, 120L),(topics, 1, 0L),(topics, 2, 0L)), which partition starts from which offset; here partition 0 starts consuming from offset 120.

    Because the offsets are managed by the application itself here, they are not stored in ZooKeeper and cannot be queried there.
    val offsetList = List((topics, 0, 0L),(topics, 1, 0L),(topics, 2, 0L))
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
    () => {
    createContext(checkpointDirectory)
    })

    Kafka consumer javadoc: http://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

    Why a breakpoint inside createContext may not be hit (from the StreamingContext.getOrCreate documentation):
    Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
    If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
    recreated from the checkpoint data. If the data does not exist, then the StreamingContext
    will be created by called the provided `creatingFunc`.

    Storing Offsets Outside Kafka

    The consumer application need not use Kafka's built-in offset storage, it can store offsets in a store of its own choosing. The primary use case for this is allowing the application to store both the offset and the results of the consumption in the same system in a way that both the results and offsets are stored atomically. This is not always possible, but when it is it will make the consumption fully atomic and give "exactly once" semantics that are stronger than the default "at-least once" semantics you get with Kafka's offset commit functionality.
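
    The sketch below illustrates that idea for the direct stream used in this post. It is only a minimal sketch, assuming a relational database reachable over JDBC with a suitable driver on the classpath; the connection URL and the tables stream_results and stream_offsets (and their columns) are hypothetical placeholders, not something defined by the original article or by the Kafka/Spark APIs.

    import java.sql.DriverManager

    import kafka.common.TopicAndPartition
    import org.apache.spark.streaming.kafka.OffsetRange

    object OffsetStoreSketch {
      // Hypothetical JDBC URL; any transactional store would work.
      val jdbcUrl = "jdbc:mysql://dbhost:3306/streaming?user=app&password=secret"

      // At startup: read the last saved offset of each partition to build fromOffsets.
      def loadOffsets(topic: String): Map[TopicAndPartition, Long] = {
        val conn = DriverManager.getConnection(jdbcUrl)
        try {
          val stmt = conn.prepareStatement(
            "SELECT partition_no, until_offset FROM stream_offsets WHERE topic = ?")
          stmt.setString(1, topic)
          val rs = stmt.executeQuery()
          var offsets = Map[TopicAndPartition, Long]()
          while (rs.next()) {
            offsets += (TopicAndPartition(topic, rs.getInt("partition_no")) -> rs.getLong("until_offset"))
          }
          offsets
        } finally {
          conn.close()
        }
      }

      // Inside foreachRDD: write the batch results and the offset ranges in one transaction,
      // so the data and the offsets either both become visible or both do not.
      def saveBatch(results: Iterable[(String, String)], ranges: Array[OffsetRange]): Unit = {
        val conn = DriverManager.getConnection(jdbcUrl)
        try {
          conn.setAutoCommit(false)
          val insert = conn.prepareStatement("INSERT INTO stream_results(topic, message) VALUES (?, ?)")
          results.foreach { case (topic, message) =>
            insert.setString(1, topic)
            insert.setString(2, message)
            insert.executeUpdate()
          }
          val upsert = conn.prepareStatement(
            "REPLACE INTO stream_offsets(topic, partition_no, until_offset) VALUES (?, ?, ?)")
          ranges.foreach { o =>
            upsert.setString(1, o.topic)
            upsert.setInt(2, o.partition)
            upsert.setLong(3, o.untilOffset)
            upsert.executeUpdate()
          }
          conn.commit()
        } catch {
          case e: Exception => conn.rollback(); throw e
        } finally {
          conn.close()
        }
      }
    }

    In such a setup, the driver would call saveBatch inside foreachRDD after collecting (or aggregating) the batch, and loadOffsets(topic) would replace the hard-coded offsetList when building fromOffsets for KafkaUtils.createDirectStream; untilOffset is exclusive, so restarting from the saved value does not re-read the last processed record.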

     My own implementation

    package my.bigdata.studyKafka
    
    /**
      * Created by lq on 2017/8/30.
      */
    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, TaskContext}
    import org.slf4j.LoggerFactory
    
    
    object ReadBySureOffsetTest {
      val logger = LoggerFactory.getLogger(ReadBySureOffsetTest.getClass)
    
      def main(args: Array[String]) {
        //Set log levels
        System.setProperty("HADOOP_USER_NAME", "root")
        Logger.getLogger("org.apache.kafka").setLevel(Level.ERROR)
        Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR)
        Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
        logger.info("Starting the main program that consumes Kafka from specified offsets")
        if (args.length < 1) {
          System.err.println("Your arguments were " + args.mkString(","))
          logger.info("Exiting: the checkpoint directory argument is missing")
          System.exit(1)
        }
        //e.g. hdfs://hadoop1:8020/user/root/spark/checkpoint
        val Array(checkpointDirectory) = args
        logger.info("Checkpoint directory: " + checkpointDirectory)
        val ssc = StreamingContext.getOrCreate(checkpointDirectory,
          () => {
            createContext(checkpointDirectory)
          })
        logger.info("streaming开始启动")
        ssc.start()
        ssc.awaitTermination()
      }
    
      def createContext(checkpointDirectory: String): StreamingContext = {
        //Configuration
        val brokers = "slave1:9092,slave2:9092"
        val topics = "maats1"
    
        //Batch interval in seconds
        val split_rdd_time = 8
        // Create the streaming context
        val sparkConf = new SparkConf()
          .setAppName("SendSampleKafkaDataToApple").setMaster("local[2]")
          .set("spark.app.id", "streaming_kafka")
    
        val ssc = new StreamingContext(sparkConf, Seconds(split_rdd_time))
    
        ssc.checkpoint(checkpointDirectory)
    
        // Create a direct Kafka stream against the given brokers and topics
        val topicsSet: Set[String] = topics.split(",").toSet
        //Kafka configuration parameters
        val kafkaParams: Map[String, String] = Map[String, String](
          "metadata.broker.list" -> brokers,
          "group.id" -> "first3",
          "serializer.class" -> "kafka.serializer.StringEncoder"
          //      "auto.offset.reset" -> "largest"   //自动将偏移重置为最新偏移(默认)
          //      "auto.offset.reset" -> "earliest"  //自动将偏移重置为最早的偏移
          //      "auto.offset.reset" -> "none"      //如果没有为消费者组找到以前的偏移,则向消费者抛出异常
        )
        /**
          * Read Kafka data starting from the specified offsets.
          * Note: with the direct approach the offsets are tracked by Spark itself, so each record is read only once by the stream;
          *      once the starting offsets are specified, reading resumes from where the previous run of the Streaming program stopped.
          */
        //val offsetList = List((topics, 0, 22753623L),(topics, 1, 327041L))                          //specify (topic, partition number, offset)
        val offsetList = List((topics, 0, 230L),(topics, 1, 0L),(topics, 2, 0L))
        val fromOffsets = setFromOffsets(offsetList)     //build the fromOffsets map
        val messageHandler = (mam: MessageAndMetadata[String, String]) => (mam.topic, mam.message()) //build the MessageAndMetadata handler
        //Consume from the specified offsets with the direct API; for details see
        //"http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$"
        val messages: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
    
        //Process the data
        messages.foreachRDD(mess => {
          //Get the offset ranges of this batch
          val offsetsList = mess.asInstanceOf[HasOffsetRanges].offsetRanges
          mess.foreachPartition(lines => {
            lines.foreach(line => {
              val o: OffsetRange = offsetsList(TaskContext.get.partitionId)
              logger.info("++++++++++++++++++++++++++++++此处记录offset+++++++++++++++++++++++++++++++++++++++")
              logger.info(s"${o.topic}  ${o.partition}  ${o.fromOffset}  ${o.untilOffset}")
              logger.info("+++++++++++++++++++++++++++++++此处消费数据操作++++++++++++++++++++++++++++++++++++++")
              logger.info("The kafka  line is " + line)
            })
          })
        })
        ssc
      }
    
      //Build the Map[TopicAndPartition, Long]
      def setFromOffsets(list: List[(String, Int, Long)]): Map[TopicAndPartition, Long] = {
        var fromOffsets: Map[TopicAndPartition, Long] = Map()
        for (offset <- list) {
          val tp = TopicAndPartition(offset._1, offset._2)//topic and partition number
          fromOffsets += (tp -> offset._3)           // starting offset
        }
        fromOffsets
      }
    }


    Reposted from: https://www.cnblogs.com/rocky-AGE-24/p/7454252.html