zoukankan      html  css  js  c++  java
  • kafka 参数设置

        public static void main(String[] args) {
            Properties properties = new Properties();
            //kafka集群,下面的配置都可以Prodcuer        
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.102:9092");
            //acks方式        
            properties.put("acks", "all");
            //重试次数        
            properties.put("reties", 1);
            //批次大小,每次发送数据的大小       
            properties.put("batch.size", 16384);
            //等待时间,如果数据迟迟没有达到batch.size的大小,等待的linger.ms之后就发送数据       
            properties.put("linger.ms", 1);
            //RecordAccumulator缓冲区大小        
            properties.put("buffer.memory", 33554432);
            //序列化        
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            Producer producer = new KafkaProducer(properties);
            for (int i = 0; i < 10; i++) {
                //"first"是topic,后面是value           
                producer.send(new ProducerRecord("first", Integer.toString(i)));
            }
            //关闭资源,一定要关闭        
            producer.close();
        }
    public class ConfigKafkaProducer {
        public static void main(String[] args) {
            //TODO 生产者三个属性必须指定(broker地址清单、key和value的序列化器)
            Properties properties = new Properties();
            properties.put("bootstrap.servers","127.0.0.1:9092");
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            //TODO 更多发送配置(重要的)
            properties.put("acks","1"); //ack 0,1,all
            // 一个批次可以使用的内存大小 缺省16384(16k)
            properties.put("batch.size",16384);
            // 指定了生产者在发送批次前等待更多消息加入批次的时间,  缺省0  50ms
            properties.put("linger.ms",0L);
            // 控制生产者发送请求最大大小,默认1M (这个参数和Kafka主机的message.max.bytes 参数有关系)
            properties.put("max.request.size",1 * 1024 * 1024);
            //TODO 更多发送配置(非重要的)
            properties.put("buffer.memory",32 * 1024 * 1024L);//生产者内存缓冲区大小
            properties.put("retries",0); //重发消息次数
            //客户端将等待请求的响应的最大时间 默认30秒
            properties.put("request.timeout.ms",30 * 1000);
            //最大阻塞时间,超过则抛出异常 缺省60000ms
            properties.put("max.block.ms",60*1000);
            // 于压缩数据的压缩类型。默认是无压缩 ,none、gzip、snappy
            properties.put("compression.type","none");
            KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
            try {
                ProducerRecord<String,String> record;
                try {
                    //TODO 发送4条消息
                    for(int i=0;i<4;i++){
                        record = new ProducerRecord<String,String>(
                                BusiConst.HELLO_TOPIC, String.valueOf(i),"hankin");
                        producer.send(record);
                        System.out.println(i+",message is sent");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            } finally {
                producer.close();
            }
        }
    }

    https://blog.csdn.net/m0_37661458/article/details/102640246

  • 相关阅读:
    UOJ#80. 二分图最大权匹配 模板
    BZOJ2243: [SDOI2011]染色
    LA5713 Qin Shi Huang's National Road System
    BZOJ1977: [BeiJing2010组队]次小生成树 Tree
    LA5009 Error Curves
    BZOJ1013: [JSOI2008]球形空间产生器sphere
    BZOJ2733: [HNOI2012]永无乡
    BZOJ1552: [Cerc2007]robotic sort
    BZOJ3223: Tyvj 1729 文艺平衡树
    网络流24题(24/24)
  • 原文地址:https://www.cnblogs.com/kpwong/p/15647303.html
Copyright © 2011-2022 走看看