  • KafkaUtils.createDirectStream() parameters explained

    KafkaUtils.createDirectStream creates a Kafka-backed DStream. It takes three parameters: the StreamingContext (ssc), a LocationStrategy, and a ConsumerStrategy.
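As a minimal sketch of wiring the three parameters together (the broker addresses, topic name, and group id below are hypothetical; assumes Spark 2.x with the spark-streaming-kafka-0-10 dependency on the classpath):

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val conf = new SparkConf().setAppName("direct-stream-demo").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))   // parameter 1: the StreamingContext

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "host1:9092,host2:9092",  // hypothetical brokers
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "demo-group",
  "auto.offset.reset" -> "latest"
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,                                  // parameter 1
  LocationStrategies.PreferConsistent,  // parameter 2: LocationStrategy
  ConsumerStrategies.Subscribe[String, String](Seq("demo-topic"), kafkaParams)  // parameter 3
)

stream.map(r => (r.key, r.value)).print()
ssc.start()
ssc.awaitTermination()
```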

    LocationStrategies provides three strategies: PreferBrokers, PreferConsistent, and PreferFixed. See the source excerpt below for details.

    /**
     * :: Experimental :: object to obtain instances of [[LocationStrategy]]
     *
     */
    @Experimental
    object LocationStrategies {
      /**
       *  :: Experimental ::
       * Use this only if your executors are on the same nodes as your Kafka brokers.
       */
      @Experimental
      def PreferBrokers: LocationStrategy =
        org.apache.spark.streaming.kafka010.PreferBrokers
    
      /**
       *  :: Experimental ::
       * Use this in most cases, it will consistently distribute partitions across all executors.
       */
      @Experimental
      def PreferConsistent: LocationStrategy =
        org.apache.spark.streaming.kafka010.PreferConsistent
    
      /**
       *  :: Experimental ::
       * Use this to place particular TopicPartitions on particular hosts if your load is uneven.
       * Any TopicPartition not specified in the map will use a consistent location (i.e. falls back to PreferConsistent).
       */
      @Experimental
      def PreferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy =
        new PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava))
    
      /**
       *  :: Experimental ::
       * Use this to place particular TopicPartitions on particular hosts if your load is uneven.
       * Any TopicPartition not specified in the map will use a consistent location.
       */
      @Experimental
      def PreferFixed(hostMap: ju.Map[TopicPartition, String]): LocationStrategy =
        new PreferFixed(hostMap)
    }
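A sketch of building a PreferFixed strategy (topic name and executor hostnames are hypothetical):

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.LocationStrategies

// Pin two hot partitions to specific executor hosts (hypothetical names);
// any partition not listed in hostMap falls back to consistent placement.
val hostMap = Map(
  new TopicPartition("demo-topic", 0) -> "executor-host-1",
  new TopicPartition("demo-topic", 1) -> "executor-host-2"
)
val strategy = LocationStrategies.PreferFixed(hostMap)
```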
    

    ConsumerStrategies (consumer strategies) come in three flavors: Subscribe, SubscribePattern, and Assign, i.e. subscription versus manual assignment.

      Subscribe lets Kafka assign partitions to consumers automatically; the group coordinator's assignment algorithm spreads topic-partitions evenly across the consumers in the same group.

      Assign specifies the topic-partitions to consume manually and explicitly; it is not constrained by group.id, so the configured group has no effect on partition assignment.

    /**
       *  :: Experimental ::
       * Subscribe to a collection of topics.
       * @param topics collection of topics to subscribe
       * @param kafkaParams Kafka
       * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
       * configuration parameters</a> to be used on driver. The same params will be used on executors,
       * with minor automatic modifications applied.
       *  Requires "bootstrap.servers" to be set
       * with Kafka broker(s) specified in host1:port1,host2:port2 form.
       * @param offsets: offsets to begin at on initial startup.  If no offset is given for a
       * TopicPartition, the committed offset (if applicable) or kafka param
       * auto.offset.reset will be used.
       */
      @Experimental
      def Subscribe[K, V](
          topics: Iterable[jl.String],
          kafkaParams: collection.Map[String, Object],
          offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
        new Subscribe[K, V](
          new ju.ArrayList(topics.asJavaCollection),
          new ju.HashMap[String, Object](kafkaParams.asJava),
          new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
      }
    
    /** :: Experimental ::
       * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
       * The pattern matching will be done periodically against topics existing at the time of check.
       * @param pattern pattern to subscribe to
       * @param kafkaParams Kafka
       * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
       * configuration parameters</a> to be used on driver. The same params will be used on executors,
       * with minor automatic modifications applied.
       *  Requires "bootstrap.servers" to be set
       * with Kafka broker(s) specified in host1:port1,host2:port2 form.
       * @param offsets: offsets to begin at on initial startup.  If no offset is given for a
       * TopicPartition, the committed offset (if applicable) or kafka param
       * auto.offset.reset will be used.
       */
      @Experimental
      def SubscribePattern[K, V](
          pattern: ju.regex.Pattern,
          kafkaParams: collection.Map[String, Object],
          offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
        new SubscribePattern[K, V](
          pattern,
          new ju.HashMap[String, Object](kafkaParams.asJava),
          new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
      }
    
    /**
       *  :: Experimental ::
       * Assign a fixed collection of TopicPartitions
       * @param topicPartitions collection of TopicPartitions to assign
       * @param kafkaParams Kafka
       * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
       * configuration parameters</a> to be used on driver. The same params will be used on executors,
       * with minor automatic modifications applied.
       *  Requires "bootstrap.servers" to be set
       * with Kafka broker(s) specified in host1:port1,host2:port2 form.
       * @param offsets: offsets to begin at on initial startup.  If no offset is given for a
       * TopicPartition, the committed offset (if applicable) or kafka param
       * auto.offset.reset will be used.
       */
      @Experimental
      def Assign[K, V](
          topicPartitions: Iterable[TopicPartition],
          kafkaParams: collection.Map[String, Object],
          offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
        new Assign[K, V](
          new ju.ArrayList(topicPartitions.asJavaCollection),
          new ju.HashMap[String, Object](kafkaParams.asJava),
          new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
      }
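To contrast the two strategies, here is a sketch that starts both from explicit offsets (broker, topic, group id, and offset values are hypothetical):

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "host1:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "demo-group"
)

// Starting offsets, e.g. restored from external storage (hypothetical values).
val offsets = Map(
  new TopicPartition("demo-topic", 0) -> 1000L,
  new TopicPartition("demo-topic", 1) -> 2500L
)

// Subscribe: the group coordinator assigns partitions among consumers in demo-group.
val subscribe = ConsumerStrategies.Subscribe[String, String](Seq("demo-topic"), kafkaParams, offsets)

// Assign: consume exactly these two partitions, bypassing group assignment.
val assign = ConsumerStrategies.Assign[String, String](offsets.keys.toList, kafkaParams, offsets)
```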
    

     Cannot resolve overloaded method:

      Cause: the arguments passed do not match any overload's signature. Check each argument's type (Scala versus java.util collections) against the signatures above.
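For example, each overload expects either all-Scala or all-Java collection types, so mixing them matches no signature (names and values below are hypothetical):

```scala
import java.{util => ju}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.ConsumerStrategies

val kafkaParams = Map[String, Object]("bootstrap.servers" -> "host1:9092")
val offsets = Map(new TopicPartition("demo-topic", 0) -> 0L)

// Fails with "Cannot resolve overloaded method": a java.util.List of topics
// combined with Scala Maps matches neither the all-Scala nor the all-Java overload.
// val bad = ConsumerStrategies.Subscribe[String, String](
//   ju.Arrays.asList("demo-topic"), kafkaParams, offsets)

// Compiles: all arguments are Scala collections, selecting the Scala overload.
val ok = ConsumerStrategies.Subscribe[String, String](
  Seq("demo-topic"), kafkaParams, offsets)
```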

  • Original article: https://www.cnblogs.com/zbw1112/p/12788679.html