zoukankan      html  css  js  c++  java
  • Java操作RocketMQ

    第一步:导入依赖

     <dependency>
          <groupId>com.alibaba.rocketmq</groupId>
          <artifactId>rocketmq-client</artifactId>
          <version>3.0.10</version>
        </dependency>
        <dependency>
          <groupId>com.alibaba.rocketmq</groupId>
          <artifactId>rocketmq-all</artifactId>
          <version>3.0.10</version>
          <type>pom</type>
        </dependency>
        <dependency>
          <groupId>ch.qos.logback</groupId>
          <artifactId>logback-classic</artifactId>
          <version>1.1.1</version>
        </dependency>
        <dependency>
          <groupId>ch.qos.logback</groupId>
          <artifactId>logback-core</artifactId>
          <version>1.1.1</version>
        </dependency>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.10</version>
          <scope>test</scope>
     </dependency>
    

      

    第二步:创建生产者

    package com.wish;
    
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    
    public class Producer {
    
        public static void main(String[] args) throws MQClientException {
            //创建一个消息的生产者
            // producerGroup:一般发送同样消息的Producer,归为同一个Group,应用必须设置,并保证命名唯一
            DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
            //设置名称srv地址
            producer.setNamesrvAddr("192.168.152.55:9876;192.168.152.66:9876");
            //实例名称
            producer.setInstanceName("producer");
            //启动
            producer.start();
            try {
                for (int i = 0; i < 10; i++) {
                    Thread.sleep(1000); // 每秒发送一次MQ
                    Message msg = new Message("itmayiedu-topic", // topic 主题名称
                            "TagA", // tag 临时值
                            ("itmayiedu-"+i).getBytes()// body 内容
                    );
                    //send()发送
                    SendResult sendResult = producer.send(msg);
                    //SendResult:发送消息结果
                    System.out.println(sendResult.toString());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            //关掉
            producer.shutdown();
        }
    
    }
    

      

    第三步:创建消费者

    package com.wish;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    public class Consumer {
        public static void main(String[] args) throws MQClientException {
            //创建一个消费者
            //consumerGroup:做同样事情的Consumer归为同一个Group,应用必须设置,并保证命名唯一
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
            //设置名称srv地址
            consumer.setNamesrvAddr("192.168.152.55:9876;192.168.152.66:9876");
            //实例名称
            consumer.setInstanceName("consumer");
            //实现订阅
            consumer.subscribe("itmayiedu-topic", "TagA");
            //注册消息监听器
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        System.out.println(msg.getMsgId()+"---"+new String(msg.getBody()));
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            //启动消费者
            consumer.start();
            System.out.println("Consumer Started.");
        }
    }
    

      

    第四步:分别启动消费者和生产者,查看浏览器

  • 相关阅读:
    C# @符号的多种使用方法
    C#抽象工厂简单实现类
    项目总结——谈谈封装(抽象工厂+反射+缓存机制)
    C#中String类的几个方法(IndexOf、LastIndexOf、Substring)
    String str 与 String str=new String("") 区别
    sql面试题
    [bzoj2038]莫队算法学习
    分治算法初步
    线段树训练
    [poj2104]可持久化线段树入门题(主席树)
  • 原文地址:https://www.cnblogs.com/wishsaber/p/12322200.html
Copyright © 2011-2022 走看看