zoukankan      html  css  js  c++  java
  • RocketMQ安装及入门

    本文是作者原创,版权归作者所有.若要转载,请注明出处.

    本文RocketMQ版本为rocketmq-all-4.7.0,系统为win10.请各位去官网下载,也可以留言,我发安装包

    RocketMQ安装

    (前置条件为已安装java并配置JAVA_HOME)

    1.配置环境变量

     

     2.切换到安装目录的bin文件夹下,启动mqnamesrv.cmd,如下图

     命令

    start mqnamesrv.cmd

     看到如下窗口表示启动成功

     3.启动bin目录下的mqbroker.cmd

    命令:

    start mqbroker.cmd -n 127.0.0.1:9876 -c ../conf/broker.conf

     看到如下窗口表示启动成功

    4.RocketMQ管理控制台部署(非必须,可跳过)

    下载地址

    https://github.com/apache/rocketmq-externals.git

    下载完成之后,进入‘rocketmq-externals ocketmq-consolesrcmain esources’文件夹,打开‘application.properties’进行配置,如下

     这里将启动端口改成8081,并填配置rocketmq的端口

    127.0.0.1.9876

     编译该控制台源码

    进入‘ ocketmq-externals ocketmq-console’文件夹,

    执行下列命令,编译生成jar包

    mvn clean package -Dmaven.test.skip=true

     编译成功,看下图

    进入‘target’文件夹,执行下列命令.启动jar包

    java -jar rocketmq-console-ng-1.0.1.jar

    浏览器中输入

    http://127.0.0.1:8081/

    成功后即可查看。

    RocketMQ入门(官网demo)

    1.消息消费者consumer

    public static void main(String[] args) throws InterruptedException, MQClientException {
    
            // Instantiate with specified consumer group name.
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
    
            // Specify name server addresses.
            consumer.setNamesrvAddr("localhost:9876");
    
            // Subscribe one more more topics to consume.
            consumer.subscribe("TopicTest", "*");
            // Register callback to execute on arrival of messages fetched from brokers.
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                ConsumeConcurrentlyContext context) {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            //Launch the consumer instance.
            consumer.start();
    
            System.out.printf("Consumer Started.%n");
        }

    2.消息生产者/制造者

    public static void main(String[] args) throws Exception {
            //Instantiate with a producer group name.
            DefaultMQProducer producer = new
                    DefaultMQProducer("please_rename_unique_group_name");
            // Specify name server addresses.
            producer.setNamesrvAddr("localhost:9876");
            //Launch the instance.
            producer.start();
            for (int i = 0; i < 10; i++) {
                //Create a message instance, specifying topic, tag and message body.
                Message msg = new Message("TopicTest" /* Topic */,
                        "TagA" /* Tag */,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                //Call send message to deliver message to one of brokers.
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }
            //Shut down once the producer instance is not longer in use.
            producer.shutdown();
        }

     看下消费者的结果

  • 相关阅读:
    jQuery ajax中支持的数据类型
    行内元素与块级元素
    本地连接无法加载远程访问连接管理器服务,错误711
    SQL Server 两种判断表名是否存在且删除的方式
    SQL Server 2008 修改表名
    MySql5.1在Win7下的安装与重装问题的解决
    JavaScript关闭浏览器
    SQL Server 添加一条数据获取自动增长列的几种方法
    获取当前程序运行目录
    字符串的判断与替换
  • 原文地址:https://www.cnblogs.com/lusaisai/p/12994249.html
Copyright © 2011-2022 走看看