zoukankan      html  css  js  c++  java
  • KafkaConfig介绍

    public class KafkaConfig implements Serializable {
    
    	/** 一个借口,实现类有ZkHosts,和StatisHosts	 **/
        public final BrokerHosts hosts; 
        public final String topic; // kafka topic name
        public final String clientId; // 自己取一个唯一的ID吧
    
        public int fetchSizeBytes = 1024 * 1024; // 每次从kafka读取的byte数,这个变量会在KafkaUtils的fetchMessage方法中看到
        public int socketTimeoutMs = 10000; //  Consumer连接kafka server超时时间
        public int fetchMaxWait = 10000; 
        public int bufferSizeBytes = 1024 * 1024;   //Consumer端缓存大小
        public MultiScheme scheme = new RawMultiScheme(); // 数据发送的序列化和反序列化定义的Scheme,后续会专门有一篇介绍
        public boolean forceFromStart = false;	// 和startOffsetTime,一起用,默认情况下,为false,一旦startOffsetTime被设置,就要置为true
        public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); // -2 从kafka头开始  -1 是从最新的开始 0 =无 从ZK开始
        public long maxOffsetBehind = Long.MAX_VALUE;  // 每次kafka会读取一批offset存放在list中,当zk offset比当前本地保存的commitOffse相减大于这个值时,重新设置commitOffset为当前zk offset,代码见PartitionManager
        public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
        public int metricsTimeBucketSizeInSecs = 60;
    
        public KafkaConfig(BrokerHosts hosts, String topic) {
            this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());
        }
    
        public KafkaConfig(BrokerHosts hosts, String topic, String clientId) {
            this.hosts = hosts;
            this.topic = topic;
            this.clientId = clientId;
        }
    
    }
    

      

    public class SpoutConfig extends KafkaConfig implements Serializable {
        public List<String> zkServers = null; // zk hosts 列表,格式就是简单ip:xxx.xxx.xxx.xxx,作为zkserver ,后续leader election用
        public Integer zkPort = null;	// zk端口,一般是2181
        public String zkRoot = null;  // 该参数是Consumer消费的meta信息,保存在zk的路径,自己指定
        public String id = null;		// 唯一id 
        public long stateUpdateIntervalMs = 2000; // commit 消费的offset到zk的时间间隔
    
        public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
            super(hosts, topic);
            this.zkRoot = zkRoot;
            this.id = id;
        }
    }
    

      

  • 相关阅读:
    Spring Boot 创建一个可以执行的 Jar
    Spring Boot 第一个示例启动运行
    Spring Boot 第一个示例 “main” 方法
    Spring Boot 第一个示例 @EnableAutoConfiguration 注解
    Spring Boot 第一个示例的 @RestController 和 @RequestMapping 注解
    Spring Boot 2.4 第一个示例程序书写代码
    Spring Boot 2.4 第一个示例程序添加 Classpath 依赖
    Spring Boot 2.4 示例创建 POM 文件
    Spring Boot 2.4 部署你的第一个 Spring Boot 应用需要的环境
    Mysql 的concat、concat_ws()以及group_concat()的用法与区别
  • 原文地址:https://www.cnblogs.com/metoy/p/4454347.html
Copyright © 2011-2022 走看看