zoukankan      html  css  js  c++  java
  • mq使用经验

    1、Producer使用指南--发送消息注意事项


     

    1、正常情况下一个业务系统尽可能用一个Topic,消息子类型用tags来标识,tags可以由业务系统自由设置。只有发送消息设置了tags,消费方在订阅消息时,才可以利用tags在broker做消息过滤。

      MQCPMessage msg = new MQCPMessage(); // 初始化消息对象

      message.setTags("TagA"); // 设置消息TAG

    2、每个消息在业务层面的唯一标识码,要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic,key来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。     

     MQCPMessage msg = new MQCPMessage(); // 初始化消息对象

      message.setTags("TagA"); // 设置消息TAG

      String orderId = "20034568923546"; // 订单Id

      message.setKeys(orderId);

    3、消息发送成功或者失败,要打印消息日志,务必要打印sendresult和key字段。

    4、send消息方法,只要不抛异常,就代表发送成功。但是发送成功会有多个状态,在sendResult里定义

    5、对于消息不可丢失应用,务必要有消息重发机制。例如如果消息发送失败,存储到数据库,能有定时程序尝试重发,或者人工触发重发。

    2、Consumer使用指南--消费端去重

    RocketMQ无法避免消息重复,所以如果业务对消费重复非常敏感,务必要在业务层面去重,有以下几种去重方式:

      1. 将消息的唯一键,可以是msgID,也可以是消息内容中的唯一标识字段,例如订单Id等,消费之前判断是否在Db或Tair(全局KV存储)中存在,如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)msgId一定是全局唯一标识符,但是可能会存在同样的消息有两个不同msgId的情况(有多种原因),这种情况可能会使业务上重复消费,建议最好使用消息内容中的唯一标识字段去重。

      2. 使用业务层面的状态机去重

    5、如何判断发送消息是否成功?

          客户端Producer调用send消息方法,只要不抛异常,就代表发送成功。但是发送成功会有多个状态,在sendResult里定义。

    返回状态                

    状态释义                

    SEND_OK

    消息发送成功                

    FLUSH_DISK_TIMEOUT

    消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时MASTER服务器宕机,消息才会丢失                

    FLUSH_SLAVE_TIMEOUT

    消息发送成功,但是服务器同步到Slave时超时,消息已经进入服务器队列,只有此时SLAVE服务器宕机,消息才会丢失                

    SLAVE_NOT_AVAILABLE

    消息发送成功,但是此时SLAVE不可用,消息已经进入服务器队列,只有此时SLAVE服务器宕机,消息才会丢失                

          目前MQCP测试和生产环境集群都采用两主两从共4台Broker机器,针对大部分业务系统来讲,只要MQCP没有抛出异常,可以默认消息已成功发送。建议业务系统针对发送消息后所有非SEND_OK状态的消息,打印Warning日志,并在运营端设置对应的监控规则,及时发邮件提醒。

    6、如何判断消费消息是否成功?

         客户端Consumer在MQCPMessageListener中实现pushMessage(),遍历并处理消息后会返回给MQCP端消费的状态,状态只有消费成功或者消费失败两种状态。

    消费状态                

    状态释义                

    CONSUME_OK

    消费成功                

    CONSUME_FAIL

    消费失败                

    7、消费端如何实现定时消费?

         在某些业务场景下,消费端希望在业务低峰(例如半夜12点后)时开始从MQCP拉取消息,在业务高峰期前关闭掉消费功能,以此来降低系统负载。这种类似场景涉及到如何在不停业务服务的场景下,多次的开启和关闭MQCP消费服务。

          MQCP的消费者本身是可以多实例初始化的,每个实例的消费者服务开启和关闭也是独立的,所以可以很良好的支持定时消费的场景。

    如果业务系统有类似的需求,我们建议:

          1. 业务系统本身需要添加功能开关,支持配置化的方式来开启或关闭消费服务。其实现本身比较简单,就是调用MQCP消费者的start || shutdown方法。必要时需要对方法添加上层逻辑封装,来实现定制化的需求。

          2. 调用完consumer对象的shutdown方法后,不要立即初始化下一个consumer对象并启用服务,建议至少延迟几秒种,等相关的资源回收完毕。

          3. 完善业务端开启或关闭消息服务的日志,方便后续运维处理问题。

          4. 具体实现方式,可以参考下面TestTimedConsumeImpl类。

    import java.io.UnsupportedEncodingException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    
    import com.paic.mqcp.client.MQCPFactory;
    import com.paic.mqcp.client.common.MQCPException;
    import com.paic.mqcp.client.common.MQCPMessageListener;
    import com.paic.mqcp.client.consumer.MQCPConsumer;
    import com.paic.mqcp.client.dto.MQCPMessage;
    import com.paic.mqcp.client.dto.MQCPMessageFilter;
    import com.paic.mqcp.client.util.MQCPConstant;
    import com.paic.mqcp.client.util.MQCPConsumeStatus;
    import com.paic.mqcp.common.consumer.ConsumeFromWhere;
    
    /**
     * 模拟业务系统需要在不停应用的场景下,开启、关闭接受消息服务
     * 
     * @author WUJING754
     *
     */
    public class TestTimedConsumeImpl {
    	
    	/** 属性对象 */
    	private static Properties p = null;
    	
    	/**
    	 * 获取属性值对象
    	 * 
    	 * @return Properties
    	 */
    	public static Properties initialProperties() {
    		
    		p = new Properties();
    		// 业务系统可以从配置文件中取出该属性,demo中写死的
    		p.setProperty(MQCPConstant.NAME_SERVER_ADDRESS,
    				"10.20.22.148:9876;10.20.22.149:9876");
    		// 消费者ID,业务系统需要在MQCP-ADMIN中注册,否则无法正常发送消息
    		// demo中已经初始化好了的,后续请联系MQCP开发注册系统 
    		p.setProperty(MQCPConstant.CONSUMER_ID, "CID_PARP_TEST_DEFAULT");
    		// INSTANCE_NAME属性建议业务系统不设置,用默认的即可
    		//p.setProperty(MQCPConstant.INSTANCE_NAME,"_PACP");
    		
    		return p;
    	}
    	
    	public static MQCPConsumer initialConsumer(){
    		
    		MQCPConsumer pushConsumer = null;
    		
    		try {
    			// 初始化过滤对象
    			MQCPMessageFilter mqcpFilter = new MQCPMessageFilter();
    			List<String> list = new ArrayList<String>();
    			// 设置tag 
    			// tag的作用是过滤消息,如果设置改值,MQCP只会取出发送消息时设置了该tag值的消息
    			// 需要注意的是同一CID下的多个应用实例需要设置为同样的tag列表来过滤消息,以保证消息不会被过滤取走但未被业务系统处理
    			list.add("testByWUJING754");
    			mqcpFilter.setTags(list);
    			// 初始化MQCPPushConsumer并指定为集群消费(消息只会被消费一次,无论应用的实例有多少个)
    			// PUSH消费模式下,客户端包会启动后台线程不断的从MQCP中拉去消息(准实时方式,毫秒级延时)
    			// 业务系统收取到消息后,需要实现MQCPMessageListener,来处理消息
    			pushConsumer = MQCPFactory
    					.createConsumer(initialProperties());
    			// 设置第一次CID消费消息的时间点,这里设置从最后的消息开始获取
    			// 如果不设置该值,默认是队列的最开始出消费消息
    			pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    			// 订阅 T_PAFA5_LOG_P 主题的消息(创建topic需要在MQCP-ADMIN中,如需创建请联系MQCP开发)
    			pushConsumer.subscribe("T_PARP_TEST_DEMO", mqcpFilter,
    					new MQCPMessageListener() {
    						@Override
    						public MQCPConsumeStatus pushMessage(
    								List<MQCPMessage> messageList) {
    							// 监听到有消息抵达后,业务系统需要遍历messageList对象,来获取消息
    							// messageList的默认大小为1,即消息是一条一条的推送到客户端的
    							for (MQCPMessage msg : messageList) {
    								// 消息明细  MQCPMessage对象
    								// 建议业务系统将从消息平台拉取消息和处理消息的逻辑解耦,
    								// 在consumer监听器中只监听到消息,建议简单的将获取的消息解析存储,然后返回 MQCPConsumeStatus.CONSUME_OK,
    								// 后端可以异步来处理接收到的消息 
    								try {
    									System.out.println("###########receive message
    [msgTopic:"
    													+ msg.getTopic()
    													+ "
    msgId:"
    													+ msg.getMsgId()
    													+ "
    msgContent:"
    													+ new String(msg.getConent(),"UTF-8")
    													+ "
    msgKey:"
    													+ msg.getKey() + "]");
    								} catch (UnsupportedEncodingException e) {
    									e.printStackTrace();
    								}
    							}
    
    							return MQCPConsumeStatus.CONSUME_OK;
    						}
    					});
    
    		} catch (Exception ex) {
    			ex.printStackTrace();
    		}
    		return pushConsumer;
    
    	}
    	
    	/**
    	 * 入口
    	 * @param args
    	 */
    	public static void main(String[] args) {
    		
    		// 初始化一个pushConsumer对象
    		MQCPConsumer pushConsumer = initialConsumer();
    		
    		// 启动该consumer对象,开始消费消息
    		startConsumingMsg(pushConsumer);
    		
    		// 业务系统不断收取消息,处理消息过程中...
    		
    		// 根据业务需要,可以调用stopConsumingMsg()方法来停止收取消息
    		stopConsumingMsg(pushConsumer);
    		
    		// 主线程休眠30秒
    		try {
    			Thread.sleep(30000L);
    		} catch (InterruptedException e) {
    			
    			e.printStackTrace();
    		}
    		// 模拟业务系统再次初始化一个consumer对象
    		pushConsumer = initialConsumer();
    		// 启动consumer,开始收取消息
    		startConsumingMsg(pushConsumer);
    		
    		// 业务系统不断收取消息,处理消息过程中...
    		
    		// 业务系统根据需要,停止取消息服务		
    		stopConsumingMsg(pushConsumer);
    		
    	}
    	/*
    	 * 启动传入的消费者对象
    	 */
    	public static boolean startConsumingMsg(final MQCPConsumer pushConsumer){
    		
    		boolean flag = true;
    		try {
    			pushConsumer.start();
    			System.out.println("-->start consumer success.");
    		} catch (MQCPException e) {
    			
    			System.out.println("-->start consumer fail due to " + e.getMessage());
    			e.printStackTrace();
    			flag = false;
    		}
    		return flag;
    	}
    	
    	/*
    	 * 关闭传入的消费者对象
    	 */
    	public static void stopConsumingMsg(final MQCPConsumer pushConsumer){
    		
    		pushConsumer.shutdown();
    		System.out.println("-->shutdown consumer success.");
    	}
    
    }

    9、客户端生产者是否有消息重发机制?

     (摘至RocketMq官方文档)
    RocketMq消息发送失败如何处理,Producer的send方法本身支持内部重试,重试逻辑如下:
      1. 至多重试5次。
      2. 如果发送失败,则轮转到下一个Broker。
      3. 这个方法的总耗时时间不超过sendMsgTimeout设置的值,默认5s。
    所以,如果本身向broker发送消息产生超时异常,就不会再做重试。

      以上策略仍然不能保证消息一定发送成功,为保证消息一定成功,建议应用这样做:如果调用send同步方法发送失败,则尝试将消息存储到db,由后台线程定时重试,保证消息一定到达Broker。
         

      上述DB重试方式为什么没有集成到MQ客户端内部做,而是要求应用自己去完成,我们基于以下几点考虑 
        1. MQ的客户端设计为无状态模式,方便任意的水平扩展,且对机器资源的消耗仅仅是cpu、内存、网络。
        2. 如果MQ客户端内部集成一个KV存储模块,那么数据只有同步落盘才能较可靠,而同步落盘本身性能开销较大,所以通常会采用           异步落盘,又由于应用关闭过程不受MQ运维人员控制,可能经常会发生kill -9这样暴力方式关闭,造成数据没有及时落盘而丢           失。
        3. Producer所在机器的可靠性较低,一般为虚拟机,不适合存储重要数据。
          综上,建议重试过程交由应用来控制。

    10、客户端消费者是否有消息重发机制?

     (摘至RocketMq官方文档)
    消息重试机制如下:

           

      注意:重试的消息,MsgKey不变,MsgId会变。

    11、业务消费端没有取到消息,如何去定位问题?

         正常情况下生产者发送消息到MQCP,消息被投递到消费端的延时应该在毫秒级。如果消费端迟迟没有收到消息,建议采用下面的步骤来排查问题:

    1、 获得消息的消息ID或者KEY,去MQCP-ADMIN的消息查询模块,根据自己的消费者ID找到其对应的消费状态。

    2、 常见的投递状态有:

    SUBSCRIBED_AND_CONSUMED

    订阅了,而且消费了(Offset越过了)                

    SUBSCRIBED_BUT_FILTERD   

    订阅了,但是被过滤掉了                

    the consumer group[***] not online

    订阅了,但是消费者未启动                

    SUBSCRIBED_AND_NOT_CONSUME_YET

    订阅了,但是没有消费(Offset小)                

    UNKNOW_EXCEPTION

    未知异常                

      注: SUBSCRIBED_AND_CONSUMED状态,表示消息已被正常消费掉,如果此时有异常,需要业务系统检查日志,分析看看是否因为解析消息时有异常,导致消息未被正确处理。
         SUBSCRIBED_BUT_FILTERD状态,需要业务系统检查初始化Consumer对象时传入的TagList是否和生产者定义的tag匹配。
         SUBSCRIBED_AND_NOT_CONSUME_YET状态,可能的原因是由于消息有积压,消息还未被取走,可以稍等几十秒再去查询一下状态。
         the consumer group[***] not online状态,表示对应CID的消费者还未正确启动。业务系统需要检查消费者是否已启动,如果已启动请检查是否启动时有报错。有可能相关的配置项配置错误,导致consumer启动时校验失败。
         UNKNOW_EXCEPTION表示消息平台有异常,请联系MQCP开发同事。

    13、如何避免接收到的消息是乱码?

         对于生产者来说,建议将消息body转为byte数组时显示指定为UTF-8编码。对于消费者来说,建议在接收到消息后将byte数组转为String时指定UTF-8编码。这样可以避免因为消息body中有中文或者特殊字符,消费端解析时乱码,进而造成消息解析失败。

      

    Example of producer

    MQCPMessage msg = new MQCPMessage();

    msg.setConent("test msg body".getBytes("UTF-8")); // 生产者组装消息body时指定urf-8编码

    Example of consumer

           

    14、MQCP中的消息标签(tag)如何使用?

        在消息中间件实际的使用场景中,消费者只需要消息队列中的部分消息,其余消息希望默认不被接收,直接丢弃掉。针对类似这种场景 ,MQCP提供通过合理使用消息标签(Tag)的方式来实现消费端灵活过滤队列中消息的功能。  
    实现方式如下:
            1、 生产者和消费者双方约定消息标签具体的设置值及其代表的含义。
            2、 生产者在发送消息时,组装消息对象的时候,需要给对应消息设置正确的消息标签。

    MQCPMessage msg = new MQCPMessage();

    //设置过滤标签---大小写敏感

    msg.setTag("SystemTag");

            3、 消费者在组装消费者对象时,需要正确设置消息过滤的过滤器

    MQCPMessageFilter mqcpFilter = new MQCPMessageFilter();

    List<String> list = new ArrayList<String>();

    // 设置tag 

    // tag的作用是过滤消息,如果设置改值,MQCP只会取出发送消息时设置了该tag值的消息

    // 需要注意的是同一CID下的多个应用实例需要设置为同样的tag列表来过滤消息,以保证消息不会被过滤取走但未被业务系统处理list.add("SystemTag");

    mqcpFilter.setTags(list);

       上述生产端、消费端的实现代码具体可查阅MQCP官网提供的Demo。

    16、The consumer’s subscription not exist报错?

         现象:业务系统开发环境报异常:The consumer’s subscription not exist,检查消费者状态,多台实例在线,检查消息消费情况,消息一直不被消费,状态为:SUBSCRIBED_AND_NOT_CONSUME_YET(订阅未消费)。

         原因:业务系统之前已经接入过MQCP,且开发环境有多个开发人员在同时进行开发,多个主机同时连接MQCP,本次该consumer新增了1个订阅关系(subscription-A),但业务系统只有1个同事(小Y)的代码才新增了订阅关系,其他几个开发人员的代码没有配置,一旦他们的consumer启动后,给broker保持通信时发送的订阅关系中,并没有subscription-A,broker会remove该订阅关系,因此小Y的consumer启动后,与broker通信时发现没有subscription-A,就会报:The consumer’s subscription not exist。

        解决方案:1、不同的cid,订阅不同的topic,避免同一个cid订阅多个topic

               2、业务系统开发环境,只开发相关功能的主机才启动consumer

    17、业务系统消费端两种常见的错误实现?

           

           

    18、申请生产环境mq消息查询权限

    请使用IE浏览器,在itsm系统申请对应的权限,通道如下:

    IT权限管理-申请->平安科技_消息协作平台监控应用(MQCP_ADMIN)=>帐户管理组 

    19、测试环境admin平台查询消息已消费,但消费端未查询到消息

       同一个消费者是集群消费模式,在测试环境中,只想在某一个环境测试MQ功能,需要每个环境配置不同的CID。比如某系统测试环境有多套环境:STG1和STG2环境,测试人员正在STG1测试,如果两个环境的CID相同,则消息有可能就被STG2取走,测试人员在STG1上查不到该消息。        

       解决方法:      

       1、若为刚新增的发布订阅关系,请联系MQCP同事申请给每一个环境配置独立的CID,并创建发布订阅关系 

       2、若每个环境都申请了独立的CID,请检查是否其他某个环境使用了该环境的CID

    20、java.lang.reflect.InvocationTargetException告警

        业务系统引用客户端包在1.0.15之前版本在启动时可能会出现该异常,不影响消息的收发,1.0.15的客户端版本修复了该问题

       解决方法:

       更新客户端包为最新的版本,最新包版本可在包库查询。客户端包不断在迭代优化,建议业务系统及时更新客户端包。客户端包每个版本更新内容可在官网“相关文档---CLIENT包版本线”查询。

       需注意:客户端包在1.0.20优化了发布订阅关系的校验方式,配置文件的配置项有变化,从原来的name_server_address、cluster_id 

    、virtual_account变为了:server_address、virtual_account,具体请查阅demo

    21、服务启动时报错:MQCP_CLIENT->Initial cache failed,no cache message received after waiting for 90s

         客户端在重启的时报错初始化发布订阅关系缓存数据失败。 

         解决方法:     

         1、检查是否新增了配置文件:mqcp_client.properties,需新增

         2、检查mqcp_client.properties配置文件,客户端版本为1.0.20之前的版本,需检查name_server_address、cluster_id的值是否正确,前后是否包含空格,检查virtual_account配置的虚拟用户是否与admin平台配置的一致,客户端版本为1.0.15之前的,还需在代码里新增name_server_address的配置:p.setProperty(MQCPConstant.NAME_SERVER_ADDRESS,"不同环境配置不同的nameserver地址" ); 客户端版本为1.0.20及之后的,需要检查server_address、virtual_account是否正确,是否包含空格。     

         3、建议更新客户端包为最新的包。

  • 相关阅读:
    Mac下安装Mysql
    事务隔离级别
    Mysql和Orcale的区别
    springboot的优点
    spring boot注解
    spring boot 常用注解
    java链接Mysql出现警告:Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by
    SpringApplication及banner的配置
    交叉编译
    qt下载和安装记录
  • 原文地址:https://www.cnblogs.com/panxuejun/p/8610540.html
Copyright © 2011-2022 走看看