zoukankan      html  css  js  c++  java
  • java向linux的kafka发送消息 并接收消息实例

    1.首先要关闭linux系统上的防火墙

    service iptables status可以查看到iptables服务的当前状态。

    在此说一下关于启动和关闭防火墙的命令:
    1) 重启后生效
    开启: chkconfig iptables on
    关闭: chkconfig iptables off
    2) 即时生效,重启后失效
    开启: service iptables start
    关闭: service iptables stop

    2.开启zookeeper服务和kafka服务,在之前的随笔中有

    3.开启eclipse,添加好jar包,弄好环境,kafka开发需要的jar包列表自己百度,也可以用maven管理

    4.写producer类

    package kafka;
    
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    import kafka.serializer.StringEncoder;
    
    public class kafkaProducer extends Thread{
        private String topic;
        public kafkaProducer(String topic){
            super();
            this.topic = topic;
        }
        public static void main(String[] args) {
            new kafkaProducer("mytopic").start();
        }
        @Override
        public void run() {
            Producer producer =createProducer();
            int i = 0;
            while(true){
                producer.send(new KeyedMessage<Integer, String>(topic, "message:"+i++));;
                System.out.println("发送成功!");
                try{
                    TimeUnit.SECONDS.sleep(1);
                }catch(InterruptedException e){
                    e.printStackTrace();
                }
            }
        }
        private Producer createProducer(){
            Properties properties = new Properties();
            properties.put("zk.connect", "xx.xx.xx.xx:2181");
            properties.put("serializer.class",StringEncoder.class.getName());
            properties.put("metadata.broker.list","xx.xx.xx.xx:9092");
            return new Producer<Integer, String>(new ProducerConfig(properties));
        }
    }

    写完这个之后运行,然后在linux系统上运行消费者就可以看到发送出去的消息了

    [root@bogon kafka_2.9.2-0.8.1.1]# bin/kafka-console-consumer.sh --zookeeper bogon:2181 --topic mytopic --from-beginning

    生产者OK了

    5.写consumer类

    package kafka;
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    
    public class kafkaConsumer extends Thread{
        private String topic;
        public kafkaConsumer(String topic){
            super();
            this.topic =topic;
        }
        public static void main(String[] args) {
            new kafkaConsumer("mytopic").start();
        }
        @Override
        public void run() {
            ConsumerConnector consumer = createConsumer();
            Map<String,Integer> topicCountMap = new HashMap<String,Integer>();
            topicCountMap.put(topic, 1);
            Map<String,List<KafkaStream<byte[],byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
            KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);
            ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
            while(iterator.hasNext()){
                String message = new String(iterator.next().message());
                System.out.println("get:"+message);
            }
        }
        private ConsumerConnector createConsumer(){
            Properties properties = new Properties();
            properties.put("zookeeper.connect", "xx.xx.xx.xx:2181");
            properties.put("group.id", "0");
            return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
        }
        
    }

    上面的group.id随便写都可以,亲测。

    运行起生产者,然后再运行消费者就可以在控制台看到输出的消息和接收到的消息了。

    一个菜鸟程序媛
  • 相关阅读:
    城市的划入划出效果
    文本溢出省略解决笔记css
    长串英文数字强制折行解决办法css
    Poj 2352 Star
    树状数组(Binary Indexed Trees,二分索引树)
    二叉树的层次遍历
    Uva 107 The Cat in the Hat
    Uva 10336 Rank the Languages
    Uva 536 Tree Recovery
    Uva10701 Pre, in and post
  • 原文地址:https://www.cnblogs.com/yovela/p/5201000.html
Copyright © 2011-2022 走看看