zoukankan      html  css  js  c++  java
  • (二)RocketMQ入门之消息发送和接收

    一、消息产生、发送

     1 public class Producer {
     2 public static void main(String[] args) throws MQClientException {
     3   DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
     4   producer.setNamesrvAddr("172.18.4.114:9876");
     5   producer.setInstanceName("producer");
     6   producer.start();
     7   try {
     8     for (int i = 0; i < 10; i++) {
     9     Thread.sleep(5000); //每5秒发送一次MQ
    10     Message msg = new Message("TopicA-test",// topic
    11       "TagA",// tag
    12       (new Date() + " Hello RocketMQ ,QuickStart" + i)
    13       .getBytes()// body
    14       );
    15     SendResult sendResult = producer.send(msg);
    16     }
    17   } catch (Exception e) {
    18     e.printStackTrace();
    19   }
    20   producer.shutdown();
    21   }
    22 }

    二、消息接收、消费

     1 import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
     2 import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
     3 import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
     4 import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
     5 import com.alibaba.rocketmq.client.exception.MQClientException;
     6 import com.alibaba.rocketmq.common.message.MessageExt;
     7 
     8 import java.util.List;
     9 
    10 
    11 public class Consumer {
    12     public static void main(String[] args) throws MQClientException {
    13         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
    14 
    15         consumer.setNamesrvAddr("172.18.4.114:9876");
    16         consumer.setInstanceName("consumer");
    17         consumer.subscribe("TopicA-test", "TagA");
    18 
    19         consumer.registerMessageListener(new MessageListenerConcurrently() {
    20                 @Override
    21                 public ConsumeConcurrentlyStatus consumeMessage(
    22                     List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    23                     for (MessageExt msg : msgs) {
    24                         System.out.println(new String(msg.getTopic()));
    25                         System.out.println(new String(msg.getTags()));
    26                         System.out.println("=== " + new String(msg.getBody()));
    27                     }
    28 
    29                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    30                 }
    31             });
    32         consumer.start();
    33         System.out.println("Consumer Started.");
    34     }
    35 }



  • 相关阅读:
    vbs获取当月的第一天和最后一天的日期
    vbscript基础篇
    win10专业版激活
    python selenium中Excel数据维护
    python里面的xlrd模块详解
    python 转换为json时候 汉字编码问题
    用VBA得到EXCEL表格中的行数和列数
    表关联关系,表的复制
    存储引擎,详细建表语句,数据类型,约束
    数据库基础
  • 原文地址:https://www.cnblogs.com/yoyotl/p/6993787.html
Copyright © 2011-2022 走看看