zoukankan      html  css  js  c++  java
  • RocketMQ

    http://rocketmq.apache.org/docs/quick-start/

    解压后添加启动脚本

    nohup sh bin/mqnamesrv >> logs/namesrv.log 2>&1 &
    nohup sh bin/mqbroker -n localhost:9876 >> logs/broker.log 2>&1 &

    启动broker的时候可能会报错

     

    修改bin/runbroker.sh里面的JVM配置

    JAVA_OPT="${JAVA_OPT} -server -Xms2g -Xmx2g -Xmn1g"

     关闭

    sh bin/mqshutdown broker
    sh bin/mqshutdown namesrv

     关于拒绝连接异常

    Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.17.0.1:10911> failed

     解决方案:https://my.oschina.net/simpleton/blog/3022576

    监控界面:https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console

    参考文章:https://www.cnblogs.com/cac2020/p/9447938.html

    按文中部署后运行程序还报错:

    直接在/etc/hosts中添加

    127.0.0.1 dev-server02

    再次运行即可

    集群部署

    使用2台机器,用2m-noslave 模式

    2台机器IP为:

    172.20.102.194 mq2

    172.20.102.150 mq1

    在两台机器的/etc/hosts中添加上面的配置

    在mq1机器上配置 

     conf/2m-noslave/broker-a.properties

    brokerClusterName=DefaultCluster
    brokerName=broker-a
    namesrvAddr=127.0.0.1:9876;mq2:9876

    在mq2上配置

     conf/2m-noslave/broker-b.properties

    brokerClusterName=DefaultCluster
    brokerName=broker-b
    namesrvAddr=127.0.0.1:9876;mq1:9876

    分别启动mq1和mq2的namesrv

    nohup sh bin/mqnamesrv >> logs/app.log 2>&1 &

    启动mq1的broker

    nohup sh bin/mqbroker -c conf/2m-noslave/broker-a.properties  >> logs/broker.log 2>&1 &

    启动mq2的broker

    nohup sh bin/mqbroker -c conf/2m-noslave/broker-b.properties  >> logs/broker.log 2>&1 &

    在监控界面中查看

    spring boot 中使用

    https://www.jianshu.com/p/8c4c2c2ab62e

    spring-boot-starter-rocketmq

    使用的时候如果遇到rocketMQTemplate bean报错

    需要配置producer

    rocketmq:
      name-server: 172.20.102.150:9876
      producer:
        group: ${spring.application.name}

    将rocketmq.name-server 修改为 rocketmq.nameServer

     RocketMq消息监听程序中消除大量的if..else

    这里要注意处理tag为空的情况

    //根据Topic分组
                Map<String, List<MessageExt>> topicGroups = msgs.stream().collect(Collectors.groupingBy(MessageExt::getTopic));
                for (Map.Entry<String, List<MessageExt>> topicEntry : topicGroups.entrySet()) {
                    String topic = topicEntry.getKey();
                    //根据tags分组
                    List<MessageExt> messageExts=topicEntry.getValue();
                    for(MessageExt messageExt:messageExts){
                        String tag= messageExt.getTags();
                        if(StringUtils.isEmpty(tag)){
                            messageExt.setTags("*");
                        }
                    }
                    Map<String, List<MessageExt>> tagGroups = topicEntry.getValue().stream().collect(Collectors.groupingBy(MessageExt::getTags));
                    for (Map.Entry<String, List<MessageExt>> tagEntry : tagGroups.entrySet()) {
                        String tag = tagEntry.getKey();
                        //消费某个主题下,tag的消息
                        this.consumeMsgForTag(topic,tag,tagEntry.getValue());
                    }
                }
    View Code

    非spring boot 中使用

    添加beans

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"   
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"   
     xmlns:aop="http://www.springframework.org/schema/aop"   
     xmlns:context="http://www.springframework.org/schema/context"   
     xmlns:p="http://www.springframework.org/schema/p"   
     xmlns:tx="http://www.springframework.org/schema/tx"     
     xmlns:cache="http://www.springframework.org/schema/cache"
     xsi:schemaLocation="http://www.springframework.org/schema/beans    
      http://www.springframework.org/schema/beans/spring-beans.xsd   
      http://www.springframework.org/schema/aop    
      http://www.springframework.org/schema/aop/spring-aop-3.0.xsd   
      http://www.springframework.org/schema/context    
      http://www.springframework.org/schema/context/spring-context-3.0.xsd   
      http://www.springframework.org/schema/tx    
      http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
      http://www.springframework.org/schema/cache    
      http://www.springframework.org/schema/cache/spring-cache.xsd">   
    
        <cache:annotation-driven/>
    
        <bean id="messageListeners" class="com.rongdu.cashloan.cl.mq.MQConsumeMsgListenerProcessor"></bean>
    
        <bean id="rocketmqConsumer" class="org.apache.rocketmq.client.consumer.DefaultMQPushConsumer" init-method="start" destroy-method="shutdown">
            <property name="consumerGroup" value="${rocketmq.consumer.groupName}"/>
            <property name="namesrvAddr" value="${rocketmq.consumer.namesrvAddr}"/>
            <property name="messageListener" ref="messageListeners"/>
            <property name="subscription">
                <map>
                    <entry key="${rocketmq.consumer.topics}" value="${rocketmq.consumer.tags}" />
                </map>
            </property>
            <property name="consumeMessageBatchMaxSize" value="${rocketmq.consumer.consumeMessageBatchMaxSize}"></property>
        </bean>
    
        <bean id="rocketMQProducer" class="org.apache.rocketmq.client.producer.DefaultMQProducer" init-method="start" destroy-method="shutdown">
            <property name="producerGroup" value="${rocketmq.producer.groupName}"/>
            <property name="namesrvAddr" value="${rocketmq.producer.namesrvAddr}"/>
            <!-- 失败重试次数 -->
            <property name="retryTimesWhenSendFailed" value="${rocketmq.producer.retryTimesWhenSendFailed}" />
        </bean>
              
    </beans> 
    mq-beans

    添加配置

    #该应用是否启用生产者
    rocketmq.producer.isOnOff=on
    #发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
    rocketmq.producer.groupName=cashloan-producer
    #mq的nameserver地址
    rocketmq.producer.namesrvAddr=172.20.102.150:9876
    rocketmq.producer.topics=cashloan_risk_control_req
    #消息最大长度 默认1024*4(4M)
    rocketmq.producer.maxMessageSize=4096
    #发送消息超时时间,默认3000
    rocketmq.producer.sendMsgTimeout=3000
    #发送消息失败重试次数,默认2
    rocketmq.producer.retryTimesWhenSendFailed=2
    
    
    ###consumer
    ##该应用是否启用消费者
    rocketmq.consumer.isOnOff=on
    rocketmq.consumer.groupName=cashloan-consumer
    #mq的nameserver地址
    rocketmq.consumer.namesrvAddr=172.20.102.150:9876
    #该消费者订阅的主题和tags("*"号表示订阅该主题下所有的tags),格式:topic~tag1||tag2||tag3;topic2~*;
    rocketmq.consumer.topics=cashloan_risk_control_result
    rocketmq.consumer.tags=*
    
    rocketmq.consumer.consumeThreadMin=20
    rocketmq.consumer.consumeThreadMax=64
    #设置一次消费消息的条数,默认为1条
    rocketmq.consumer.consumeMessageBatchMaxSize=1
    application-properties

    生产者

    Message msg = new Message("cashloan_risk_control_req" /* Topic */,
                        "TagA" /* Tag */,
                        (String.valueOf(borrowId)).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                SendResult sendResult = producer.send(msg);
    View Code

    消息监听

    @Component
    public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently {
        private static final Logger logger = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class);
       
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            if(CollectionUtils.isEmpty(msgs)){
                logger.info("接受到的消息为空,不处理,直接返回成功");
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            MessageExt messageExt = msgs.get(0);
            logger.info("接受到的消息为:"+messageExt.toString());
            if(messageExt.getTopic().equals("cashloan_risk_control")){
                if(messageExt.getTags().equals("borrowId")){
                    //TODO 判断该消息是否重复消费(RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重)
                    //TODO 获取该消息重试次数
                    int reconsume = messageExt.getReconsumeTimes();
                    if(reconsume ==3){//消息已经重试了3次,如果不需要再次消费,则返回成功
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                    //TODO 处理对应的业务逻辑
                    
                }
            }
            // 如果没有return success ,consumer会重新消费该消息,直到return success
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }
    View Code
  • 相关阅读:
    20172327 2017-2018-2 《程序设计与数据结构》第十一周学习总结
    20172327 2017-2018-2 《程序设计与数据结构》实验3报告
    20172327 2017-2018-2 《程序设计与数据结构》第十周学习总结
    20172327 2017-2018-2 《程序设计与数据结构》第九周学习总结
    20172327 结对编程项目-四则运算 第二周 阶段总结
    20172327 2017-2018-2 《程序设计与数据结构》第八周学习总结
    20172327 结对编程项目-四则运算 第一周 阶段总结
    20172327 2017-2018-2 《程序设计与数据结构》实验2报告
    20172327 2017-2018-2 《程序设计与数据结构》第七周学习总结
    MySQL数据库(四)—— 记录相关操作之插入、更新、删除、查询(单表、多表)
  • 原文地址:https://www.cnblogs.com/uptothesky/p/10065485.html
Copyright © 2011-2022 走看看