zoukankan      html  css  js  c++  java
  • Java操作RocketMQ

    第一步:导入依赖

     <dependency>
          <groupId>com.alibaba.rocketmq</groupId>
          <artifactId>rocketmq-client</artifactId>
          <version>3.0.10</version>
        </dependency>
        <dependency>
          <groupId>com.alibaba.rocketmq</groupId>
          <artifactId>rocketmq-all</artifactId>
          <version>3.0.10</version>
          <type>pom</type>
        </dependency>
        <dependency>
          <groupId>ch.qos.logback</groupId>
          <artifactId>logback-classic</artifactId>
          <version>1.1.1</version>
        </dependency>
        <dependency>
          <groupId>ch.qos.logback</groupId>
          <artifactId>logback-core</artifactId>
          <version>1.1.1</version>
        </dependency>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.10</version>
          <scope>test</scope>
     </dependency>
    

      

    第二步:创建生产者

    package com.wish;
    
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    
    public class Producer {
    
        public static void main(String[] args) throws MQClientException {
            //创建一个消息的生产者
            // producerGroup:一般发送同样消息的Producer,归为同一个Group,应用必须设置,并保证命名唯一
            DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
            //设置名称srv地址
            producer.setNamesrvAddr("192.168.152.55:9876;192.168.152.66:9876");
            //实例名称
            producer.setInstanceName("producer");
            //启动
            producer.start();
            try {
                for (int i = 0; i < 10; i++) {
                    Thread.sleep(1000); // 每秒发送一次MQ
                    Message msg = new Message("itmayiedu-topic", // topic 主题名称
                            "TagA", // tag 临时值
                            ("itmayiedu-"+i).getBytes()// body 内容
                    );
                    //send()发送
                    SendResult sendResult = producer.send(msg);
                    //SendResult:发送消息结果
                    System.out.println(sendResult.toString());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            //关掉
            producer.shutdown();
        }
    
    }
    

      

    第三步:创建消费者

    package com.wish;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    public class Consumer {
        public static void main(String[] args) throws MQClientException {
            //创建一个消费者
            //consumerGroup:做同样事情的Consumer归为同一个Group,应用必须设置,并保证命名唯一
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
            //设置名称srv地址
            consumer.setNamesrvAddr("192.168.152.55:9876;192.168.152.66:9876");
            //实例名称
            consumer.setInstanceName("consumer");
            //实现订阅
            consumer.subscribe("itmayiedu-topic", "TagA");
            //注册消息监听器
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        System.out.println(msg.getMsgId()+"---"+new String(msg.getBody()));
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            //启动消费者
            consumer.start();
            System.out.println("Consumer Started.");
        }
    }
    

      

    第四步:分别启动消费者和生产者,查看浏览器

  • 相关阅读:
    Chrome开发者工具详解(1)
    Chrome开发者工具详解(2)
    Ubuntu ADSL拨号上网
    Bash中单引号和双引号的区别
    建立菜单
    波浪号和Hyphen扩展
    标准IO和重定向
    Bash变量扩展修改符
    mysql主键约束和唯一性约束
    Here文档
  • 原文地址:https://www.cnblogs.com/wishsaber/p/12322200.html
Copyright © 2011-2022 走看看