zoukankan      html  css  js  c++  java
  • springboot之RocketMq实现

    首先,在虚拟机上安装RocketMQ和RocketMQ可视化控制台,安装过程不做描述。

    1、pom.xml文件添加依赖

    客户端依赖的版本需与所连接的RocketMQ服务端版本保持一致

            <!-- rocketmq-client provides DefaultMQProducer / DefaultMQPushConsumer used below;
                 it pulls in rocketmq-remoting transitively. Keep the version aligned with the broker. -->
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
                <version>4.4.0</version>
            </dependency>

    2、yml文件添加rocketmq配置

    apache:
      rocketmq:
        # Consumer settings (value is passed to the DefaultMQPushConsumer constructor)
        consumer:
          pushConsumer: myConsumer
        # Producer settings (value is passed to the DefaultMQProducer constructor)
        producer:
          producerGroup: myGroup
        # NameServer address shared by producer and consumer
        namesrvAddr: 192.168.233.128:9876  

    3、生产者类RocketProducer

    package com.zp.springbootdemo.rocketmq;
    
    import com.alibaba.fastjson.JSONObject;
    import com.sun.org.apache.xpath.internal.objects.XString;
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    import org.springframework.util.StopWatch;
    
    import javax.annotation.PostConstruct;
    import javax.annotation.PreDestroy;
    import java.io.UnsupportedEncodingException;
    
    /**
     * RocketMQ producer bean.
     *
     * <p>Creates and starts one {@link DefaultMQProducer} once the configuration values have been
     * injected, exposes {@link #send(String, String, String)} for synchronous sends, and shuts the
     * producer down when the bean is destroyed.
     *
     * @author zp
     */
    @Component
    public class RocketProducer {
        /** Producer group name, read from {@code apache.rocketmq.producer.producerGroup}. */
        @Value("${apache.rocketmq.producer.producerGroup}")
        private String producerGroup;

        /** NameServer address, read from {@code apache.rocketmq.namesrvAddr}. */
        @Value("${apache.rocketmq.namesrvAddr}")
        private String namesrvAddr;

        /** The started producer; assigned only after {@code start()} succeeded. */
        private DefaultMQProducer defaultMQProducer;

        /**
         * Builds and starts the underlying producer.
         *
         * @throws IllegalStateException if the producer cannot be started; failing here keeps the
         *     application from running with a producer that can never send
         */
        @PostConstruct
        public void defaultMQProducer() {
            DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
            producer.setNamesrvAddr(namesrvAddr);
            producer.setVipChannelEnabled(false);
            try {
                producer.start();
            } catch (MQClientException e) {
                // Do not swallow: an unstarted producer would make every later send() fail.
                throw new IllegalStateException(
                        "Failed to start RocketMQ producer, group=" + producerGroup
                                + ", namesrvAddr=" + namesrvAddr, e);
            }
            defaultMQProducer = producer;
            System.out.println("producer启动了。。。");
        }

        /** Releases the producer's resources when the bean is destroyed. */
        @PreDestroy
        public void shutdown() {
            if (defaultMQProducer != null) {
                defaultMQProducer.shutdown();
            }
        }

        /**
         * Sends one message synchronously and reports the broker's answer.
         *
         * @param topic topic to publish to
         * @param tags  tag attached to the message
         * @param body  message payload; encoded with {@link RemotingHelper#DEFAULT_CHARSET}
         * @return JSON string with the keys {@code msgId} and {@code sendStatus}
         * @throws IllegalArgumentException if {@code body} is {@code null}
         * @throws UnsupportedEncodingException if the default charset is not supported
         * @throws InterruptedException if the sending thread is interrupted
         * @throws RemotingException on a remoting-layer failure
         * @throws MQClientException on a client-side failure
         * @throws MQBrokerException if the broker rejects the message
         */
        public String send(String topic,String tags,String body) throws UnsupportedEncodingException, InterruptedException, RemotingException, MQClientException, MQBrokerException {
            if (body == null) {
                throw new IllegalArgumentException("body must not be null");
            }
            Message message = new Message(topic, tags, body.getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult result = defaultMQProducer.send(message);
            System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());

            JSONObject jsonObject = new JSONObject();
            jsonObject.put("msgId", result.getMsgId());
            jsonObject.put("sendStatus", result.getSendStatus());
            return jsonObject.toJSONString();
        }
    }

    4、消费者类RockerConsumer

    package com.zp.springbootdemo.rocketmq;

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.CommandCustomHeader;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.stereotype.Component;

    /**
    * @Author zp
    * @Description rocketmq消费者
    * @Date 22:33 2020/5/22
    * @Param
    * @return
    **/
    @Component
    public class RockerConsumer implements CommandLineRunner {
    /**
    * 消费者
    */
    @Value("${apache.rocketmq.consumer.pushConsumer}")
    private String pushConsumer; //myConsumer

    /**
    * NameServer 地址
    */
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    /**
    * 初始化RocketMq的监听信息,渠道信息
    */
    public void messageListener(){
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(pushConsumer);
    consumer.setNamesrvAddr(namesrvAddr);

    try {
    // 订阅PushTopic下Tag为push的消息,都订阅消息
    consumer.subscribe("firstTopic","push");
    // 程序第一次启动从消息队列头获取数据
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    //可以修改每次消费消息的数量,默认设置是每次消费一条
    consumer.setConsumeMessageBatchMaxSize(1);

    //在此监听中消费信息,并返回消费的状态信息
    consumer.registerMessageListener((MessageListenerConcurrently) (msgs,context)->{
    // 会把不同的消息分别放置到不同的队列中
    for (Message msg:msgs){
    System.out.println("接收到了消息:"+new String(msg.getBody()));

    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    consumer.start();
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    /**
    * Callback used to run the bean.
    *
    * @param args incoming main method arguments
    * @throws Exception on error
    */
    @Override
    public void run(String... args) throws Exception {
    this.messageListener();
    }
    }

    5、controller中编写发送消息

    package com.zp.springbootdemo.rocketmq;
    
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.io.UnsupportedEncodingException;
    
    /**
     * REST endpoint that publishes the request parameter {@code msg} to RocketMQ.
     */
    @RestController
    @RequestMapping("/rocketMq")
    public class MQController {

        @Autowired
        private RocketProducer producer;

        /**
         * Sends {@code msg} to topic {@code firstTopic} with tag {@code push}.
         *
         * @param msg message text taken from the request parameter of the same name
         * @return the producer's JSON result, or {@code "ERROR"} if {@code msg} is missing or the
         *     send fails
         */
        @RequestMapping("/myFirstProducer")
        public String pushMsg(String msg){
            if (msg == null) {
                // A missing parameter would otherwise surface as a NullPointerException (HTTP 500).
                return "ERROR";
            }
            System.out.println("======"+msg);
            try {
                return producer.send("firstTopic","push",msg);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the container thread can observe it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (RemotingException | MQClientException | MQBrokerException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
            return "ERROR";
        }
    }

    6、测试

    请求地址:http://127.0.0.1:8080/rocketMq/myFirstProducer?msg=hello

    响应:{"msgId":"C0A8010E1A3818B4AAC2711E8CD50000","sendStatus":"SEND_OK"}

    通过RocketMQ可视化控制台查看:

  • 相关阅读:
    spring(2)
    Android之滑动按钮实现Demo
    spring(1)
    spring(4)
    Android之ImageSwitch控件
    使用非阻塞ServerSocketChannel、SocketChannel代替ServerSocket和Socket
    Android之界面刷新(invalidate和postInvalidate使用)
    Android之改变控件的背景及形态
    Android之获得内存剩余大小与总大小
    OpenCV中矩阵的归一化
  • 原文地址:https://www.cnblogs.com/zhangpeng8888/p/12940408.html
Copyright © 2011-2022 走看看