zoukankan      html  css  js  c++  java
  • Kafka发布订阅消息

    Maven

    	<dependency>
        		<groupId>org.apache.kafka</groupId>
        		<artifactId>kafka-clients</artifactId>
        		<version>0.11.0.0</version>
        	</dependency>
       	    <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-streams</artifactId>
                <version>0.11.0.0</version>
        	</dependency>
    

    生产者Producer

    import java.util.Properties;
     
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
     
    public class ProducerDemo {
     
        private final KafkaProducer<String, String> producer;
     
        public final static String TOPIC = "test5";
     
        private ProducerDemo() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "xxx:9092,1xxx:9092,xxx:9092");//xxx服务器ip
            props.put("acks", "all");//所有follower都响应了才认为消息提交成功,即"committed"
            props.put("retries", 0);//retries = MAX 无限重试,直到你意识到出现了问题:)
            props.put("batch.size", 16384);//producer将试图批处理消息记录,以减少请求次数.默认的批量处理消息字节数
            //batch.size当批量的数据大小达到设定值后,就会立即发送,不顾下面的linger.ms
            props.put("linger.ms", 1);//延迟1ms发送,这项设置将通过增加小的延迟来完成--即,不是立即发送一条记录,producer将会等待给定的延迟时间以允许其他消息记录发送,这些消息记录可以批量处理
            props.put("buffer.memory", 33554432);//producer可以用来缓存数据的内存大小。
            props.put("key.serializer",
                    "org.apache.kafka.common.serialization.IntegerSerializer");
            props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
     
            producer = new KafkaProducer<String, String>(props);
        }
     
        public void produce() {
            int messageNo = 1;
            final int COUNT = 5;
     
            while(messageNo < COUNT) {
                String key = String.valueOf(messageNo);
                String data = String.format("hello KafkaProducer message %s from hubo 06291018 ", key);
                
                try {
                    producer.send(new ProducerRecord<String, String>(TOPIC, data));
                } catch (Exception e) {
                    e.printStackTrace();
                }
     
                messageNo++;
            }
            
            producer.close();
        }
     
        public static void main(String[] args) {
            new ProducerDemo().produce();
        }
    }
    

    消费者Consumer

    import java.util.Arrays;
    import java.util.Properties;
     
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
     
     
    public class UserKafkaConsumer extends Thread {
     
            public static void main(String[] args){
                Properties properties = new Properties();
                properties.put("bootstrap.servers", "xxx:9092,xxx:9092,xxx:9092");//xxx是服务器集群的ip
                properties.put("group.id", "jd-group");
                properties.put("enable.auto.commit", "true");
                properties.put("auto.commit.interval.ms", "1000");
                properties.put("auto.offset.reset", "latest");
                properties.put("session.timeout.ms", "30000");
                properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     
                KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
                kafkaConsumer.subscribe(Arrays.asList("test5"));
                while (true) {
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println("-----------------");
                        System.out.printf("offset = %d, value = %s", record.offset(), record.value());
                        System.out.println();
                    }
                }
     
            }
    }
  • 相关阅读:
    利用KINECT+OPENCV检测手势的演示程序
    关于PPC软件的开发库
    FlashGet的IE加载项与IE8不兼容
    ubuntu9.04 安装字体 后记
    【pys60笔记】中文
    【pys60学习笔记】S60 模拟器使用。
    ubuntu9.04 安装字体
    IDE硬盘安装Windows 7 7106(光驱与硬盘共用一个IDE)
    易歌——web在线听歌桌面程序。带全局键控制。
    MapX直连Oracle——MapX5配合Oracle时,对中文表名支持不好
  • 原文地址:https://www.cnblogs.com/kaleidoscope/p/9768747.html
Copyright © 2011-2022 走看看