zoukankan      html  css  js  c++  java
  • RocketMQ学习笔记(8)----RocketMQ的Producer API简介

    在RocketMQ中提供了三种发送消息的模式:

      1.NormalProducer(普通)

      2.OrderProducer(顺序)

      3.TransactionProducer(事务)

    下面来介绍一下producer中的各个API的使用:

      1. producerGroup:Producer组名, 默认值为DEFAULT_PRODUCER,多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组。

      2. createTopicKey: 默认值为TBW102,在发送消息时,自动创建服务器不存在的topic,需要指定Key。

      3.  defaultTopicQueueNums: 默认值为4, 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数。

      4. sendMsgTimeout: 默认值10000,发送消息超时时间,单位毫秒

      5. compressMsgBodyOverHowmuch: 默认值4096,消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节

      6. retryAnotherBrokerWhenNotStoreOK: 默认值false, 如果发送消息返回sendResult,但是sendStatus!=SEND_OK,是否重试发送

      7. maxMessageSize: 默认值131072,客户端限制的消息大小,超过报错,同时服务端也会限制

      8. transactionCheckListener: 事务消息回查监听器,如果发送事务消息,必须设置,在DefaultMQProducer的子类TransactionMQProducer中。

      9. checkThreadPoolMinSize:  默认值为1,Broker回查Producer事务状态时,线程池大小,在DefaultMQProducer的子类TransactionMQProducer中。

      10. checkThreadPoolMaxSize: 默认值为1,Broker回查Producer事务状态时,线程池大小。

      11. checkRequestHoldMax: 默认值为2000, Broker回查Producer事务状态时,Producer本地缓冲请求队列大小

      使用设置maxMessageSize,将消息最大值设为1024个字节,运行如下代码:

    package com.wangx.rocketmq.quickstart;
    
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    import java.io.UnsupportedEncodingException;
    
    /**
     * 创建一个消费者
     */
    public class Producer {
    
        public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException {
            //1. 实例化一个producer group
            DefaultMQProducer producer = new DefaultMQProducer("my-producer-group");
            //2. 设置namesrvAddr,集群环境多个nameserver用;分割
            producer.setNamesrvAddr("47.105.149.61:9876;47.105.145.123:9876");
         //设置消息最大值 producer.setMaxMessageSize(1024); //3. 启动 producer.start(); String str = ""; // 4. 发送消息 for (int i = 0; i < 1025; i++) { //构建实例,第一个参数为topic,第二个参数为tabs,第三个参数为消息体 str += i; } Message message = new Message("MyQuickStartTopic1","tabA",str.getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult result = producer.send(message); System.out.println(result); //关闭生产者 producer.shutdown(); } }

      控制台将会报如下异常:

    org.apache.rocketmq.client.exception.MQClientException: CODE: 13  DESC: the message body size over max value, MAX: 1024

      

    原文 RocketMQ学习笔记(8)----RocketMQ的Producer API简介

  • 相关阅读:
    mac c++编译出现segmentation fault :11错误
    ssh 连接缓慢解决方法
    237. Delete Node in a Linked List
    203. Remove Linked List Elements
    Inversion of Control Containers and the Dependency Injection pattern
    82. Remove Duplicates from Sorted List II
    83. Remove Duplicates from Sorted List
    SxsTrace
    使用CCleaner卸载chrome
    decimal and double ToString problem
  • 原文地址:https://www.cnblogs.com/xiaoshen666/p/10867603.html
Copyright © 2011-2022 走看看