zoukankan      html  css  js  c++  java
  • win10 RocketMQ的简单运用

    1、在官网下载RocketMQ,下载完解压即可,下面也主要是根据官网来进行操作的,同时也适当地借鉴了其他blog。

    2、添加环境变量:

    ROCKETMQ_HOME="D:
    ocketmq"
    NAMESRV_ADDR="localhost:9876"
    

    3、启动Name Server

    mqnamesrv.cmd
    

    4、启动Broker,如果启动失败,删除C:Users"当前系统用户名"store下的所有文件,注意要添加autoCreateTopicEnable=true 否则在创建消息组时会报错:MQClientException: No route info of this topic

    mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true
    

    5、创建Maven工程,添加RocketMQ依赖,注意版本要和自己的服务器(下载的版本一致),否则可能会出现MQClientException: No route info of this topic这个错误

      <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
                <version>4.6.0</version>
            </dependency>
    

    6、provider:

    package provider;
    
    import com.sun.xml.internal.bind.api.impl.NameConverter;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    
    public class Pro {
        public static void main(String [] args){
            DefaultMQProducer defaultMQProducer = new DefaultMQProducer("Demo");
            defaultMQProducer.setNamesrvAddr("localhost:9876");
            try {
                defaultMQProducer.start();
                for(int i = 0;i<100;i++){
                    try {
                        Message message = new Message("Topic1","Tag1",
                                ("Hello World"+i).getBytes("UTF-8"));
                        SendResult sendResult = defaultMQProducer.send(message);
                        //defaultMQProducer.sendOneway(message);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                defaultMQProducer.shutdown();
            } catch (MQClientException e) {
                e.printStackTrace();
            }
        }
    }
    

    7、consumer

    package consumer;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.*;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    public class Consumer {
        public  static  void main(String [] args){
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerDemo");
            consumer.setNamesrvAddr("localhost:9876");
            try {
                consumer.subscribe("Topic1","*");
                consumer.registerMessageListener(new MessageListenerConcurrently() {
                    public ConsumeConcurrentlyStatus
                    consumeMessage(List<MessageExt> list,
                                   ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                        for(MessageExt messageExt :list){
                            String str  =  new String(messageExt.getBody());
                            System.out.println(str);
                        }
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        return  ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                });
    //            consumer.registerMessageListener(new MessageListenerOrderly() {
    //                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
    //                    for(MessageExt messageExt :list){
    //                        String str  =  new String(messageExt.getBody());
    //                        System.out.println(str);
    //                    }
    //                    return  ConsumeOrderlyStatus.SUCCESS;
    //                }
    //            });
                consumer.start();
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    }
    

    7、输出结果,可以看到输出的结果并不是很一致。

    先看看RocketMQ架构图

    1,启动Namesrv,Namesrv起来后监听端口,等待Broker、Produer、Consumer连上来,相当于一个路由控制中心。
    2,Broker启动,跟所有的Namesrv保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有topic信息。注册成功后,namesrv集群中就有Topic跟Broker的映射关系。
    3,收发消息前,先创建topic,创建topic时需要指定该topic要存储在哪些Broker上。也可以在发送消息时自动创建Topic。
    4,Producer发送消息,启动时先跟Namesrv集群中的其中一台建立长连接,并从Namesrv中获取当前发送的Topic存在哪些Broker上,然后跟对应的Broker建立长连接,直接向Broker发消息。
    5,Consumer跟Producer类似。跟其中一台Namesrv建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。


     再看看消息框架图:

     可以看到在同一个Topic下的消息在被放在了不同的broker,并且在同一个broker下,存放在不同的队列当中,虽然本地运行时只有一个broker,但是还是有很多队列,所以消息获取者获取到消息顺序是不一定的。


    进行改进:

    provide:在这里将同一业务流程放在一同一队列

    package provider;
    
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.MessageQueueSelector;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageQueue;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import java.util.List;
    import java.util.Scanner;
    
    public class Pro {
        public static void main(String [] args){
            DefaultMQProducer defaultMQProducer = new DefaultMQProducer("Demo");
            defaultMQProducer.setNamesrvAddr("localhost:9876");
            String[] tags = new String[]{"创建订单", "支付", "发货", "收货", "五星好评"};
            try {
                defaultMQProducer.start();
                for(int i = 0;i<100;i++){
                    int ordId = i /5 +1;
                    try {
                        Message message = new Message("Topic1",tags[i%5],"ID"+i,
                                ("order"+ordId+":"+tags[i%5]).getBytes(RemotingHelper.DEFAULT_CHARSET));
                        //SendResult sendResult = defaultMQProducer.send(message);
                        //defaultMQProducer.sendOneway(message);
                        SendResult sendResult = defaultMQProducer.send(message,new MessageQueueSelector(){
                            public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                                System.out.println(list.size()+"  message:"+message.toString()+"    o:"+o);
                                //o即为ordId
                                Integer id = (Integer) o;
                                int index = id % list.size();
                                return list.get(index);
                            }
                        },ordId);
                        System.out.printf("%s%n",sendResult);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                defaultMQProducer.shutdown();
            } catch (MQClientException e) {
                e.printStackTrace();
            }
        }
    }
    

    consumer:在这里消息接收换了个函数,在这里同一队列里面的消息是按顺序消费的

    package consumer;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.*;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    public class Consumer {
        public  static  void main(String [] args){
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerDemo");
            consumer.setNamesrvAddr("localhost:9876");
            try {
                consumer.subscribe("Topic1","*");
    //            consumer.registerMessageListener(new MessageListenerConcurrently() {
    //                public ConsumeConcurrentlyStatus
    //                consumeMessage(List<MessageExt> list,
    //                               ConsumeConcurrentlyContext consumeConcurrentlyContext) {
    //                    for(MessageExt messageExt :list){
    //                        String str  =  new String(messageExt.getBody());
    //                        System.out.println(str);
    //                    }
    //                    try {
    //                        Thread.sleep(1000);
    //                    } catch (InterruptedException e) {
    //                        e.printStackTrace();
    //                    }
    //                    return  ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    //                }
    //            });
                consumer.registerMessageListener(new MessageListenerOrderly() {
                    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                        for(MessageExt messageExt :list){
                            String str  =  new String(messageExt.getBody());
                            System.out.println(str);
                        }
                        return  ConsumeOrderlyStatus.SUCCESS;
                    }
                });
                consumer.start();
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    }
    

     结果如下:可以看到同一订单的顺序没有打乱

     那如何把所有的消息都按顺序执行呢,放在一个队列就可以了,就是把上面的return语句改为 return list.get(0),但是这样做肯定会牺牲效率的,下面是演示结果


    producer发送消息的三种方式:

    可靠同步消息:同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。在代码中也可以看到发送完每一个message后都会有一个SendResult返回,这个当消息存在顺序问题时可以使用

    package provider;
    
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.MessageQueueSelector;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageQueue;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import java.util.List;
    import java.util.Scanner;
    
    public class Pro {
        public static void main(String [] args){
            DefaultMQProducer defaultMQProducer = new DefaultMQProducer("Demo");
            defaultMQProducer.setNamesrvAddr("localhost:9876");
            String[] tags = new String[]{"创建订单", "支付", "发货", "收货", "五星好评"};
            try {
                defaultMQProducer.start();
                for(int i = 0;i<100;i++){
                    int ordId = i /5 +1;
                    try {
                        Message message = new Message("Topic1",tags[i%5],"ID"+i,
                                ("order"+ordId+":"+tags[i%5]).getBytes(RemotingHelper.DEFAULT_CHARSET));
                        SendResult sendResult = defaultMQProducer.send(message);
    //                    //defaultMQProducer.sendOneway(message);
    //                    SendResult sendResult = defaultMQProducer.send(message,new MessageQueueSelector(){
    //                        public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
    //                            System.out.println(list.size()+"  message:"+message.toString()+"    o:"+o);
    //                            //o即为ordId
    //                            Integer id = (Integer) o;
    //                            int index = id % list.size();
    //                            return list.get(index);
    //                        }
    //                    },ordId);
                        System.out.printf("%s%n",sendResult);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
    
                Scanner scanner = new Scanner(System.in);
                defaultMQProducer.shutdown();
            } catch (MQClientException e) {
                e.printStackTrace();
            }
        }
    }
    

     2、可靠异步发送,异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。由于是发了消息继续发第二个消息,可能消息响应没那么及时,因此这里用了countdownLatch,在5s内如果能接收到所有响应,结束程序,如果5s内不能收到,5s后结束程序,当然也可以使用countdownLatch.awite()得到所以的响应后停止程序。这个当消息顺序不重要时可以使用,如抢购,拼网速。

          int messageCount = 100;
                defaultMQProducer.setRetryTimesWhenSendAsyncFailed(0);
                final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
                for (int i = 0; i < messageCount; i++) {
                    try {
                        final int index = i;
                        Message msg = new Message("Jodie_topic_1023",
                                "TagA",
                                "OrderID188",
                                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                        defaultMQProducer.send(msg, new SendCallback() {
    
                            public void onSuccess(SendResult sendResult) {
                                countDownLatch.countDown();
                                System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                            }
    
                            public void onException(Throwable e) {
                                countDownLatch.countDown();
                                System.out.printf("%-10d Exception %s %n", index, e);
                                e.printStackTrace();
                            }
                        });
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                countDownLatch.await(5, TimeUnit.SECONDS);
                defaultMQProducer.shutdown();
    

      

     3、单向(Oneway)发送,单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。但是可能存在消息没有完全交付。可以看到producer.sendOneway(msg)并没有返回结果。

     producer.setNamesrvAddr("localhost:9876");
            //Launch the instance.
            producer.start();
            for (int i = 0; i < 100; i++) {
                //Create a message instance, specifying topic, tag and message body.
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " +
                        i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                //Call send message to deliver message to one of brokers.
                producer.sendOneway(msg);
            }
    

     


    消息广播:broadcasting ,官网描述很清晰:Broadcasting is sending a message to all subscribers of a topic. If you want all subscribers receive messages about a topic, broadcasting is a good choice.就是发送方发消息时,所有订阅该topic的消费者都会收到这个消息。

    这个主要是消息接收者做出改变:

    package consumer;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
    import java.util.List;
    
    public class ConsumerBroadcast {
        public static void main(String[] args) throws Exception {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
    
            //一个新的订阅组第一次启动从队列的最后位置开始消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    
            //set to broadcast mode
            consumer.setMessageModel(MessageModel.BROADCASTING);
            //订阅主题
            consumer.subscribe("Topic1", "TagA || TagC || TagD");
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
    
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                ConsumeConcurrentlyContext context) {
                    System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            consumer.start();
            System.out.printf("Broadcast Consumer Started.%n");
        }
    }
    

      


     定时发送与接收

         目前RocketMQ只支持固定精度级别的定时消息,服务器按照1-N定义了如下级别: “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;;若要发送定时消息,在应用层初始化Message消息对象之后,调用setDelayTimeLevel(int level)方法来设置延迟级别,按照序列取相应的延迟级别,例如level=2,则延迟为5s:

    通过运行 下面的代码中可以看到发送消息并没有延迟,因为我的程序只睡了5s,但是消息时全部发送了的,通过SendResult可以看到,但是当我们提前打开消息接收方时,然后在打开消息发送方,可以看到消息发送10s后消息接收方才收到。

    package provider;
    
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.MessageQueueSelector;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageQueue;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import java.util.List;
    import java.util.Scanner;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    public class Pro {
        public static void main(String [] args){
            DefaultMQProducer defaultMQProducer = new DefaultMQProducer("Demo");
            defaultMQProducer.setNamesrvAddr("localhost:9876");
            String[] tags = new String[]{"创建订单", "支付", "发货", "收货", "五星好评"};
            try {
                defaultMQProducer.start();
                for(int i = 0;i<100;i++){
                    int ordId = i /5 +1;
                    try {
                        Message message = new Message("Topic1",tags[i%5],"ID"+i,
                                ("order"+ordId+":"+tags[i%5]).getBytes(RemotingHelper.DEFAULT_CHARSET));
                        message.setDelayTimeLevel(3);
                        SendResult sendResult = defaultMQProducer.send(message);
    //                    //defaultMQProducer.sendOneway(message);
    //                    SendResult sendResult = defaultMQProducer.send(message,new MessageQueueSelector(){
    //                        public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
    //                            System.out.println(list.size()+"  message:"+message.toString()+"    o:"+o);
    //                            //o即为ordId
    //                            Integer id = (Integer) o;
    //                            int index = id % list.size();
    //                            return list.get(index);
    //                        }
    //                    },ordId);
                        System.out.printf("%s%n",sendResult);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                Thread.sleep(5000);
                defaultMQProducer.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

      


    batch :一次可以发送多个消息,但是整个一次发送消息的大小不能超过1M,List<Message> messages = new ArrayList<>();


    Filter过滤:SQL feature could do some calculation through the properties you put in when sending messages.可以在发送消息添加属性,然后通过SQL做些计算来过滤掉不相干的消息。

    provider:

    package provider;
    
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.MessageQueueSelector;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageQueue;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import java.util.List;
    import java.util.Scanner;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    public class Pro {
        public static void main(String [] args){
            DefaultMQProducer defaultMQProducer = new DefaultMQProducer("Demo");
            defaultMQProducer.setNamesrvAddr("localhost:9876");
            String[] tags = new String[]{"创建订单", "支付", "发货", "收货", "五星好评"};
            try {
                defaultMQProducer.start();
                for(int i = 0;i<100;i++){
                    int ordId = i /5 +1;
                    try {
                        Message message = new Message("Topic1",tags[i%5],"ID"+i,
                                ("order"+ordId+":"+tags[i%5]).getBytes(RemotingHelper.DEFAULT_CHARSET));
                       // message.setDelayTimeLevel(3);
                        message.putUserProperty("ordId",String.valueOf(ordId));
                        SendResult sendResult = defaultMQProducer.send(message);
    //                    //defaultMQProducer.sendOneway(message);
    //                    SendResult sendResult = defaultMQProducer.send(message,new MessageQueueSelector(){
    //                        public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
    //                            System.out.println(list.size()+"  message:"+message.toString()+"    o:"+o);
    //                            //o即为ordId
    //                            Integer id = (Integer) o;
    //                            int index = id % list.size();
    //                            return list.get(index);
    //                        }
    //                    },ordId);
                        System.out.printf("%s%n",sendResult);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
    
                Thread.sleep(5000);
                defaultMQProducer.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

     consumer:只接收订单单号为1的订单消息

    package consumer;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.MessageSelector;
    import org.apache.rocketmq.client.consumer.listener.*;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    public class Consumer {
        public  static  void main(String [] args){
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerDemo");
            consumer.setNamesrvAddr("localhost:9876");
            try {
                consumer.subscribe("Topic1",MessageSelector.bySql("ordId = 1"));
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
                consumer.registerMessageListener(new MessageListenerConcurrently() {
                    public ConsumeConcurrentlyStatus
                    consumeMessage(List<MessageExt> list,
                                   ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                        for(MessageExt messageExt :list){
                            String str  =  new String(messageExt.getBody());
                            System.out.println(str);
                        }
                        return  ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                });
    //            consumer.registerMessageListener(new MessageListenerOrderly() {
    //                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
    //                    for(MessageExt messageExt :list){
    //                        String str  =  new String(messageExt.getBody());
    //                        System.out.println(str);
    //                    }
    //                    return  ConsumeOrderlyStatus.SUCCESS;
    //                }
    //            });
                consumer.start();
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    }
    

      运行后出现如下面的错误:

    网上的解决办法是:/conf/broker.conf文件下面添加:enablePropertyFilter=true

    然后用 start mqbroker.cmd -n 127.0.0.1:9876 -c ../conf/broker.conf来启动,用这个方法也确实成功了

     但是每次启动broker时都要使用start mqbroker.cmd -n 127.0.0.1:9876 -c ../conf/broker.conf这个来启动,暂时还没想到其他方法,如有其他方法也请告诉我,我找到了也会更新blog

  • 相关阅读:
    洛谷 P1401 城市(二分+网络流)
    洛谷 P2057 善意的投票(网络流最小割)
    洛谷 P1402 酒店之王
    二分图最大匹配的一些证明
    P2764 最小路径覆盖问题(网络流24题之一)
    洛谷 P2055 [ZJOI2009]假期的宿舍
    P2891 [USACO07OPEN]吃饭Dining(最大流+拆点)
    洛谷P1345 [USACO5.4]奶牛的电信(最小割)
    网络流24题之星际转移问题(洛谷P2754)
    LeetCode Unique Binary Search Trees
  • 原文地址:https://www.cnblogs.com/minblog/p/13328874.html
Copyright © 2011-2022 走看看