zoukankan      html  css  js  c++  java
  • 生产者与消费者的Java实现

    首先创建maven工程,需要引入的包:

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.2.1</version>
        </dependency>
        <dependency>  
            <groupId>org.apache.kafka</groupId>  
            <artifactId>kafka_2.10</artifactId>  
            <version>0.10.2.1</version>  
        </dependency>  
    </dependencies>

    然后就可以实现生产者与消费者,在创建topic时,如果需要删除已经存在的topic,则需要配置delete.topic.enable=true,否则无法删除对应的topic。

    /**
    消费者
    **/
    public
    class KafkaConsumerDemo { private final KafkaConsumer<String, String> consumer; private KafkaConsumerDemo(){ Properties props = new Properties(); props.put("bootstrap.servers", "10.xxx.xxx.149:9092, 10.xxx.xxx.182:9092, 10:xxx.xxx.190:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumer = new KafkaConsumer<String, String>(props); } void consume(){ consumer.subscribe(Arrays.asList(KafkaProducerDemo.TOPIC)); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } public static void main(String[] args){ new KafkaConsumerDemo().consume(); } }
    /**
     * 生成者
     *
     */
    public class KafkaProducerDemo
    {
        private final Producer<String, String> kafkaProducer;
        
        public final static String TOPIC = "JAVA_TOPIC";
        
        private KafkaProducerDemo()
        {
            kafkaProducer = createKafkaProducer();
        }
        
        private Producer<String, String> createKafkaProducer()
        {
            Properties props = new Properties();
            props.put("bootstrap.servers", "10.185.156.149:9092, 10.185.156.182:9092, 10:185.156.190:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            
            Producer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
            return kafkaProducer;
        }
        
        void produce()
        {
            for (int i = 1; i < 1000; i++)
            {
                try
                {
                    Thread.sleep(1000);
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
                String key = String.valueOf("key" + i);
                String data = "hello kafka message:" + key;
                kafkaProducer.send(new ProducerRecord<>(TOPIC, key, data), new Callback()
                {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e)
                    {
                        // do sth
                    }
                });
                System.out.println(data);
            }
        }
        
        public static void main(String[] args)
        {
            KafkaCreateTopic.createTopic("JAVA_TOPIC", 3, 1);
            new KafkaProducerDemo().produce();
        }
    }
    /**
    创建topic
    **/
    public
    class KafkaCreateTopic { public static void createTopic(String topic, int partitions, int replicationFactor) { ZkUtils zkUtils = ZkUtils.apply("10.xxx.xxx.149:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled()); if (AdminUtils.topicExists(zkUtils, topic)) { deleteTopic(zkUtils, topic); } AdminUtils.createTopic(zkUtils, topic, partitions, replicationFactor, new Properties(), RackAwareMode.Enforced$.MODULE$); zkUtils.close(); } public static void deleteTopic(ZkUtils zkUtils, String topic) { AdminUtils.deleteTopic(zkUtils, topic); System.out.println("delete the topic " + topic); } }
  • 相关阅读:
    css03层次选择器
    css02基本选择器
    Distance Between Points
    CloseHandle(IntPtr handle)抛异常
    关于win7上内存占用较大的说明
    C# WPF 显示图片和视频显示 EmuguCv、AForge.Net测试(续)
    五种开源协议的比较(BSD_Apache_GPL_LGPL_MIT)
    C# WPF 显示图片和视频显示 EmuguCv、AForge.Net测试
    Opencv不用每次创建项目配置vs2010 vc++目录 库目录等项
    矩阵运算
  • 原文地址:https://www.cnblogs.com/woniu4/p/8516390.html
Copyright © 2011-2022 走看看