zoukankan      html  css  js  c++  java
  • RocketMq入门

    rocketMQ之生产者

    引入依赖

        <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
                <version>4.5.1</version>
            </dependency>

    发布同步,异步,单向消息

    发送同步消息

    • 这个消息适用于对消息丢失忍受力比较低的,对性能要求没那么高的
    public class SynProducer {
    
        public static void main(String[] args) throws Exception {
            //初始化生产者
            DefaultMQProducer producer = new DefaultMQProducer("producer_group");
            //指定nameServer地址
            producer.setNamesrvAddr("192.168.220.135.9876;192.168.220.136:9876");
            //启动
            producer.start();
            for (int i = 0; i < 10; i++) {
                //创建消息,指定topic,tag和消息体(也可以只指定topic和消息体即可)
                Message msg = new Message("it_topic", "tag", ("hello" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                //发送并有result返回,可根据result判断发送是否成功
                SendResult result = producer.send(msg);
                System.out.println(result);
                TimeUnit.MILLISECONDS.sleep(500);
            }
            //关闭
            producer.shutdown();
    
        }
    }

    发送异步消息(性能高,但是可能造成消息丢失)

    public class AsyncProducer {
       public static void main(String[] args) throws Exception {
           //初始化生产者
           DefaultMQProducer producer = new DefaultMQProducer("producer_group");
           //指定nameServer地址
           producer.setNamesrvAddr("localhost:9876");
           //启动
           producer.start();
           //
           producer.setRetryTimesWhenSendAsyncFailed(0);
           for (int i = 0; i < 100; i++) {
               //创建消息,指定topic,tag和消息体
               Message msg = new Message("topicList", "tag", ("rocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
               //发送但是没有返回值,需要有一个回调函数,可以在里面的两个方法,做自己业务的处理
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.println("send success");
                    }
    
                    @Override
                    public void onException(Throwable throwable) {
                        System.out.println("send error in"+throwable.getCause());
                    }
                });
           }
           //关闭
           producer.shutdown();
    
       }
    }

    单向发送消息

    public class OnewayProducer {
       public static void main(String[] args) throws Exception {
           //初始化生产者
           DefaultMQProducer producer = new DefaultMQProducer("producer_group");
           //指定nameServer地址
           producer.setNamesrvAddr("localhost:9876");
           //启动
           producer.start();
           for (int i = 0; i < 100; i++) {
               //创建消息,指定topic,tag和消息体
               Message msg = new Message("topicList", "tag", ("rocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
               //发送单向消息,并没有返回值,无论成功与否,都只发送这次
               producer.sendOneway(msg);
           }
           //关闭
           producer.shutdown();
    
       }
    }

     

    rocketMQ之消费者

    引入依赖

        <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
                <version>4.5.1</version>
            </dependency>

    消费消息的两种模式

    负载均衡模式(默认)

    • 多个消费者采用负载均衡消费,每个消费者处理的消息不同
    public class ClusteringConsumer {
       public static void main(String[] args) throws Exception {
           //实例化消费者,指定组名
           DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg3");
           //指定NameServer地址
           consumer.setNamesrvAddr("192.168.220.135.9876;192.168.220.136:9876");
           //订阅topic,第二个参数指定的tag
           consumer.subscribe("it_topic","*");
           //指定负载均衡模式
           consumer.setMessageModel(MessageModel.CLUSTERING);
           //注册回调函数,处理消息
           consumer.registerMessageListener(new MessageListenerConcurrently() {
               @Override
               public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                   list.forEach(v-> System.out.println(v+"  $消息内容:"+new String(v.getBody())));
                   return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
               }
           });
           //启动消费者
           consumer.start();
           System.out.println("Consumer start ");
       }
    }

    广播模式

    • 多个消费者消费同一条消息
    public class ClusteringConsumer {
    public static void main(String[] args) throws Exception {
    //实例化消费者,指定组名
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg1");
    //指定NameServer地址
    consumer.setNamesrvAddr("192.168.220.135.9876;192.168.220.136:9876");
    //订阅topic,第二个参数指定的tag
    consumer.subscribe("it_topic","*");
    //指定负载均衡模式
    consumer.setMessageModel(MessageModel.BROADCASTING);
    //注册回调函数,处理消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
    list.forEach(v-> System.out.println(v+" $消息内容:"+new String(v.getBody())));
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    });
    //启动消费者
    consumer.start();
    System.out.println("Consumer start ");
    }
    }

    在发送者客户端在同步模式下,创建topic为it_topic,然后发送消息。

    在消费者客户端在负载均衡模式下,分别创建三组消费者,cg1,cg2,cg3,然后对it_topic的消息进行拉取消费

    在消费者消费的过程中,将cg2客户端关闭,不再消费,然后我们通过rocketmq-console的TOPIC选项的CONSUMER MANAGER来观察3组消费者的消费情况。

     打开CONSUMER MANAGER,展示结果如下:cg1和cg3都消费完成,cg2还剩下一些没有消费,从brokerOffset和consumerOffset的位置数值可以看出来。

     备注:非广播模式下,一个消息队列只能被同一消费者组中的一个消费者消费,一个消费者可以消费多个队列。而且不同的消费者组可以消费相同的消息,各个消费者组消费后记录各自的消息偏移量,互不影响。广播模式消息肯定能被各个消费者组消费,而且还是被消费者组内各个消费者都消费一遍。

     同一queueId(发送消息时指定队列id)的消息只能发送给同一broker的相同的queueId的队列,同一broker的同一queueId队列的消息只能分发给同一个消费端,这也是实现顺序消息队列的基础。

    参考文章:RocketMq系列

  • 相关阅读:
    动画效果
    iOS蓝牙4.0
    讯飞语音接口使用
    Xcode添加注释
    CocoaPods安装
    mac os 下打开FTP服务器
    画面渲染:实时渲染(Real-time Rendering)、离线渲染(Offline Rendering)[转]
    Unity3D笔记 英保通九 创建数
    Unity3D笔记 英保通八 关节、 布料、粒子系统
    Unity3D 记第二次面试
  • 原文地址:https://www.cnblogs.com/tyhj-zxp/p/13234752.html
Copyright © 2011-2022 走看看