zoukankan      html  css  js  c++  java
  • SimpleKafkaProducer with ssl

    import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
    import static org.apache.kafka.clients.producer.ProducerConfig.BATCH_SIZE_CONFIG;
    import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
    import static org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG;
    import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
    import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
    import static org.apache.kafka.clients.producer.ProducerConfig.MAX_REQUEST_SIZE_CONFIG;
    import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG;
    import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
    
    import java.io.IOException;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    public class SimpleKafkaProducer {
    
      private static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
      private static final String kafkaAckTopic = "test-input";
    
      private Properties kafkaProperties = new Properties();
      private Producer<String, Object> producerSupplier;
    
      {
        kafkaProperties.put(BOOTSTRAP_SERVERS_CONFIG, KafkaProducerConfiguration.bootstrapServer);
        kafkaProperties.put(RETRIES_CONFIG, KafkaProducerConfiguration.retries);
        kafkaProperties.put(BATCH_SIZE_CONFIG, KafkaProducerConfiguration.batchSize);
        kafkaProperties.put(LINGER_MS_CONFIG, KafkaProducerConfiguration.lingerMs);
        kafkaProperties.put(BUFFER_MEMORY_CONFIG, KafkaProducerConfiguration.bufferMemory);
        kafkaProperties.put(KEY_SERIALIZER_CLASS_CONFIG, KafkaProducerConfiguration.keySerializer);
        kafkaProperties.put(VALUE_SERIALIZER_CLASS_CONFIG, KafkaProducerConfiguration.valueSerializer);
        kafkaProperties.put(SCHEMA_REGISTRY_URL, KafkaProducerConfiguration.schemaRegistryUrl);
        kafkaProperties.put(MAX_REQUEST_SIZE_CONFIG, KafkaProducerConfiguration.producerMaxRequestSize);
        if (KafkaProducerConfiguration.useSsl) {
          // -Djavax.net.ssl.trustStore=C:/cacerts/DEV_cacerts.jks
          System.setProperty("javax.net.ssl.trustStore", "C:/cacerts/DEV_cacerts.jks");
          kafkaProperties.put(SECURITY_PROTOCOL_CONFIG, "SSL");
        }
        producerSupplier = new KafkaProducer<>(kafkaProperties);
      }
    
      public RecordMetadata send() throws IOException, ExecutionException, InterruptedException {
        RecordMetadata recordMetadata = producerSupplier.send(createProducerRecord()).get();
        System.out.println("to " + recordMetadata + " at " + recordMetadata.timestamp());
    
        return recordMetadata;
      }
    
      private ProducerRecord<String, Object> createProducerRecord() throws IOException {
        String key = "1584406824432";
        String value = "test";
        System.out.println("send key: " + key);
        System.out.println("send value: " + value);
        return new ProducerRecord<>(kafkaAckTopic, key, value);
      }
    
      private static class KafkaProducerConfiguration {
    
        static String bootstrapServer = "localhost:9092";
    
        static String schemaRegistryUrl = "";
    
        static int retries = 0;
    
        static int batchSize = 16384;
    
        static int lingerMs = 1;
    
        static int bufferMemory = 33554432;
    
        static String keySerializer = "org.apache.kafka.common.serialization.StringSerializer";
    
        static String valueSerializer = "org.apache.kafka.common.serialization.StringSerializer";
    
        static int producerMaxRequestSize = 20971520;
    
        static boolean useSsl = true;
      }
    
      public static void main(String[] args)
          throws IOException, ExecutionException, InterruptedException {
        SimpleKafkaProducer producer = new SimpleKafkaProducer();
        producer.send();
      }
    }
  • 相关阅读:
    【JavaWeb】SpringBoot配置静态资源路径
    apache和nginx设置反向代理
    【other】idea格式化快捷键ctr+alt+L与qq锁定冲突
    【深入Java基础】各个Map类的比较与总结
    【EmguCv】瞳孔定位(二)
    【深入Java基础】LinkedHashMap的特点与原理
    彪神666(暴力即可)
    被打脸的潇洒哥(推递推式)
    送气球.jpg(模拟)
    赌神(逆向思维)
  • 原文地址:https://www.cnblogs.com/tonggc1668/p/12511491.html
Copyright © 2011-2022 走看看