zoukankan      html  css  js  c++  java
  • SpringBoot2.X 整合 RocketMQ4.X

    开发生产者代码

    第一步:创建很普通的 SpringBoot 项目

    第二步:加入相关依赖

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.3.0</version>
    </dependency>

    第三步:写代码

    PayProducer 类如下所示:

    package net.xdclass.xdclassmq.jms;
    
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.springframework.stereotype.Component;
    
    @Component
    public class PayProducer {
    
        private String producerGroup = "pay_group";
    
        private String nameServerAddr = "192.168.0.104:9876";
    
        private DefaultMQProducer producer;
    
        public PayProducer() {
            producer = new DefaultMQProducer(producerGroup);
    
            //指定NameServer地址,多个地址以 ; 隔开
            //如 producer.setNamesrvAddr("192.168.100.141:9876;192.168.100.142:9876;192.168.100.149:9876");
    
            producer.setNamesrvAddr(nameServerAddr);
            start();
        }
    
        public DefaultMQProducer getProducer() {
            return this.producer;
        }
    
        /**
         * 对象在使用之前必须要调用一次,只能初始化一次
         */
        public void start() {
            try {
                this.producer.start();
            } catch (MQClientException e) {
                e.printStackTrace();
            }
        }
        
        /**
         * 一般在应用上下文,使用上下文监听器,进行关闭
         */
        public void shutdown() {
            this.producer.shutdown();
        }
    }

    PayController 类如下所示:

    package net.xdclass.xdclassmq.controller;
    
    import net.xdclass.xdclassmq.jms.PayProducer;
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.HashMap;
    
    @RestController
    public class PayController {
    
        @Autowired
        private PayProducer payProducer;
    
        private static  final String topic = "pay_test_topic";
    
        @RequestMapping("/api/v1/pay_cb")
        public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
            Message message = new Message(topic,"taga", ("hello rocketmq = "+text).getBytes() );
            SendResult sendResult = payProducer.getProducer().send(message);
            System.out.println(sendResult);
            return new HashMap<>();
        }
    }

    第四步:测试

    通过可视化管理后台查看消息

    Message对象

    • topic:主题名称
    • tag:标签,用于过滤
    • key:消息唯一标示,可以是业务字段组合
    • body:消息体,字节数组

    注意:发送消息到 Broker 前,需要判断是否有此 Topic。启动 Broker 的时候,本地环境建议开启自动创建 Topic,生产环境建议关闭自动化创建 Topic。建议先手工创建 Topic,如果靠程序自动创建,然后再投递消息,会出现延迟情况。自动创建topic: autoCreateTopicEnable=true 无效原因:客户端版本要和服务端版本保持一致。

    概念模型: 一个 Topic 下面对应多个 Queue,可以在创建 Topic 时指定,如订单类 Topic。

    常见错误一

    org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException:
    sendDefaultImpl call timeout
    
    原因:阿里云存在多网卡,rocketmq都会根据当前网卡选择一个IP使用,当你的机器有多块网卡时,很有可能会有问题。比如,我遇到的问题是我机器上有两个IP,一个公网IP,一个私网IP, 因此需要配置broker.conf 指定当前的公网ip, 然后重新启动broker 
    新增配置:conf/broker.conf  (属性名称brokerIP1=broker所在的公网ip地址 )
    新增这个配置:brokerIP1=120.76.62.13  
    
    启动命令:nohup sh bin/mqbroker -n localhost:9876  -c ./conf/broker.conf &
    

    常见错误二

    MQClientException: No route info of this topic, TopicTest1
    原因:Broker 禁止自动创建 Topic,且用户没有通过手工方式创建 此Topic, 或者broker和Nameserver网络不通
    解决:
    通过 sh bin/mqbroker -m  查看配置
    autoCreateTopicEnable=true 则自动创建topic
    
    Centos7关闭防火墙  systemctl stop firewalld

    常见错误三

    控制台查看不了数据,提示连接 10909错误
    
    原因:Rocket默认开启了VIP通道,VIP通道端口为10911-2=10909
    
    解决:阿里云安全组需要增加一个端口 10909
    

    其他错误:

    https://blog.csdn.net/qq_14853889/article/details/81053145
    https://blog.csdn.net/wangmx1993328/article/details/81588217#%E5%BC%82%E5%B8%B8%E8%AF%B4%E6%98%8E
    https://www.jianshu.com/p/bfd6d849f156
    https://blog.csdn.net/wangmx1993328/article/details/81588217

    开发消费者代码

    接着上面的工程,直接上代码,PayConsumer 类如下所示:

    package net.xdclass.xdclassmq.jms;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.springframework.stereotype.Component;
    
    import java.io.UnsupportedEncodingException;
    import java.util.List;
    
    @Component
    public class PayConsumer {
        private DefaultMQPushConsumer consumer;
    
        private String CONSUMER_GROUP = "pay_consumer_group";
        private String NAME_SERVER = "192.168.0.104:9876";
        private String TOPIC = "pay_test_topic";
    
        public PayConsumer() throws MQClientException {
    
            consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
            consumer.setNamesrvAddr(this.NAME_SERVER);
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    
            consumer.subscribe(this.TOPIC, "*");
    
    //        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    //            try {
    //                Message msg = msgs.get(0);
    //                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
    //                String topic = msg.getTopic();
    //                String body = new String(msg.getBody(), "utf-8");
    //                String tags = msg.getTags();
    //                String keys = msg.getKeys();
    //                System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);
    //                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    //            } catch (UnsupportedEncodingException e) {
    //                e.printStackTrace();
    //                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    //            }
    //        });
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    try {
                        Message msg = msgs.get(0);
                        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
    
                        String topic = msg.getTopic();
                        String body = new String(msg.getBody(), "utf-8");
                        String tags = msg.getTags();
                        String keys = msg.getKeys();
                        System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);
    
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    } catch (UnsupportedEncodingException e) {
    
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
            });
    
            consumer.start();
            System.out.println("consumer start ...");
        }
    }

    注释掉的部分采用 Lambda 表达式写法,效果是一样的。

    常见问题

    1、Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.17.42.1:10911> failed 
    
    2、com.alibaba.rocketmq.client.exception.MQClientException: Send [1] times, still failed, cost [1647]ms, Topic: TopicTest1, BrokersSent: [broker-a, null, null]
    
    3、org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [497]ms, Topic: TopicTest, BrokersSent: [Book-Air.local, 	MacBook-Air.local, MacBook-Air.local]
    解决:多网卡问题处理
    	1、设置producer:  producer.setVipChannelEnabled(false);
    	2、编辑ROCKETMQ 配置文件:broker.conf(下列ip为自己的ip)
    		namesrvAddr = 192.168.0.101:9876
    		brokerIP1 = 192.168.0.101
    
    4、DESC: service not available now, maybe disk full, CL:
    	解决:修改启动脚本runbroker.sh,在里面增加一句话即可:		
    	JAVA_OPT="${JAVA_OPT} -Drocketmq.broker.diskSpaceWarningLevelRatio=0.98"
    	(磁盘保护的百分比设置成98%,只有磁盘空间使用率达到98%时才拒绝接收producer消息)
    	
    常见问题处理
    	https://blog.csdn.net/sqzhao/article/details/54834761
    	https://blog.csdn.net/mayifan0/article/details/67633729
    	https://blog.csdn.net/a906423355/article/details/78192828
  • 相关阅读:
    MySQL体系结构
    简单高效的代码部署方法
    笔试算法题(07):还原后序遍历数组 & 半翻转英文句段
    笔试算法题(06):最大连续子数组和 & 二叉树路径和值
    笔试算法题(05):转换BST为双向链表 & 查找栈中的最小元素
    笔试算法题(04):实现 string & memcpy & strcpy & strlen
    笔试算法题(03):最小第K个数 & 判定BST后序序列
    笔试算法题(02):N阶阶乘 & 双向循环链表实现
    笔试算法题(01):字符串倒置 & 八皇后问题
    chosen选择框加载数据
  • 原文地址:https://www.cnblogs.com/jwen1994/p/12319048.html
Copyright © 2011-2022 走看看