zoukankan      html  css  js  c++  java
  • java向linux的kafka发送消息 并接收消息实例

    1.首先要关闭linux系统上的防火墙

    service iptables status可以查看到iptables服务的当前状态。

    在此说一下关于启动和关闭防火墙的命令:
    1) 重启后生效
    开启: chkconfig iptables on
    关闭: chkconfig iptables off
    2) 即时生效,重启后失效
    开启: service iptables start
    关闭: service iptables stop

    2.开启zookeeper服务和kafka服务,在之前的随笔中有

    3.开启eclipse,添加好jar包,弄好环境,kafka开发需要的jar包列表自己百度,也可以用maven管理

    4.写producer类

    package kafka;
    
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    import kafka.serializer.StringEncoder;
    
    public class kafkaProducer extends Thread{
        private String topic;
        public kafkaProducer(String topic){
            super();
            this.topic = topic;
        }
        public static void main(String[] args) {
            new kafkaProducer("mytopic").start();
        }
        @Override
        public void run() {
            Producer producer =createProducer();
            int i = 0;
            while(true){
                producer.send(new KeyedMessage<Integer, String>(topic, "message:"+i++));;
                System.out.println("发送成功!");
                try{
                    TimeUnit.SECONDS.sleep(1);
                }catch(InterruptedException e){
                    e.printStackTrace();
                }
            }
        }
        private Producer createProducer(){
            Properties properties = new Properties();
            properties.put("zk.connect", "xx.xx.xx.xx:2181");
            properties.put("serializer.class",StringEncoder.class.getName());
            properties.put("metadata.broker.list","xx.xx.xx.xx:9092");
            return new Producer<Integer, String>(new ProducerConfig(properties));
        }
    }

    写完这个之后运行,然后在linux系统上运行消费者就可以看到发送出去的消息了

    [root@bogon kafka_2.9.2-0.8.1.1]# bin/kafka-console-consumer.sh --zookeeper bogon:2181 --topic mytopic --from-beginning

    生产者OK了

    5.写consumer类

    package kafka;
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    
    public class kafkaConsumer extends Thread{
        private String topic;
        public kafkaConsumer(String topic){
            super();
            this.topic =topic;
        }
        public static void main(String[] args) {
            new kafkaConsumer("mytopic").start();
        }
        @Override
        public void run() {
            ConsumerConnector consumer = createConsumer();
            Map<String,Integer> topicCountMap = new HashMap<String,Integer>();
            topicCountMap.put(topic, 1);
            Map<String,List<KafkaStream<byte[],byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
            KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);
            ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
            while(iterator.hasNext()){
                String message = new String(iterator.next().message());
                System.out.println("get:"+message);
            }
        }
        private ConsumerConnector createConsumer(){
            Properties properties = new Properties();
            properties.put("zookeeper.connect", "xx.xx.xx.xx:2181");
            properties.put("group.id", "0");
            return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
        }
        
    }

    上面的group.id随便写都可以,亲测。

    运行起生产者,然后再运行消费者就可以在控制台看到输出的消息和接收到的消息了。

    一个菜鸟程序媛
  • 相关阅读:
    linux转换win下乱码txt命令
    linux下vi命令大全详细版本
    ubuntu系统如何安装adb调试环境
    LeetCode136---只出现一次的数字
    微信发朋友圈--用例设计(转)
    微服务
    LeetCode1---两数之和
    python输出
    爬楼梯,N级楼梯有多少种走法?
    list数组排序---stream
  • 原文地址:https://www.cnblogs.com/yovela/p/5201000.html
Copyright © 2011-2022 走看看