zoukankan      html  css  js  c++  java
  • 3.java使用kafka

    创建项目

    配置pom.xml

         <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.11</artifactId>
                <version>${kafka.version}</version>
            </dependency>

    生产者

    package com.itheima.producer;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Properties;
    import java.util.concurrent.Future;
    
    /**
     * kafka客户端之:生产者
     */
    public class MyKafkaProducer {
    
        public static void main(String[] args) throws Exception{
            // 1.配置信息
            Properties props = new Properties();
            // 定义kafka服务器地址列表,不需要指定所有的broker
            props.put("bootstrap.servers", "server1:9092,server2:9092,server3:9092");
            //  生产者需要leader确认请求完成之前接收的应答数
            props.put("acks", "-1");
            // 客户端失败重试次数
            props.put("retries", 1);
            // 生产者打包消息的批量大小,以字节为单位.此处是16k
            props.put("batch.size", 16384);
            // 生产者延迟1ms发送消息
            props.put("linger.ms", 1);
            // 生产者缓存内存的大小,以字节为单位.此处是32m
            props.put("buffer.memory", 33554432);
            // key 序列化类
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // value序列化类
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            // 2.创建生产者
            KafkaProducer<String,String> producer = new KafkaProducer<String, String>(props);
    
            // 3.生产数据
            /**
             * 发送消息的三种方式:
             *      1.同步阻塞发送
             *          适用场景:业务不需要高吞吐量、更关心消息发送的顺序、不允许消息发送失败
             *      2.异步发送(发送并忘记)
             *          适用场景:业务只关心吞吐量、不关心消息发送的顺序、可以允许消息发送失败
             *      3.异步发送(回调函数)
             *          适用场景:业务需要知道消息发送成功、不关心消息发送的顺序
             */
    
            // 1.同步阻塞发送
            // 创建消息
           /* System.out.println("-------------------同步发送消息......start-----------------------");
            ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima_topic",0,"key-sync","同步发送消息");
    
            Future<RecordMetadata> send = producer.send(record);
            RecordMetadata recordMetadata = send.get();
            System.out.println(recordMetadata);//itheima_topic-0@2
    
            System.out.println("-------------------同步发送消息......end-----------------------");*/
    
            // 2.异步发送(发送并忘记)
            // 创建消息
            /*System.out.println("-------------------异步发送(发送并忘记)......start-----------------------");
            ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima_topic",0,"key-async1","异步发送消息,发送并忘记");
    
            // 发送并忘记
            producer.send(record);
    
            System.out.println("-------------------异步发送(发送并忘记)......end-----------------------");
    
            // 刷新
            producer.flush();*/
    
            // 3.异步发送(回调函数)
            // 创建消息
            System.out.println("-------------------异步发送(回调函数)......start-----------------------");
            ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima_topic",0,"key-async2","异步发送消息,(回调函数)");
    
            // 发送,回调函数处理
            producer.send(record, new Callback() {
                // 处理回调业务逻辑
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    System.out.println("异步发送消息成功:"+recordMetadata);//itheima_topic-0@4
                    System.out.println("异常对象:"+e);//null
                }
            });
    
            System.out.println("-------------------异步发送(回调函数)......end-----------------------");
    
            // 刷新
            producer.flush();
    
        }
    }

    消费者

    package com.itheima.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    /**
     * kafka客户端之:消费者
     */
    public class MyKafkaConsumer {
    
        public static void main(String[] args) throws Exception{
            // 1.配置信息
            Properties props = new Properties();
            // 定义kafka服务器地址列表,不需要指定所有的broker
            props.put("bootstrap.servers", "server1:9092,server2:9092,server3:9092");
            // 消费者组id
            props.put("group.id", "itheima");
            // 是否自动确认offset
            props.put("enable.auto.commit", "true");
            //自动确认offset时间间隔
            props.put("auto.commit.interval.ms", "1000");
    
            // key 序列化类
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // value序列化类
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            // 2.创建消费者
            KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(props);
    
            // 3.消费消息
            // 指定分区消费
            TopicPartition partition = new TopicPartition("itheima_topic",0);
    
            // 获取已经提交的偏移量
            long offset = 0L;
            OffsetAndMetadata offsetAndMetadata = consumer.committed(partition);
            if(offsetAndMetadata !=null){
                offset = offsetAndMetadata.offset();
            }
            System.out.println("当前消费的偏移量:"+offset);
    
            // 指定偏移量消费
            consumer.assign(Arrays.asList(partition));
            consumer.seek(partition,offset);
    
            // 循环拉取数据
            while (true){
               // 拉取数据
                ConsumerRecords<String, String> records = consumer.poll(1000);
    
                // 打印数据
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("消费的数据为:" + record.value());
                }
    
            }
        }
    }
  • 相关阅读:
    Ext JS 6开发实例(三) :主界面设计
    Ext JS 6开发实例(二) :使用CMD创建应用程序
    文件夹或者文件比对工具 Beyond Compare
    LIS问题(DP解法)---poj1631(模板)
    hdoj Max Sum Plus Plus(DP)
    A* 算法详解
    hdoj1043 Eight(逆向BFS+打表+康拓展开)
    hdoj2612 Find a way (bfs)
    luoguP3366 [模板] 最小生成树
    luoguP1196(带权并查集)
  • 原文地址:https://www.cnblogs.com/luzhanshi/p/13376974.html
Copyright © 2011-2022 走看看