zoukankan      html  css  js  c++  java
  • java操作RocketMQ

    创建工程(Producer和Consumer)

      

    导入依赖

    复制代码
         <dependency>
                <groupId>com.alibaba.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
                <version>3.0.10</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba.rocketmq</groupId>
                <artifactId>rocketmq-all</artifactId>
                <version>3.0.10</version>
                <type>pom</type>
            </dependency>
            <dependency>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-classic</artifactId>
                <version>1.1.1</version>
            </dependency>
            <dependency>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-core</artifactId>
                <version>1.1.1</version>
            </dependency>
    复制代码

    创建生产者

    复制代码
    package com.wn.producer;
    
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    
    public class MQProducer {
        public static void main(String[] args) throws MQClientException {
            DefaultMQProducer producer=new DefaultMQProducer("rmq-group");
            producer.setNamesrvAddr("192.168.138.187:9876;192.168.138.188:9876");
            producer.setInstanceName("producer");
            producer.start();
            try {
                for (int i=0;i<10;i++){
                    Thread.sleep(1000); //每秒发送一次
                    Message msg = new Message("itmayiedu-topic", // topic 主题名称
                            "TagA", // tag 临时值
                            ("itmayiedu-"+i).getBytes()// body 内容
                    );
                    SendResult sendResult=producer.send(msg);
                    System.out.println(sendResult.toString());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            producer.shutdown();
        }
    
    }
    复制代码

    创建消费者

    复制代码
    package com.wn.consumer;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    public class MQConsumer {
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("rmq-group");
            consumer.setNamesrvAddr("192.168.138.187:9876;192.168.138.188:9876");
            consumer.setInstanceName("consumer");
            consumer.subscribe("itmayiedu-topic","TagA");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    for (MessageExt msg:list){
                        System.out.println(msg.getMsgId()+"---"+new String(msg.getBody()));
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            System.out.println("Consumer Started...");
    
        }
    }
    复制代码

     实现效果

      执行producer和consumer

        producer

          

        consumer

          

      列表中的信息如下:

        

  • 相关阅读:
    python zip()与zip(*ziped)以及list(zip(a,b))
    通信原理(第七版)-樊昌信-第一章-绪论-重要知识点
    通信原理-自相关与互相关函数的关系
    通信原理(第七版)-樊昌信-第二章-确知信号-重要知识点
    C#Linq的10个练习
    C#从委托、lambda表达式到linq总结
    C#的隐式类型、匿名类型、自动属性、初始化器
    MVC开发之Razor的使用
    Markdown常用语法
    MVC开发之注入容器Ninject的使用
  • 原文地址:https://www.cnblogs.com/mayuan01/p/12391507.html
Copyright © 2011-2022 走看看