zoukankan      html  css  js  c++  java
  • RocketMQ 4.5.1 单机环境搭建以及生产消费测试

    为了学习和方便测试,总是要启动一个单机版的。下载 http://rocketmq.apache.org/dowloading/releases/

    1. 要先配置环境变量

    ROCKETMQ_HOME
    
    E:
    ocketmq-all-4.5.1-bin-release

    2. 然后进入bin目录启动NameServer

    3. 启动Broker

    启动

    E:
    ocketmq-all-4.5.1-bin-releasein>mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true

    可能会出现一个错误: 找不到或无法加载主类 FilesJavajdk1.8.0_161lib;C:Program

    解决方法:(打开bin目录的runserver.cmd

    修改成

    重新启动,成功

    4. 弄个管控台方便查看

    https://github.com/apache/rocketmq-externals

    下载好进入 rocketmq-console 目录打包

    mvn clean package -Dmaven.test.skip=true

    进入target目录,启动 (最后的参数的nameserver的地址,也就是我本机地址)

    E:
    ocketmq-externals-master
    ocketmq-console	arget>java -jar rocketmq-console-ng-1.0.1.jar --rocketmq.config.namesrvAddr=localhost:9876

    最后访问 http://localhost:8080 即可

    5. 简单测试

    引入依赖

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.5.1</version>
    </dependency>

    一个简单的生产者

    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    import java.io.UnsupportedEncodingException;
    
    public class Test {
    
        public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
            // 设置生产者组名
            DefaultMQProducer producer = new DefaultMQProducer("my_producer_group");
            // 设置NameServer地址
            producer.setNamesrvAddr("10.204.241.15:9876");
            // 启动
            producer.start();
            for (int i = 0; i < 10; i++) {
                // 创建一条消息,包含topic、tag以及消息内容
                Message msg = new Message("MyTopic", "MyTag", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 发送结果
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }
            // 不用的时候关闭
            producer.shutdown();
        }
    
    }

    查看管控台

    查看详细

    下面一个简单的消费者

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    import java.io.UnsupportedEncodingException;
    import java.util.List;
    
    public class Test2 {
    
        public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
            // 设置生产者组名
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_producer_group");
            // 设置NameServer地址
            consumer.setNamesrvAddr("localhost:9876");
            // 订阅的主题
            consumer.subscribe("MyTopic", "*");
            // 注册消息监听
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            // 启动
            consumer.start();
            System.out.printf("Consumer Started.%n");
        }
    
    }

    控制台输出

    不要关闭消费者,查看管控台

  • 相关阅读:
    OpenGL Geometry Shader
    正向渲染路径细节 Forward Rendering Path Details
    Tessellation
    渲染路径-实时渲染中常用的几种Rendering Path
    ugui batches
    如何有效提升Unity Gear VR游戏性能
    Unity3D命令行Build
    手机游戏资源 特效 显存分析工具
    换装demo时美术遇到的问题详解
    Rigging a Versatile Weapon Bone for 3ds Max
  • 原文地址:https://www.cnblogs.com/LUA123/p/11023753.html
Copyright © 2011-2022 走看看