zoukankan      html  css  js  c++  java
  • Kafka通讯的Java实例

    • 依赖:

    kafka_2.12-2.0.0.jar、kafka-clients-2.0.0.jar、log4j-1.2.17.jar、slf4j-api-1.7.25.jar、slf4j-log4j12-1.7.25.jar

    • lkafkaConstants.java
    • package kafka_proj;
      
      public interface IkafkaConstants {public static String KAFKA_BROKERS = "192.168.65.130:9092, 192.168.65.131:9092, 192.168.65.132:9092";
          public static Integer MESSAGE_COUNT = 100;
          public static String CLIENT_ID = "0";
          public static String TOPIC_NAME = "test9";
          public static String GROUP_ID_CONFIG = "Group1";
          public static Integer MAX_NO_MESSAGE_FOUND_COUNT = 100;
          public static String OFFSET_RESET_LATEST = "latest";
          public static String OFFSET_RESET_EARLIER = "earliest";
          public static Integer MAX_POLL_RECORDS = 1;
          
      
      }
    • ConsumerCreator.java
    • package kafka_proj;
      
      import java.util.Collections;
      import java.util.Properties;
      import org.apache.kafka.clients.consumer.Consumer;
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.serialization.IntegerDeserializer;
      import org.apache.kafka.common.serialization.StringDeserializer;
      import kafka_proj.IkafkaConstants;
      public class ConsumerCreator {
          public static Consumer<String, String> createConsumer() {
              Properties props = new Properties();
              props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, IkafkaConstants.KAFKA_BROKERS);
              props.put(ConsumerConfig.GROUP_ID_CONFIG, IkafkaConstants.GROUP_ID_CONFIG);
              props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
              props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
              props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, IkafkaConstants.MAX_POLL_RECORDS);
              props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
              props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, IkafkaConstants.OFFSET_RESET_EARLIER);
              Consumer<String, String> consumer = new KafkaConsumer<>(props);
              consumer.subscribe(Collections.singletonList(IkafkaConstants.TOPIC_NAME));
              return consumer;
          }
      }
    • CustomPartitioner.java
    • package kafka_proj;
      
      import java.util.Map;
      import org.apache.kafka.clients.producer.Partitioner;
      import org.apache.kafka.common.Cluster;
      public class CustomPartitioner implements Partitioner{
        private static final int PARTITION_COUNT=3;
        @Override
        public void configure(Map<String, ?> configs) {
        }
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            
            int partitionNum = 0;
            try {
                  partitionNum = Integer.parseInt((String) key);
                  System.out.println(partitionNum);
              } catch (Exception e) {
                  partitionNum = key.hashCode();
              }
            return Math.abs(partitionNum % PARTITION_COUNT);
            
        }
        @Override
        public void close() {
        }
      }
    • ProducerCreator.java
    • package kafka_proj;
      
      import java.util.Properties;
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.Producer;
      import org.apache.kafka.clients.producer.ProducerConfig;
      import org.apache.kafka.common.serialization.IntegerSerializer;
      import org.apache.kafka.common.serialization.StringSerializer;
      import kafka_proj.IkafkaConstants;
      
      
      
      public class ProducerCreator {
          public static Producer<String, String> createProducer(){
              Properties props = new Properties();
              props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, IkafkaConstants.KAFKA_BROKERS);
              props.put(ProducerConfig.CLIENT_ID_CONFIG, IkafkaConstants.CLIENT_ID);
              props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
              props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
              props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
              return new KafkaProducer<>(props);
              
          }
      
      }
    • App.java
    • package kafka_proj;
      import java.util.concurrent.ExecutionException;
      import org.apache.kafka.clients.consumer.Consumer;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.producer.Producer;
      import org.apache.kafka.clients.producer.ProducerRecord;
      import org.apache.kafka.clients.producer.RecordMetadata;
      import kafka_proj.IkafkaConstants;
      import kafka_proj.ConsumerCreator;
      import kafka_proj.ProducerCreator;
      public class App {
          public static void main(String[] args) {
              //runProducer();
              runConsumer();
          }
          static void runConsumer() {
              Consumer<String, String> consumer = ConsumerCreator.createConsumer();
              int noMessageFound = 0;
              while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
                // 1000 is the time in milliseconds consumer will wait if no record is found at broker.
                if (consumerRecords.count() == 0) {
                    noMessageFound++;
                    if (noMessageFound > IkafkaConstants.MAX_NO_MESSAGE_FOUND_COUNT)
                      // If no message found count is reached to threshold exit loop.  
                      break;
                    else
                        continue;
                }
                // print each record. 
                consumerRecords.forEach(record -> {
                    System.out.println("Record Key " + record.key());
                    System.out.println("Record value " + record.value());
                    System.out.println("Record partition " + record.partition());
                    System.out.println("Record offset " + record.offset());
                 });
                // commits the offset of record to broker. 
                 consumer.commitAsync();
              }
          consumer.close();
          }
          static void runProducer() {
      Producer<String, String> producer = ProducerCreator.createProducer();
              for (int index = 0; index < IkafkaConstants.MESSAGE_COUNT; index++) {
                  // System.out.println("***1");
                  ProducerRecord<String, String> record = new ProducerRecord<String, String>(IkafkaConstants.TOPIC_NAME,index%3, String.valueOf(index),
                  "This is record " + index);
                  // System.out.println("***2");
                  try {
                  RecordMetadata metadata = producer.send(record).get();
                              System.out.println("New Record sent with key " + index + " to partition " + metadata.partition()
                              + " with offset " + metadata.offset());
                       } 
                  catch (ExecutionException e) {
                           System.out.println("Error in sending record");
                           System.out.println(e);
                        } 
                   catch (InterruptedException e) {
                            System.out.println("Error in sending record");
                            System.out.println(e);
                        }
                  // System.out.println("***3");
               }
          }
      }
  • 相关阅读:
    19-10-31-B
    19-10-30-Night-V
    19-10-30-C
    19-10-29-Night-X
    19-10-29-Z
    19-10-28-A
    19-10-27-S
    19-10-26-Night-D
    留言板
    优秀博客存档
  • 原文地址:https://www.cnblogs.com/ryu-manager/p/9436708.html
Copyright © 2011-2022 走看看