zoukankan      html  css  js  c++  java
  • rocketmq 遇到的一次问题记录

          今天上线时候碰到这么一个问题:由于上游生产者地址修改了,项目里面原来的消费者地址也要跟着修改。由于项目里面有两个消费者,之前的地址都是一样的。但是是两个不同的消费者,业务也是不同的。但是今天修改了第一个消费者的地址后,通过rocketmq的界面 发现 第二个的实例竟然也跑到了第一个上面。 

    问题如下:

       

    然后修改后如下:

     

    消费者的配置文件:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:aop="http://www.springframework.org/schema/aop"
           xmlns:cache="http://www.springframework.org/schema/cache"
           xmlns:util="http://www.springframework.org/schema/util"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
            http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache.xsd
    ">
        <!-- 订单状态变化时,统计相关的数据BEGIN -->
        <!--<bean id="statisticsOrderConsumer"  class="com.zhuanche.common.rocketmq.CommonRocketConsumer" init-method="init" destroy-method="destroy">
            <property name="groupName" value="mp-manage"/>&lt;!&ndash; 消费组名称,注意:相同的组名在同一个JVM中只能配置为一个 &ndash;&gt;
            <property name="namesrvAddr" value="${rocketmq.namesrvAddr}"/>&lt;!&ndash; RocketMQ命名服务地址 &ndash;&gt;
            <property name="consumeFromWhere" value="CONSUME_FROM_LAST_OFFSET"/>&lt;!&ndash; 消费起点 &ndash;&gt;
            <property name="messageModel" value="CLUSTERING"/>&lt;!&ndash; 消费模式 &ndash;&gt;
            <property name="topic" value="car_XXXXXXXXXXXX_canal_new"/>&lt;!&ndash; RocketMQ TOPIC &ndash;&gt;
            <property name="tags" value="1_40 || 1_43 || 1_44 || 1_45 || 1_50 || 2_40 || 2_43 || 2_44 || 2_45 || 2_50"/>&lt;!&ndash; RocketMQ TAG &ndash;&gt;
            <property name="messageBatchMaxSize" value="10"/>&lt;!&ndash; 每次消费消息的最大数量 &ndash;&gt;
            <property name="threads" value="2"/>&lt;!&ndash; 并发消费线程数 &ndash;&gt;
            <property name="messageListener"  ref="consumeOrderChangeForStatisticsListener"/>&lt;!&ndash; 业务消息处理实现类 &ndash;&gt;
        </bean>-->
        <!-- 订单状态变化时,统计相关的数据END -->
    
        <!--城际拼车监听-->
        <bean id="newInterCity"  class="com.zhuanche.common.rocketmq.CommonRocketConsumer" init-method="init" destroy-method="destroy" scope="singleton">
            <property name="groupName" value="mp-newInterCity"/><!-- 消费组名称,注意:相同的组名在同一个JVM中只能配置为一个 -->
            <property name="namesrvAddr" value="${rocketmqcity.namesrvAddr}"/><!-- RocketMQ命名服务地址 -->
            <property name="consumeFromWhere" value="CONSUME_FROM_LAST_OFFSET"/><!-- 消费起点 -->
            <property name="messageModel" value="CLUSTERING"/><!-- 消费模式 -->
            <property name="topic" value="car_fact_order_canal_msg"/><!-- RocketMQ TOPIC -->
            <property name="tags" value="1_13"/><!--监听订单组的tag 注意tag格式-->
            <property name="messageBatchMaxSize" value="10"/><!-- 每次消费消息的最大数量 -->
            <property name="threads" value="2"/><!-- 并发消费线程数 -->
            <property name="messageListener" >
                <bean class="com.zhuanche.common.listen.NewInterCityListener"/>
            </property>
        </bean>
    
    
        <!--城际拼车监听主订单-->
        <bean id="mainInterCityConsumer"  class="com.zhuanche.common.rocketmq.MainInterCityConsumer" init-method="init" destroy-method="destroy" scope="singleton">
            <property name="groupName" value="mainListenerConsumer"/><!-- 消费组名称,注意:相同的组名在同一个JVM中只能配置为一个 -->
            <property name="namesrvAddr" value="${rocketmqMain.namesrvAddr}"/><!-- RocketMQ命名服务地址 -->
            <property name="consumeFromWhere" value="CONSUME_FROM_LAST_OFFSET"/><!-- 消费起点 -->
            <property name="messageModel" value="CLUSTERING"/><!-- 消费模式 -->
            <property name="topic" value="carpool_mainorder"/><!-- RocketMQ TOPIC -->
            <property name="tags" value="15 || 60 || 30"/><!--监听订单组的tag 注意tag格式-->
            <property name="messageBatchMaxSize" value="10"/><!-- 每次消费消息的最大数量 -->
            <property name="threads" value="2"/><!-- 并发消费线程数 -->
            <property name="messageListener" >
                <bean class="com.zhuanche.common.listen.MainInterCityListener"/>
            </property>
        </bean>
    </beans>

    这个时候通过rocketmq的界面发现:

     发现都跑到了修改后的一个上面了。

    然后看源码,发现有个问题:默认 的instance都是default

    最后上网查询,才发现这个是需要在代码里面设置的。要不都会跑到同一个实例上面。

     解决办法就是设置不同的实例名字就行了。

     
    consumer.setInstanceName(System.currentTimeMillis()+"**");
  • 相关阅读:
    os和sys模块
    time模块
    collections模块
    re模块
    Python初识一
    Python闭包函数
    压栈
    isinstance()和issubclass()
    匿名函数--lambda函数
    机器学习入门文章
  • 原文地址:https://www.cnblogs.com/thinkingandworkinghard/p/13215553.html
Copyright © 2011-2022 走看看