zoukankan      html  css  js  c++  java
  • RocketMQ的生产者和消费者

    生产者:

    /**
                     * 生产者
                     */
                    public class Provider {
                        public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException, MQBrokerException {
                            //创建一个生产者
                            DefaultMQProducer producer=new DefaultMQProducer("rmq-group");
                            //设置NameServer地址
                            producer.setNamesrvAddr("192.168.42.22:9876;192.168.42.33:9876");
                            //设置生产者实例名称
                            producer.setInstanceName("provider");
                            //启动生产者
                            producer.start();
                            //发送消息
                            for (int i = 1; i <=10 ; i++) {
                                Thread.sleep(1000); //模拟网络延迟
                                //创建消息  topic代表主题名称     tags代表小分类     body代表消息体
                                Message message=new Message("weksoft_topic","TagA",("wdksoft-"+i).getBytes());
                                //发送消息
                                SendResult send = producer.send(message);
                                System.out.println(send.toString());
                            }
                        }
                    }

    消费者

    /**
                     * 消费者:监听消费
                     */
                    public class Consumer {
                        public static void main(String[] args) throws MQClientException {
                            //创建消费者
                            DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("rmq-group");
                            //设置NameServer地址
                            consumer.setNamesrvAddr("192.168.42.22:9876;192.168.42.33:9876");
                            //设置实例名称
                            consumer.setInstanceName("consumer");
                            //订阅Topic
                            consumer.subscribe("weksoft_topic","TagA");
                            //监听消息
                            consumer.registerMessageListener(new MessageListenerConcurrently() {
                                @Override
                                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                                    //获取消息
                                    for(MessageExt ext:msgs){
                                        //RocketMQ由于是集群环境,所以产生的消息ID可能会重复
                                        System.out.println(ext.getMsgId()+"----------"+new String(ext.getBody()));
                                    }
                                    //接受消息状态 1.消费成功    2.消费失败   队列还有
                                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                                }
                            });
                            //启动消费者
                            consumer.start();
                        }
                    }

    生产者生产消息

    消费者消费消息

     控制台多了入队和出队的记录

  • 相关阅读:
    二元查找树的后序遍历结果
    CFileDialog设置多选时的一个问题
    KanRSS.com
    由shuttle这个单词想起的一个小故事
    Sun Java moved to the Partner repository
    Sun Java moved to the Partner repository
    扩展std::string功能的几个做法
    NetBeans 时事通讯(刊号 # 99 Apr 16, 2010)
    NetBeans IDE 6.9 Beta 发布
    KanRSS.com
  • 原文地址:https://www.cnblogs.com/chx9832/p/12325729.html
Copyright © 2011-2022 走看看