zoukankan      html  css  js  c++  java
  • Java代码测试Kafka集群收发消息

    consumer:

    package cn.miaoying.consumer;
    
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.serializer.StringDecoder;
    import kafka.utils.VerifiableProperties;
    public class TestConsumer {
        private final ConsumerConnector consumer;
    
        private TestConsumer() {
            Properties props = new Properties();
            // zookeeper 配置
            props.put("zookeeper.connect", "20.21.1.xxx:2182,20.21.1.xxx:2183,20.21.1.xxx:2184");
            // group 代表一个消费组
            props.put("group.id", "jd-group");
            // zk连接超时
            props.put("zookeeper.session.timeout.ms", "40000");
            props.put("zookeeper.sync.time.ms", "200");
            props.put("auto.commit.interval.ms", "1000");
            props.put("auto.offset.reset", "smallest");
            // 序列化类
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            ConsumerConfig config = new ConsumerConfig(props);
            consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
        }
    
        void consume() {
            Map topicCountMap = new HashMap();
            //topicCountMap.put(KafkaProducerDemo.TOPIC, new Integer(1));
            topicCountMap.put("test_miaoying", new Integer(1));
    
            StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
            StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
    
            Map<String, List<KafkaStream>> consumerMap = consumer.createMessageStreams(topicCountMap,
                    keyDecoder, valueDecoder);
            KafkaStream stream = consumerMap.get("test_miaoying").get(0);
            ConsumerIterator it = stream.iterator();
            while (it.hasNext())
                System.out.println(it.next().message());
        }
    
        public static void main(String[] args) {
            new TestConsumer().consume();
        }
    }

    provider:

    package cn.miaoying.consumer;
    
    import java.util.Date;
    import java.util.Properties;
    
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    
    public class TestProvider {
        public static void main(String[] args) {
    
            long events = Long.parseLong("1");
            Properties properties = new Properties();
            properties.put("metadata.broker.list", "20.21.1.xxx:9093,20.21.1.xxx:9091,20.21.1.xxx:9092");
            properties.put("serializer.class", "kafka.serializer.StringEncoder");
            properties.put("request.required.acks", "1");
            ProducerConfig config = new ProducerConfig(properties);
            Producer producer = new Producer(config);
            for (int i = 0; i < 5; i++) {
                long runtime = new Date().getTime();
                String ip = "127.0.0.1";
                String msg = "test~test~test";
                KeyedMessage keyedMessage = new KeyedMessage("test_miaoying", ip, msg);
                System.out.println(events + "---" + runtime);
                producer.send(keyedMessage);
            }
            producer.close();
        }
    }
  • 相关阅读:
    JAVA程序员面试32问
    在做物流的库存管理系统里,需要注意。。。。。
    在写自动更新程序中出现的问题
    数据库设计中的五个范式
    cPickle / pickle
    python总结1
    python总结2
    汉明距离(Hamming distance)
    python中pickle的用法
    NET面试题
  • 原文地址:https://www.cnblogs.com/miaoying/p/14663934.html
Copyright © 2011-2022 走看看