zoukankan      html  css  js  c++  java
  • RocketMQ源码分析:(一)安装与案例演示

    环境: Windows

    暂时理解的功能图,不定时改动:

    1. 克隆rocketmq代码

    git clone git@github.com:apache/rocketmq.git

    2. 进入rocketmq目录,打成jar包

    cd rocketmq
    mvn -Prelease-all -DskipTests clean install -U

    3. 在 rocketmq\distribution\target\apache-rocketmq\bin 目录下,依次启动mqnamesrv、mqbroker

    执行脚本mqnamesrv.cmd或者命令:start mqnamesrv.cmd
    执行脚本mqbroker.cmd或者命令:start mqbroker.cmd

    4. 创建一个基于maven的spring项目,工程目录如下

    5. pom.xml文件内容如下:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <!-- Maven coordinates of the demo project -->
        <groupId>com.demo.rocketmq</groupId>
        <artifactId>rocketmq-demo</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <!-- Spring Boot parent POM; dependencies declared below without a <version> rely on it -->
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>1.5.9.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
    
        <!-- Source/report encoding and Java language level -->
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <!-- Spring Boot web starter -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <!-- Spring Boot test starter, test scope only -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    
            <!-- Lombok; the producer and consumer classes use its @Slf4j annotation -->
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
    
            <!-- RocketMQ common module, same version as rocketmq-client below -->
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-common</artifactId>
                <version>4.4.0</version>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
                <version>4.4.0</version>
            </dependency>
    
        </dependencies>
    
        <build>
            <plugins>
                <!-- Spring Boot Maven plugin -->
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>

    6. application.properties文件内容如下

    # Consumer group name
    apache.rocketmq.consumer.PushConsumer=PushConsumer
    # Producer group name
    apache.rocketmq.producer.producerGroup=Producer
    # NameServer address
    apache.rocketmq.namesrvAddr=localhost:9876

    7. producer端内容如下

    package com.miaoying.rocketmq.client;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    import org.springframework.util.StopWatch;
    
    import javax.annotation.PostConstruct;
    
    /**
     *  @Description:
     *  @author miaoying
     *  @date 2019/2/27
     */
    @Slf4j
    @Component
    public class RocketMQClient {
        /**
         * 生产者的组名
         */
        @Value("${apache.rocketmq.producer.producerGroup}")
        private String producerGroup;
    
        /**
         * NameServer 地址
         */
        @Value("${apache.rocketmq.namesrvAddr}")
        private String namesrvAddr;
    
        @PostConstruct
        public void defaultMQProducer() {
            DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
            producer.setNamesrvAddr(namesrvAddr);
    
            int messageCount = 10;
    
            try {
                producer.start();
                StopWatch stop = new StopWatch();
                stop.start();
    
                for (int i = 0; i < messageCount; i++) {
               Message message = new Message("TopicMiao", "push", "keyTest", ("message" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult result = producer.send(message);
                    log.info("MsgId : " + result.getMsgId() + " , offsetMsgId : " + result.getOffsetMsgId() + " , send status : " + result.getSendStatus());
                }
                stop.stop();
                log.info("发送 " + messageCount + " 条消息耗时 : " + stop.getTotalTimeMillis());
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                producer.shutdown();
            }
        }
    }

    8. consumer端内容如下

    package com.miaoying.rocketmq.server;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    import java.io.UnsupportedEncodingException;
    
    /**
     *  @Description:
     *  @author miaoying
     *  @date 2019/2/27
     */
    @Component
    @Slf4j
    public class RocketMQServer {
        /**
         * 消费者的组名
         */
        @Value("${apache.rocketmq.consumer.PushConsumer}")
        private String comsumerGroup;
    
        @Value("${apache.rocketmq.namesrvAddr}")
        private String namesrvAddr;
    
        @PostConstruct
        public void defaultMQPushConsumer() {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(comsumerGroup);
    
            consumer.setNamesrvAddr(namesrvAddr);
    
            try {
                // 表示订阅TopicMiao下tag为push的消息
                consumer.subscribe("TopicMiao", "push");
                // 设置consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
                // 如果非第一次启动,那么按照上次消费的位置继续消费
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
                    try {
                        for (MessageExt messageExt : list) {
                            // 输出消息内容
                            log.info("messageExt : " + messageExt);
                            String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
                            log.info("msgId ; " + messageExt.getMsgId() + " , mgsBody : " + messageBody);
                        }
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                });
                consumer.start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    9. 运行启动类,发现报错如下

    org.apache.rocketmq.client.exception.MQClientException: No route info of this topic, TopicMiao

    提示目前使用的topic没有注册,NameServer 地址为127.0.0.1:9876,可以先查看哪些topic已经注册上去了,在 rocketmq\distribution\target\apache-rocketmq\bin 目录下执行:

    start mqadmin.cmd topicList -n 127.0.0.1:9876 

    发现列表里面并没有我们使用的topic: TopicMiao,所以就创建一个吧:

    start mqadmin.cmd updateTopic -b 127.0.0.1:10911 -t TopicMiao -n 127.0.0.1:9876

    输出信息如下表示创建topic成功(默认分配了8个读队列,8个写队列,队列数目可修改):

    create topic to 127.0.0.1:10911 success.
    TopicConfig [topicName=TopicMiao, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]

    10. 创建完成后,再次执行启动类,即可看到如下日志输出,表示已经正常产生且消费消息:

  • 相关阅读:
    Bootstrap (Web前端CSS框架)
    面向对象和构造函数
    BFC(块级格式化上下文)
    图片轮播
    yii 计划任务
    Yii-数据模型- rules类验证器方法详解
    ubuntu下svn使用指南
    PHP加密解密函数
    在 PHP 中结合 Ajax 技术进行图片上传
    CSS3常用功能的写法
  • 原文地址:https://www.cnblogs.com/miaoying/p/9619755.html
Copyright © 2011-2022 走看看