zoukankan      html  css  js  c++  java
  • kafka 参数设置

        public static void main(String[] args) {
            Properties properties = new Properties();
            //kafka集群,下面的配置都可以Prodcuer        
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.102:9092");
            //acks方式        
            properties.put("acks", "all");
            //重试次数        
            properties.put("reties", 1);
            //批次大小,每次发送数据的大小       
            properties.put("batch.size", 16384);
            //等待时间,如果数据迟迟没有达到batch.size的大小,等待的linger.ms之后就发送数据       
            properties.put("linger.ms", 1);
            //RecordAccumulator缓冲区大小        
            properties.put("buffer.memory", 33554432);
            //序列化        
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            Producer producer = new KafkaProducer(properties);
            for (int i = 0; i < 10; i++) {
                //"first"是topic,后面是value           
                producer.send(new ProducerRecord("first", Integer.toString(i)));
            }
            //关闭资源,一定要关闭        
            producer.close();
        }
    public class ConfigKafkaProducer {
        public static void main(String[] args) {
            //TODO 生产者三个属性必须指定(broker地址清单、key和value的序列化器)
            Properties properties = new Properties();
            properties.put("bootstrap.servers","127.0.0.1:9092");
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            //TODO 更多发送配置(重要的)
            properties.put("acks","1"); //ack 0,1,all
            // 一个批次可以使用的内存大小 缺省16384(16k)
            properties.put("batch.size",16384);
            // 指定了生产者在发送批次前等待更多消息加入批次的时间,  缺省0  50ms
            properties.put("linger.ms",0L);
            // 控制生产者发送请求最大大小,默认1M (这个参数和Kafka主机的message.max.bytes 参数有关系)
            properties.put("max.request.size",1 * 1024 * 1024);
            //TODO 更多发送配置(非重要的)
            properties.put("buffer.memory",32 * 1024 * 1024L);//生产者内存缓冲区大小
            properties.put("retries",0); //重发消息次数
            //客户端将等待请求的响应的最大时间 默认30秒
            properties.put("request.timeout.ms",30 * 1000);
            //最大阻塞时间,超过则抛出异常 缺省60000ms
            properties.put("max.block.ms",60*1000);
            // 于压缩数据的压缩类型。默认是无压缩 ,none、gzip、snappy
            properties.put("compression.type","none");
            KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
            try {
                ProducerRecord<String,String> record;
                try {
                    //TODO 发送4条消息
                    for(int i=0;i<4;i++){
                        record = new ProducerRecord<String,String>(
                                BusiConst.HELLO_TOPIC, String.valueOf(i),"hankin");
                        producer.send(record);
                        System.out.println(i+",message is sent");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            } finally {
                producer.close();
            }
        }
    }

    https://blog.csdn.net/m0_37661458/article/details/102640246

  • 相关阅读:
    973. K Closest Points to Origin
    919. Complete Binary Tree Inserter
    993. Cousins in Binary Tree
    20. Valid Parentheses
    141. Linked List Cycle
    912. Sort an Array
    各种排序方法总结
    509. Fibonacci Number
    374. Guess Number Higher or Lower
    238. Product of Array Except Self java solutions
  • 原文地址:https://www.cnblogs.com/kpwong/p/15647303.html
Copyright © 2011-2022 走看看