zoukankan      html  css  js  c++  java
  • RocketMQ学习笔记(8)----RocketMQ的Producer API简介

    在RocketMQ中提供了三种发送消息的模式:

      1.NormalProducer(普通)

      2.OrderProducer(顺序)

      3.TransactionProducer(事务)

    下面来介绍一下producer中的各个API的使用:

      1. producerGroup:Producer组名, 默认值为DEFAULT_PRODUCER,多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组。

      2. createTopicKey: 默认值为TBW102,在发送消息时,自动创建服务器不存在的topic,需要指定Key。

      3.  defaultTopicQueueNums: 默认值为4, 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数。

      4. sendMsgTimeout: 默认值10000,发送消息超时时间,单位毫秒

      5. compressMsgBodyOverHowmuch: 默认值4096,消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节

      6. retryAnotherBrokerWhenNotStoreOK: 默认值false, 如果发送消息返回sendResult,但是sendStatus!=SEND_OK,是否重试发送

      7. maxMessageSize: 默认值131072,客户端限制的消息大小,超过报错,同时服务端也会限制

      8. transactionCheckListener: 事务消息回查监听器,如果发送事务消息,必须设置,在DefaultMQProducer的子类TransactionMQProducer中。

      9. checkThreadPoolMinSize:  默认值为1,Broker回查Producer事务状态时,线程池大小,在DefaultMQProducer的子类TransactionMQProducer中。

      10. checkThreadPoolMaxSize: 默认值为1,Broker回查Producer事务状态时,线程池大小。

      11. checkRequestHoldMax: 默认值为2000, Broker回查Producer事务状态时,Producer本地缓冲请求队列大小

      使用设置maxMessageSize,将消息最大值设为1024个字节,运行如下代码:

    package com.wangx.rocketmq.quickstart;
    
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    import java.io.UnsupportedEncodingException;
    
    /**
     * 创建一个消费者
     */
    public class Producer {
    
        public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException {
            //1. 实例化一个producer group
            DefaultMQProducer producer = new DefaultMQProducer("my-producer-group");
            //2. 设置namesrvAddr,集群环境多个nameserver用;分割
            producer.setNamesrvAddr("47.105.149.61:9876;47.105.145.123:9876");
         //设置消息最大值 producer.setMaxMessageSize(
    1024); //3. 启动 producer.start(); String str = ""; // 4. 发送消息 for (int i = 0; i < 1025; i++) { //构建实例,第一个参数为topic,第二个参数为tabs,第三个参数为消息体 str += i; } Message message = new Message("MyQuickStartTopic1","tabA",str.getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult result = producer.send(message); System.out.println(result); //关闭生产者 producer.shutdown(); } }

      控制台将会报如下异常:

    org.apache.rocketmq.client.exception.MQClientException: CODE: 13  DESC: the message body size over max value, MAX: 1024

      

  • 相关阅读:
    升级salt导致进程kill问题记录
    记一次centos6升级salt-minion启动失败的问题
    3.ElasticSearch的倒排索引
    4.ElasticSearch的基本api操作
    1.ElasticSearch介绍及基本概念
    10.openldap备份与恢复
    1.Python3.6环境部署
    8.openldap mirrormode(主主同步)
    7.openldap使用ssl加密认证
    Python json解析
  • 原文地址:https://www.cnblogs.com/Eternally-dream/p/9954704.html
Copyright © 2011-2022 走看看