zoukankan      html  css  js  c++  java
  • RocketMQ之简单使用

    在之前我们是使用 RocketMQ 自带的程序来验证功能,今天我们自己实现下消息的生产和消费。

    一、简单使用

    1.1 引入依赖:

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <!--和安装的 MQ 版本一致-->
        <version>4.7.0</version>
    </dependency>
    

    1.2 新建配置类:

    public class RocketMQConfig {
        // 服务器地址
        public static final String NAME_SERVER = "192.168.137.47:9876";
    }
    

    1.3 新建消费者:

    public class Producer {
    
        public static void main(String[] args) throws MQClientException, RemotingException,
                InterruptedException, MQBrokerException {
            // 创建生产者对象,指明了生产者组名
            DefaultMQProducer producer = new DefaultMQProducer("simple");
            // 设置服务器地址
            producer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);
            // 启动实例
            producer.start();
    
            for (int i = 0; i < 3; i++) {
                String str = "Hello RocketMQ";
                // 实例化消息对象
                Message message = new Message("topicTest", "tagA", (str + i).getBytes());
                // 发送消息
                SendResult sendResult = producer.send(message);
                System.out.printf("%s%n", sendResult);
            }
            // 关闭生产者
            producer.shutdown();
        }
    
    }
    

    1.4 新建消费者:

    public class Consumer {
    
        public static void main(String[] args) throws MQClientException {
            // 创建消费者对象,指明了消费者组名
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("simple");
            // 设置服务器地址
            consumer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);
            // 订阅指定主题
            consumer.subscribe("topicTest","*");
            // 注册消息监听事件
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    
                    System.out.println("msg:"+msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            // 启动消费者
            consumer.start();
        }
    
    }
    

    1.5 运行生产者类,查看控制台输出:

    SendResult [sendStatus=SEND_OK, msgId=0A79FA0D6E9418B4AAC269CDF61B0000, offsetMsgId=C0A8892F00002A9F00000000000283C3, messageQueue=MessageQueue [topic=topicTest, brokerName=localhost, queueId=1], queueOffset=6]
    SendResult [sendStatus=SEND_OK, msgId=0A79FA0D6E9418B4AAC269CDF6260001, offsetMsgId=C0A8892F00002A9F0000000000028474, messageQueue=MessageQueue [topic=topicTest, brokerName=localhost, queueId=2], queueOffset=8]
    SendResult [sendStatus=SEND_OK, msgId=0A79FA0D6E9418B4AAC269CDF6290002, offsetMsgId=C0A8892F00002A9F0000000000028525, messageQueue=MessageQueue [topic=topicTest, brokerName=localhost, queueId=3], queueOffset=9]
    

    1.6 运行消费者,查看控制台输出:

    msg:[MessageExt [brokerName=localhost, queueId=1, storeSize=177, queueOffset=6, sysFlag=0, bornTimestamp=1584767105564, bornHost=/192.168.137.1:41420, storeTimestamp=1584767105616, storeHost=/192.168.137.47:10911, msgId=C0A8892F00002A9F00000000000283C3, commitLogOffset=164803, bodyCRC=705268097, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='topicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=7, CONSUME_START_TIME=1584767105572, UNIQ_KEY=0A79FA0D6E9418B4AAC269CDF61B0000, WAIT=true, TAGS=tagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 48], transactionId='null'}]]
    msg:[MessageExt [brokerName=localhost, queueId=2, storeSize=177, queueOffset=8, sysFlag=0, bornTimestamp=1584767105574, bornHost=/192.168.137.1:41420, storeTimestamp=1584767105622, storeHost=/192.168.137.47:10911, msgId=C0A8892F00002A9F0000000000028474, commitLogOffset=164980, bodyCRC=1561245975, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='topicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=9, CONSUME_START_TIME=1584767105577, UNIQ_KEY=0A79FA0D6E9418B4AAC269CDF6260001, WAIT=true, TAGS=tagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 49], transactionId='null'}]]
    msg:[MessageExt [brokerName=localhost, queueId=3, storeSize=177, queueOffset=9, sysFlag=0, bornTimestamp=1584767105577, bornHost=/192.168.137.1:41420, storeTimestamp=1584767105625, storeHost=/192.168.137.47:10911, msgId=C0A8892F00002A9F0000000000028525, commitLogOffset=165157, bodyCRC=1141369005, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='topicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=10, CONSUME_START_TIME=1584767105585, UNIQ_KEY=0A79FA0D6E9418B4AAC269CDF6290002, WAIT=true, TAGS=tagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 50], transactionId='null'}]]
    
  • 相关阅读:
    This iPhone 6s is running iOS 11.3.1 (15E302), which may not be supported by this version of Xcode.
    vmware 里MAC 鼠标能移动 无法单击
    php获取微信的openid
    PHP 调试打印输出变量
    E0264 Unable to execute '"/usr/bin/codesign" ...'
    PHP 返回JSON
    小米手机安装证书
    CSS3:radial-gradient,径向渐变的使用方法
    CSS3:linear-gradient,线性渐变的使用方法
    CSS3:RGBA的使用方法
  • 原文地址:https://www.cnblogs.com/markLogZhu/p/12539602.html
Copyright © 2011-2022 走看看