zoukankan      html  css  js  c++  java
  • 50.RocketMQ (quickstart)

    要多给下属表功,绝不能抢功。

    1.订阅消息

    /**
     * Copyright (C) 2010-2013 Alibaba Group Holding Limited
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package com.alibaba.rocketmq.example.quickstart;
    
    import java.util.List;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    
    /**
     * Consumer,订阅消息
     */
    public class Consumer {
    
        public static void main(String[] args) throws InterruptedException, MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
            consumer.setNamesrvAddr("192.168.1.16:9876;192.168.1.17:9876");
            /**
             * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
             * 如果非第一次启动,那么按照上次消费的位置继续消费
             */
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    
            consumer.subscribe("TopicTest", "*");
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
    
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                        ConsumeConcurrentlyContext context) {
                    System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            consumer.start();
    
            System.out.println("Consumer Started.");
        }
    }

    2.生产消息

    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    
    
    /**
     * Producer,发送消息
     * 
     */
    public class Producer {
        public static void main(String[] args) throws MQClientException, InterruptedException {
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            producer.setNamesrvAddr("192.168.1.16:9876;192.168.1.17:9876");
            producer.start();
    
            for (int i = 0; i < 1000; i++) {
                try {
                    Message msg = new Message("TopicTest", // topic
                        "TagA", // tag
                        ("Hello RocketMQ " + i).getBytes()// body
                    );
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult);
                }
                catch (Exception e) {
                    e.printStackTrace();
                    Thread.sleep(1000);
                }
            }
    
            producer.shutdown();
        }
    }
  • 相关阅读:
    redis 键命令
    redis 数据类型
    java向word写入数据
    Eclipse发布到tomcat提示java.lang.ClassNotFoundException: org.springframework.web.context.ContextLoaderListener
    Unable to locate Spring NamespaceHandler for XMLschemanamespace http://dubbo.apache.org/schema/dubbo
    5.Dubbo之Spring XML配置
    6.Dubbo之XML配置详解。
    7.Dubbo之最佳实践
    RESTful API实践
    Jav程序执行Linux命令
  • 原文地址:https://www.cnblogs.com/sigm/p/6720021.html
Copyright © 2011-2022 走看看