zoukankan      html  css  js  c++  java
  • kafka的Java客户端示例代码(kafka_2.12-0.10.2.1)

    使用0.9开始增加的KafkaProducer和KafkaConsumer。

    Pom.xml

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>cn.ljh.kafka</groupId>
      <artifactId>kafka-helloworld</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <packaging>jar</packaging>
    
      <name>kafka-helloworld</name>
      <url>http://maven.apache.org</url>
    
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.12</version>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka_2.12</artifactId>
          <version>0.10.2.1</version>
          <scope>compile</scope>
          <exclusions>
            <exclusion>
              <artifactId>jmxri</artifactId>
              <groupId>com.sun.jmx</groupId>
            </exclusion>
            <exclusion>
              <artifactId>jms</artifactId>
              <groupId>javax.jms</groupId>
            </exclusion>
            <exclusion>
              <artifactId>jmxtools</artifactId>
              <groupId>com.sun.jdmk</groupId>
            </exclusion>
          </exclusions>
        </dependency>
      </dependencies>
    </project>

    HelloWorldProducer2.java

    package cn.ljh.kafka.kafka_helloworld;
    
    import java.util.Date;
    import java.util.Properties;
    import java.util.Random;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    public class HelloWorldProducer2 {
        public static void main(String[] args) {
             long events = Long.parseLong(args[0]);
             Random rnd = new Random();
        
             Properties props = new Properties();
             props.put("bootstrap.servers", "192.168.137.176:9092,192.168.137.176:9093,192.168.137.176:9094");
             props.put("acks", "all");
             props.put("retries", 0);
             props.put("batch.size", 16384);
             props.put("linger.ms", 1);
             props.put("buffer.memory", 33554432);
             props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
             props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
             //配置partitionner选择策略,可选配置
             props.put("partitioner.class", "cn.ljh.kafka.kafka_helloworld.SimplePartitioner2");
        
             Producer<String, String> producer = new KafkaProducer<>(props);
        
             for (long nEvents = 0; nEvents < events; nEvents++) { 
                    long runtime = new Date().getTime();  
                    String ip = "192.168.2." + rnd.nextInt(255); 
                    String msg = runtime + ",www.example.com," + ip; 
                    ProducerRecord<String, String> data = new ProducerRecord<String, String>("page_visits", ip, msg);
                    producer.send(data,
                             new Callback() {
                         public void onCompletion(RecordMetadata metadata, Exception e) {
                             if(e != null) {
                                e.printStackTrace();
                             } else {
                                System.out.println("The offset of the record we just sent is: " + metadata.offset());
                             }
                         }
                     });
             }
             producer.close();
        }
    }

    SimplePartitioner2.java

    package cn.ljh.kafka.kafka_helloworld;
    
    import java.util.List;
    import java.util.Map;
    
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;
    
     
    public class SimplePartitioner2 implements Partitioner {
     
        @Override
        public void configure(Map<String, ?> configs) {
            // TODO Auto-generated method stub
            
        }
    
        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                Object value, byte[] valueBytes, Cluster cluster) {
                int partition = 0;
                 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
                int numPartitions = partitions.size();
                String stringKey = (String) key;
                int offset = stringKey.lastIndexOf('.');
                if (offset > 0) {
                   partition = Integer.parseInt( stringKey.substring(offset+1)) % numPartitions;
                }
                
                return partition;
        }
    
        @Override
        public void close() {
            // TODO Auto-generated method stub
            
        }
     
    }

    HelloWorldConsumer2.java

    package cn.ljh.kafka.kafka_helloworld;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    public class HelloWorldConsumer2 {
            
        public static void main(String[] args) {
            Properties props = new Properties();
    
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.176:9092,192.168.137.176:9093,192.168.137.176:9094");
            props.put(ConsumerConfig.GROUP_ID_CONFIG ,"test") ;
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            
            Consumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("page_visits"));
    
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        }
            
    }
  • 相关阅读:
    10. Regular Expression Matching
    9. Palindrome Number
    6. ZigZag Conversion
    5. Longest Palindromic Substring
    4. Median of Two Sorted Arrays
    3. Longest Substring Without Repeating Characters
    2. Add Two Numbers
    链式表的按序号查找
    可持久化线段树——区间更新hdu4348
    主席树——树链上第k大spoj COT
  • 原文地址:https://www.cnblogs.com/hd3013779515/p/6939009.html
Copyright © 2011-2022 走看看