zoukankan      html  css  js  c++  java
  • java SASL_SSL 帐号密码 方式访问 kafka

    java SASL_SSL 帐号密码 方式访问 kafka

    Producer Java Sample java生产者:

    // Producer settings for a Kafka cluster reached over SASL_SSL with SCRAM
    // username/password credentials. Settings are grouped by concern.
    Properties props = new Properties();

    // --- Brokers ---
    props.put("bootstrap.servers", "*******:9092,*******:9092");

    // --- Serialization: record keys and values both use the String serializer ---
    String stringSerializer = "org.apache.kafka.common.serialization.StringSerializer";
    props.put("key.serializer", stringSerializer);
    props.put("value.serializer", stringSerializer);

    // --- Delivery guarantees and batching ---
    props.put("acks", "all");
    props.put("retries", 3);
    // NOTE(review): 106384 may be a typo for 16384 -- confirm the intended batch size.
    props.put("batch.size", 106384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432); // 32 MiB

    // --- TLS ---
    props.put("security.protocol", "SASL_SSL");
    props.put("ssl.truststore.location", "D:/client_truststore.jks");
    props.put("ssl.truststore.password", "WSO2_sp440");
    // The endpoint identification algorithm is deliberately set to the empty string.
    props.put("ssl.endpoint.identification.algorithm", "");

    // --- SASL / SCRAM credentials ---
    props.put("sasl.mechanism", "SCRAM-SHA-512");
    // The semicolon that ends the JAAS entry (right after the password) must not be omitted.
    String jaasConfig = "org.apache.kafka.common.security.scram.ScramLoginModule required username='kaf_crm' password='xxxxxxx';";
    props.put("sasl.jaas.config", jaasConfig);

    // Record key and payload shared by the two send examples below.
    // CRM_ContractID is not defined in this snippet; the surrounding code must supply it.
    String contractId = CRM_ContractID;
    String payload = "payload";

    // try-with-resources closes the producer exactly once, after BOTH examples have run
    // and also when one of them fails. close() blocks until previously sent records have
    // completed, so the asynchronous callback has been invoked by the time it returns.
    try (Producer<String, String> producer = new KafkaProducer<>(props)) {

        // Synchronous mode: the caller blocks until the Kafka server returns a response.
        try {
            // Arguments are (topic, key, payload); the second argument is the record key.
            Future<RecordMetadata> future =
                    producer.send(new ProducerRecord<>("CRM_Contract", contractId, payload));
            // Waits for the broker's answer. Drop this call if the caller does not care
            // whether the send succeeded or failed.
            future.get();
        } catch (Exception e) {
            // Connection and "no leader" errors can be resolved by the producer's retries;
            // a "message too large" error is not retried and is thrown immediately.
            if (e instanceof InterruptedException) {
                // Preserve the interrupt status for code further up the call stack.
                Thread.currentThread().interrupt();
            }
            e.printStackTrace();
        }

        // Asynchronous mode: the caller does not wait for a response. A background thread of
        // the producer submits records to the Kafka server in batches, so a callback is needed
        // to learn whether the record was delivered. Failures must be logged there.
        try {
            producer.send(new ProducerRecord<>("CRM_Contract", contractId, payload), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null) {
                        e.printStackTrace();
                    } else {
                        System.out.println("The offset of the record we just sent is: " + metadata.offset());
                    }
                }
            });
        } catch (Exception e) {
            // send() itself can fail before the record is queued; report that as well.
            e.printStackTrace();
        }
    }

    Consumer Java Sample java消费者:

    // Consumer settings for a Kafka cluster reached over SASL_SSL with SCRAM
    // username/password credentials.
    Properties props = new Properties();

    props.put("bootstrap.servers", "*******:9092");

    props.put("group.id", "wso2_sp");
    // Auto-commit is switched off, so the code that polls must commit offsets itself.
    props.put("enable.auto.commit", "false");
    // NOTE(review): this interval only applies to auto-commit, which is disabled above --
    // confirm whether the setting is still wanted.
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    props.put("security.protocol", "SASL_SSL");
    props.put("sasl.mechanism", "SCRAM-SHA-512");
    // The backslash is escaped: an unescaped "\c" is an illegal escape sequence in a Java
    // string literal and stops the snippet from compiling.
    props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "G:\\client_truststore.jks");
    props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "WSO2_sp440");
    // The semicolon that ends the JAAS entry (right after the password) must not be omitted.
    props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='kaf_xxx' password='xxxxx';");

    // The endpoint identification algorithm is deliberately set to the empty string.
    props.put("ssl.endpoint.identification.algorithm", "");

    // Subscribes to a single topic, then polls forever: every record of a batch is printed
    // and the batch's offsets are committed synchronously before the next poll.
    String topic = "file_poc";

    // try-with-resources: the loop never ends normally, but poll() and commitSync() can
    // throw; closing the consumer on that path avoids leaking the client and its resources.
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Arrays.asList(topic));

        while (true) {
            // NOTE(review): newer Kafka clients deprecate poll(long) in favour of
            // poll(java.time.Duration) -- confirm the client version before switching.
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition= %d, offset = %d, key = %s, value = %s ", record.partition(), record.offset(), record.key(), record.value());
            }
            // Commit only after the whole batch has been processed.
            consumer.commitSync();
        }
    }

  • 相关阅读:
    csharp: Cyotek.GhostScript.PdfConversion pdf convert image
    csharp: using Acrobat.dll pdf convert images in winform
    机器学习实战---K均值聚类算法
    机器学习实战---决策树CART回归树实现
    机器学习实战---决策树CART简介及分类树实现
    机器学习实战---线性回归(更好的使用正规方程求解)
    机器学习实战---逻辑回归梯度上升(更好的理解sigmoid函数的含义并改进)
    机器学习实战---朴素贝叶斯算法使用K折交叉验证
    机器学习实战---朴素贝叶斯算法
    机器学习实战---决策树ID3算法
  • 原文地址:https://www.cnblogs.com/suizhikuo/p/13406288.html
Copyright © 2011-2022 走看看