zoukankan      html  css  js  c++  java
  • rocketMQ入门

    一:什么是MQ

    MQ 就是 消息中间件。

    二:为什么使用MQ

    场景:电商双十一 零点的秒杀。在那一瞬间,来自用户的请求将会激增,如果不做任何措施,那服务很可能会被压垮。但是我们又不能直接把这些请求丢弃,而为了这个很小的时间段去扩容机器又显得大题小做。于是我们自然而言的想到,能不能把这些请求先放到一个消息队列里面,然后系统从消息队列里面拿出来请求做逻辑的处理和响应。通过拉长时间维度来保证服务的稳定性。这就是MQ。

    使用MQ只要解决的就是 在生产者消费者模式中,生产者生产的数据可能会突然激增,消费者来不及消费的问题。

    三:rocketMQ

    rocketMQ是一个MQ的实现。我们在开发中一直在强调,不要重复造轮子。既然我们需要一个MQ,那就找个别人实现过的MQ来用就行了。rocketMQ就是其中的一种。当然,还有其他的MQ组件,比如的 ActiveMQ、RabbitMQ,Kafka。

    四:rocketMQ下载

    http://rocketmq.apache.org/release_notes

    下载bin的包,比如 rocketmq-all-4.3.2-bin-release.zip 

    五:安装

    将下载的文件解压到对应目录。比如我解压到 C: ocketmq-all-4.3.2

    六:启动NAMESERVER

    去 C: ocketmq-all-4.3.2in目录下找到 mqnamesrv.cmd,双击运营即可。

    七:启动BROKER

    start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

    ( 假如弹出提示框提示‘错误: 找不到或无法加载主类 xxxxxx’。打开runbroker.cmd,然后将‘%CLASSPATH%’加上英文双引号。保存并重新执行start语句。)

    至此为止,rocketMQ就安装启动完成了。下面我们写的demo来使用rocket做一个helloWord

    八:写一个生产者,发消息

    public class Producer {
        public static void main(String[] args) throws MQClientException {
            DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
            producer.setNamesrvAddr("127.0.0.1:9876");
            producer.setInstanceName("producer");
            producer.start();
            try {
                for (int i = 0; i < 10; i++) {
                    Thread.sleep(2000);  //每2秒发送一次消息
                    Message msg = new Message("TopicA-test",// topic
                            "TagA",// tag
                            (new Date() + "Hello RocketMQ ,QuickStart" + i)
                                    .getBytes()// body
                    );
                    SendResult sendResult = producer.send(msg);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            
            producer.shutdown();
        }
    }
    

      

    九:写一个消费者,用来监听消息

    public class Consumer {
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
                    "rmq-group");
    
            consumer.setNamesrvAddr("127.0.0.1:9876");//设置rocketMQ服务的部署地址
            consumer.setInstanceName("consumer");
            /**
             * 被订阅消息的topic 和 subExpression。
             * 注意:一定要与消息发布者的topic 和 subExpression 一致
             */
            consumer.subscribe("TopicA-test", "TagA");
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {//监听器实现
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(
                        List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        System.out.println(new String(msg.getBody()));//每次拿到消息我就打印出来
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            System.out.println("Consumer Started.");
        }
    }
    

     

    一个简单的demo就OK了

  • 相关阅读:
    理解Golang包导入
    go语言执行windows下命令行的方法
    Go中使用动态库C/C++库
    MongoDB · 引擎特性 · MongoDB索引原理
    Linux中more和less命令用法
    修改Linux文件句柄限制
    MongoDB自动删除过期数据--TTL索引
    mongodb可以通过profile来监控数据 (mongodb性能优化)
    MongoDB学习笔记(索引)
    查看nginx cache命中率
  • 原文地址:https://www.cnblogs.com/CUI-S/p/11604276.html
Copyright © 2011-2022 走看看