zoukankan      html  css  js  c++  java
  • kafka的Java客户端示例代码(kafka_2.12-0.10.2.1)

    本示例使用从 Kafka 0.9 版本开始新增的 KafkaProducer 和 KafkaConsumer 客户端 API。

    pom.xml

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <!-- Maven build descriptor for the Kafka hello-world producer/consumer sample. -->
      <modelVersion>4.0.0</modelVersion>
    
      <!-- Project coordinates; the module is packaged as a plain jar. -->
      <groupId>cn.ljh.kafka</groupId>
      <artifactId>kafka-helloworld</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <packaging>jar</packaging>
    
      <name>kafka-helloworld</name>
      <url>http://maven.apache.org</url>
    
      <properties>
        <!-- Source files are read as UTF-8. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      </properties>
    
      <dependencies>
        <!-- JUnit 4.12, available on the test classpath only. -->
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.12</version>
          <scope>test</scope>
        </dependency>
        <!-- Kafka 0.10.2.1 (artifact built for Scala 2.12), compile scope. -->
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka_2.12</artifactId>
          <version>0.10.2.1</version>
          <scope>compile</scope>
          <!--
            The three artifacts below are excluded from Kafka's transitive dependencies.
            NOTE(review): presumably because they are not resolvable from the default
            repositories; confirm before removing these exclusions.
          -->
          <exclusions>
            <exclusion>
              <artifactId>jmxri</artifactId>
              <groupId>com.sun.jmx</groupId>
            </exclusion>
            <exclusion>
              <artifactId>jms</artifactId>
              <groupId>javax.jms</groupId>
            </exclusion>
            <exclusion>
              <artifactId>jmxtools</artifactId>
              <groupId>com.sun.jdmk</groupId>
            </exclusion>
          </exclusions>
        </dependency>
      </dependencies>
    </project>

    HelloWorldProducer2.java

    package cn.ljh.kafka.kafka_helloworld;
    
    import java.util.Date;
    import java.util.Properties;
    import java.util.Random;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    public class HelloWorldProducer2 {
        public static void main(String[] args) {
             long events = Long.parseLong(args[0]);
             Random rnd = new Random();
        
             Properties props = new Properties();
             props.put("bootstrap.servers", "192.168.137.176:9092,192.168.137.176:9093,192.168.137.176:9094");
             props.put("acks", "all");
             props.put("retries", 0);
             props.put("batch.size", 16384);
             props.put("linger.ms", 1);
             props.put("buffer.memory", 33554432);
             props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
             props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
             //配置partitionner选择策略,可选配置
             props.put("partitioner.class", "cn.ljh.kafka.kafka_helloworld.SimplePartitioner2");
        
             Producer<String, String> producer = new KafkaProducer<>(props);
        
             for (long nEvents = 0; nEvents < events; nEvents++) { 
                    long runtime = new Date().getTime();  
                    String ip = "192.168.2." + rnd.nextInt(255); 
                    String msg = runtime + ",www.example.com," + ip; 
                    ProducerRecord<String, String> data = new ProducerRecord<String, String>("page_visits", ip, msg);
                    producer.send(data,
                             new Callback() {
                         public void onCompletion(RecordMetadata metadata, Exception e) {
                             if(e != null) {
                                e.printStackTrace();
                             } else {
                                System.out.println("The offset of the record we just sent is: " + metadata.offset());
                             }
                         }
                     });
             }
             producer.close();
        }
    }

    SimplePartitioner2.java

    package cn.ljh.kafka.kafka_helloworld;
    
    import java.util.List;
    import java.util.Map;
    
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;
    
     
    /**
     * Partitioner that routes a record by the numeric suffix of its String key.
     *
     * <p>For a key such as {@code "192.168.2.17"} the text after the last
     * {@code '.'} is parsed as an integer and reduced modulo the topic's
     * partition count. Keys that cannot be interpreted this way are sent to
     * partition 0.
     */
    public class SimplePartitioner2 implements Partitioner {

        /** Partition used whenever the key does not yield a usable number. */
        private static final int DEFAULT_PARTITION = 0;

        @Override
        public void configure(Map<String, ?> configs) {
            // This partitioner has no configurable settings.
        }

        /**
         * Chooses the partition for a record.
         *
         * @param topic      topic the record is being sent to
         * @param key        record key; only {@code String} keys are inspected
         * @param keyBytes   serialized key (unused)
         * @param value      record value (unused)
         * @param valueBytes serialized value (unused)
         * @param cluster    cluster metadata used to look up the partition count
         * @return a partition index in {@code [0, numPartitions)}, or 0 when the
         *         key is null, not a String, has no numeric suffix after a dot,
         *         or the topic reports no partitions
         */
        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = (partitions == null) ? 0 : partitions.size();
            // Guard against division by zero and against null / non-String keys.
            if (numPartitions == 0 || !(key instanceof String)) {
                return DEFAULT_PARTITION;
            }

            String stringKey = (String) key;
            int offset = stringKey.lastIndexOf('.');
            if (offset <= 0) {
                return DEFAULT_PARTITION;
            }

            try {
                int suffix = Integer.parseInt(stringKey.substring(offset + 1));
                int partition = suffix % numPartitions;
                // Java's % keeps the sign of the dividend; shift negatives into range.
                return (partition < 0) ? partition + numPartitions : partition;
            } catch (NumberFormatException ignored) {
                // Suffix is not a number (e.g. a host name); use the default partition.
                return DEFAULT_PARTITION;
            }
        }

        @Override
        public void close() {
            // Nothing to release.
        }

    }

    HelloWorldConsumer2.java

    package cn.ljh.kafka.kafka_helloworld;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    /**
     * Sample consumer that subscribes to the {@code page_visits} topic as part
     * of consumer group {@code test} and prints every record it receives.
     *
     * <p>Offsets are committed automatically (auto-commit enabled with an
     * interval setting of 1000 ms). The poll loop runs until the process is
     * terminated or an exception is thrown.
     */
    public class HelloWorldConsumer2 {

        /**
         * Configures the consumer and runs the poll loop.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            Properties props = new Properties();

            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.176:9092,192.168.137.176:9093,192.168.137.176:9094");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

            // try-with-resources closes the consumer if poll() or record handling throws,
            // instead of leaving it open.
            try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Arrays.asList("page_visits"));

                while (true) {
                    // Wait up to 100 ms for new records before looping again.
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                    }
                }
            }
        }

    }
  • 相关阅读:
    java枚举enum
    冒泡排序、选择排序、插入排序、二分法排序、快速排序、二叉树排序、堆排序总结
    Django-tinymce富文本的使用
    Redis-基本操作总结
    git-总结大全
    css-总结
    html-table布局
    html表单示例
    html总结
    python-浅拷贝、深拷贝实例以及讲解
  • 原文地址:https://www.cnblogs.com/hd3013779515/p/6939009.html
Copyright © 2011-2022 走看看