kafka_2.12-1.1.0 Producer and Consumer Java Implementation Examples

    Environment preparation:

    1) Add the required dependencies to the Maven project:

    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.12</artifactId>
      <version>1.1.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.cassandra</groupId>
      <artifactId>cassandra-all</artifactId>
      <version>0.8.1</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
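
    Note: kafka_2.12 is the full broker artifact (it pulls in Scala). For client-only code like the examples below, the lighter kafka-clients artifact should suffice; an alternative dependency, assuming only the producer/consumer APIs are used:

    <!-- alternative: pure Java client library, enough for producer/consumer code -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>1.1.0</version>
    </dependency>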

    2) Can the local machine telnet to 192.178.0.111 9092 (the VMware VM where Kafka is deployed)? If the port is unreachable via telnet, stop the firewall on 192.178.0.111:

    systemctl stop firewalld.service # stop firewalld
    systemctl disable firewalld.service # prevent firewalld from starting at boot
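
    If telnet is not available on the client machine, a plain TCP probe from Java works just as well (a minimal sketch; the host and port match the examples below):

    import java.net.InetSocketAddress;
    import java.net.Socket;

    public class PortCheck {
        public static void main(String[] args) {
            // Try a raw TCP connection to the Kafka broker with a 3-second timeout.
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress("192.178.0.111", 9092), 3000);
                System.out.println("port 9092 is reachable");
            } catch (Exception e) {
                System.out.println("port 9092 is NOT reachable: " + e.getMessage());
            }
        }
    }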

    I. Producer

    First, consider the following two implementation examples:

    package com.dx;
    
    import org.apache.kafka.clients.producer.*;
    
    import java.util.Date;
    import java.util.Properties;
    import java.util.Random;
    import java.util.concurrent.Future;
    
    public class ProducerTest {
        public static void main(String[] args) {
            producer_test1(args);
    
            producer_test2();
        }
    
        private static void producer_test2() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.178.0.111:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            Producer<String, String> producer = new KafkaProducer<String, String>(props);
            for(int i = 0; i < 10; i++)
                producer.send(new ProducerRecord<String, String>("kafakatopic", Integer.toString(i), Integer.toString(i)));
    
            producer.close();
        }
    
        private static void producer_test1(String[] args) {
            String arg0 = args != null && args.length > 0 ? args[0] : "10";
            long events = Long.parseLong(arg0);
            Random rnd = new Random();
    
            //    /opt/kafka_2.12-1.1.0/bin/kafka-console-producer.sh --broker-list 192.178.0.111:9092 --sync --topic kafkatopic
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.178.0.111:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // Configure the partitioner selection strategy (optional)
            props.put("partitioner.class", "com.dx.SimplePartitioner2");
    
            Producer<String, String> producer = new KafkaProducer<String, String>(props);
            for (long nEvents = 0; nEvents < events; nEvents++) {
                long runtime = new Date().getTime();
                String ip = "192.178.0." + rnd.nextInt(255);
                String msg = runtime + ",www.example.com," + ip;
                ProducerRecord<String, String> data = new ProducerRecord<String, String>("kafakatopic", ip, msg);
                Future<RecordMetadata> send = producer.send(data,
                        new Callback() {
                            public void onCompletion(RecordMetadata metadata, Exception e) {
                                if (e != null) {
                                    e.printStackTrace();
                                } else {
                                    System.out.println("The offset of the record we just sent is: " + metadata.offset());
                                }
                            }
                        });
            }
            producer.close();
        }
    }
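
    producer_test1 keeps the Future returned by send() but never inspects it. To send synchronously instead, block on the Future with get(); a minimal sketch using the same producer and topic as above:

    // Blocking send: get() waits until the broker acknowledges the record.
    ProducerRecord<String, String> record = new ProducerRecord<String, String>("kafakatopic", "key", "value");
    try {
        RecordMetadata metadata = producer.send(record).get();
        System.out.println("sent to partition " + metadata.partition() + " at offset " + metadata.offset());
    } catch (Exception e) {
        e.printStackTrace();
    }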
    SimplePartitioner2.java
    package com.dx;
    
    import java.util.List;
    import java.util.Map;
    
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;
    
    public class SimplePartitioner2 implements Partitioner {
        public void configure(Map<String, ?> map) {
        }
    
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            int partition = 0;
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            // Route by the last octet of the IP-address key, modulo the partition count;
            // keys without a '.' fall through to partition 0.
            String stringKey = (String) key;
            int offset = stringKey.lastIndexOf('.');
            if (offset > 0) {
                partition = Integer.parseInt(stringKey.substring(offset + 1)) % numPartitions;
            }
    
            return partition;
        }
    
        public void close() {
        }
    }

    Notes on parameter settings:

    1) bootstrap.servers -- the Kafka broker address(es) the producer connects to
    2) acks -- acknowledgment mode
    3) retries -- number of send retries
    4) batch.size -- size threshold for batched sends
    5) linger.ms -- delay before a batch is sent (more records can be appended during the wait)
    6) buffer.memory -- total memory available for buffering records
    7) key.serializer|value.serializer -- serializer classes for keys and values
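
    The consumer example below uses the ConsumerConfig constants instead of raw string keys; the producer configuration can be written the same way with ProducerConfig, which catches typos at compile time. A minimal sketch of the same settings:

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.178.0.111:9092");
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");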

    A few points to note:
    1. The acks setting. acks=all waits for the full set of in-sync replicas to acknowledge the record (acks=-1 is equivalent to all); acks=1 waits only for the leader, and acks=0 does not wait at all. Waiting for acknowledgments costs throughput.
    2. The producer batches messages by default while sending. So for a one-off single-record send, remember to call flush() after send(), or the record may not go out immediately (see the sketch after this list).
    3. SimplePartitioner2.java is a custom Kafka partitioner; it is optional.
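
    For point 2, a minimal sketch of a one-off single-record send followed by an explicit flush (same producer configuration as above):

    // Without flush(), a lone record may sit in the batch buffer until
    // linger.ms/batch.size conditions are met or close() is called.
    producer.send(new ProducerRecord<String, String>("kafakatopic", "key", "single message"));
    producer.flush(); // push the buffered record to the broker now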

    II. Consumer

    An implementation example follows:

    package com.dx;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    /**
     * Start ZooKeeper: sh /opt/zookeeper-3.4.11/bin/zkServer.sh start &
     * Start Kafka: sh /opt/kafka_2.12-1.1.0/bin/kafka-server-start.sh /opt/kafka_2.12-1.1.0/config/server.properties &
     */
    public class ConsumerTest {
        public static void main(String[] args) {
            Properties props = new Properties();
    
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.178.0.111:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
            consumer.subscribe(Arrays.asList("kafakatopic"));
    
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        }
    }
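
    The example above relies on auto-commit (enable.auto.commit=true, committed every 1000 ms), so an offset could be committed before the record is actually processed. If at-least-once processing matters, a common variation is to disable auto-commit and commit manually after processing; a minimal sketch based on the same consumer:

    // Assumption: same Properties as above, except auto-commit is turned off.
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    consumer.subscribe(Arrays.asList("kafakatopic"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            // Process the record first...
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
        // ...then commit, so offsets advance only after successful processing.
        consumer.commitSync();
    }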

    III. Testing

    Start the producer first, then start the consumer. The consumer prints results like the following:

    offset = 200, key = 192.178.0.20, value = 1522587459181,www.example.com,192.178.0.20
    offset = 201, key = 192.178.0.143, value = 1522587459359,www.example.com,192.178.0.143
    offset = 202, key = 192.178.0.113, value = 1522587459359,www.example.com,192.178.0.113
    offset = 203, key = 192.178.0.110, value = 1522587459359,www.example.com,192.178.0.110
    offset = 204, key = 192.178.0.232, value = 1522587459359,www.example.com,192.178.0.232
    offset = 205, key = 192.178.0.96, value = 1522587459359,www.example.com,192.178.0.96
    offset = 206, key = 192.178.0.76, value = 1522587459360,www.example.com,192.178.0.76
    offset = 207, key = 192.178.0.78, value = 1522587459360,www.example.com,192.178.0.78
    offset = 208, key = 192.178.0.80, value = 1522587459360,www.example.com,192.178.0.80
    offset = 209, key = 192.178.0.177, value = 1522587459360,www.example.com,192.178.0.177
    offset = 210, key = 0, value = 0
    offset = 211, key = 1, value = 1
    offset = 212, key = 2, value = 2
    offset = 213, key = 3, value = 3
    offset = 214, key = 4, value = 4
    offset = 215, key = 5, value = 5
    offset = 216, key = 6, value = 6
    offset = 217, key = 7, value = 7
    offset = 218, key = 8, value = 8
    offset = 219, key = 9, value = 9