zoukankan      html  css  js  c++  java
  • Rabbitmq基本使用 SpringBoot整合Rabbit SpringCloud Stream+Rabbit

    https://blog.csdn.net/m0_37867405/article/details/80793601

    四、docker中使用rabbitmq

    1. 搭建和启动

    使用地址:rabbitmq docker

    #1. 拉去rabbitmq的镜像
    docker pull hub.c.163.com/library/rabbitmq:3.6.11-management
    #2. 由于rabbitmq远程访问是不允许guest的,所以启动时候需要设置一个用户名和密码
    docker run -d --hostname my-rabbit --name some-rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 hub.c.163.com/library/rabbitmq:3.6.11-management
    # 提示信息:d525b1c1004f50284ca5bab76e8e5e2fea55462b72d9f923cea0da1a29e9aa9dnetstat
    • 1
    • 2
    • 3
    • 4
    • 5

    在浏览器中访问:192.168.186.135:15672然后输入用户名和密码

    2. java Hello world

    简单的一对一生产消息

    这里写图片描述

    官网地址示例

    1.引入 pom.xml

    <dependency>
          <groupId>com.rabbitmq</groupId>
          <artifactId>amqp-client</artifactId>
          <version>5.0.0</version>
    </dependency>
    • 1
    • 2
    • 3
    • 4
    • 5

    2.消息提供者

    package com.itcloud.concurrency.rabbitmq;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class MessageProvider {
    
        private static final String HOST = "192.168.186.135";
    
        private static final int PORT = 5672; // rabbitmq端口是5672
    
        private static final String USERNAME = "admin";
    
        private static final String PASSWORD = "admin";
    
        private static final String QUEUE_NAME = "hello";
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
    
            factory.setHost(HOST);
            factory.setPort(PORT);
            factory.setUsername(USERNAME);
            factory.setPassword(PASSWORD);
    
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "hello world";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" 生产消息:'" + message + "'");
    
        }
    
    }

    3.消费端

    package com.itcloud.concurrency.rabbitmq;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    public class MessageConsumer {
    
        private static final String HOST = "192.168.186.135";
    
        private static final int PORT = 5672; // rabbitmq端口是5672
    
        private static final String USERNAME = "admin";
    
        private static final String PASSWORD = "admin";
    
        private static final String QUEUE_NAME = "hello";
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(HOST);
            factory.setPort(PORT);
            factory.setUsername(USERNAME);
            factory.setPassword(PASSWORD);
    
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            Consumer consumer = new DefaultConsumer(channel) {
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("消费端接收消息:" + message);
                }
    
            };
            //true  异步接收消息
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }

    4.先后启动生产者和消费者

    3. 一对多

    官网示例

    这里写图片描述

    4. SpringBoot整合RabbitMQ

    第一种交换模式:Direct

    1.引入依赖

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>

    application.yml

    spring:
      rabbitmq:
        host: 192.168.186.135
        port: 5672
        username: admin
        password: admin

    2.配置org.springframework.amqp.core.Queue

    ​ 【MQConfig.java】

    @Configuration
    public class MQConfig {
    
        public static final String QUEUE = "queue";
    
        @Bean
        public Queue queue() {
            return new Queue(QUEUE);
        }
    
    }
    1. 【MQSender.java】消息发送端
    @Component
    @Slf4j
    public class MQSender {
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        public void sendMsg(String msg) {
            log.info("发送消息:" + msg);
            amqpTemplate.convertAndSend(MQConfig.QUEUE, msg);
        }
    
    }

    4.【MQReceiver.java】消息接收端

    @Component
    @Slf4j
    public class MQReceiver {
    
        //绑定队列名称
        @RabbitListener(queues = { MQConfig.QUEUE })
        public void receive(String message) {
            log.info("springboot rabbitmq recevie message:" + message);
        }

    5.简单的controller测试

        @PostMapping("/send")
        public String sendMsg(String msg) {
            mqSender.sendMsg(msg);
            return "success";
        }

    第二种交换模式:Topic

    :biking_woman:特点:

    :ballot_box_with_check: 可以根据routing_key自由的绑定不同的队列

    :ballot_box_with_check:发送端不需要知道发送到哪个队列,由routing_key去分发到队列中

    2.配置org.springframework.amqp.core.Queue

    ​ 【MQConfig.java】

        public static final String TOPIC_EXCHANGE = "topic_exchange";
    
        public static final String QUEUE_HELLO = "topic_hello";
    
        public static final String QUEUE_WORLD = "topic_world";
    
        public static final String ROUT_HELLO = "hello_key";
        public static final String ROUT_WORLD = "world_key";
        ...
        //=============定义两个队列========================    
        @Bean
        public Queue queueHello() {
            return new Queue(QUEUE_HELLO);
        }
    
        @Bean
        public Queue queueWorld() {
            return new Queue(QUEUE_WORLD);
        }
        //===================声明topinc交换模式=========================
        @Bean
        public TopicExchange topicExchange() {
            return new TopicExchange(TOPIC_EXCHANGE);
    
        }
        //队列绑定到交换模式上
        @Bean
        @Autowired
        public Binding bindingHello(Queue queueHello, TopicExchange topicExchange) {
            return BindingBuilder.bind(queueHello).to(topicExchange).with(ROUT_HELLO);
        }
    
        @Bean
        @Autowired
        public Binding bindingWorld(Queue queueWorld, TopicExchange topicExchange) {
            return BindingBuilder.bind(queueWorld).to(topicExchange).with(ROUT_WORLD);
        }
    1. 【MQSender.java】消息发送端
        public void sendHello(String msg) {
            log.info("hello topic send messsgae:" + msg);
            amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, MQConfig.ROUT_HELLO, msg);
        }
    
        public void sendWorld(String msg) {
            log.info("world topic send messsgae:" + msg);
            amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, MQConfig.ROUT_WORLD, msg);
        }

    4.【MQReceiver.java】消息接收端

        @RabbitListener(queues = MQConfig.QUEUE_HELLO)
        public void receiveHello(String msg) {
            log.info("springboot rabbitmq recevie message topicHello:" + msg);
        }
    
        @RabbitListener(queues = MQConfig.QUEUE_WORLD)
        public void receiveWorld(String msg) {
            log.info("springboot rabbitmq recevie message topicWorld:" + msg);
        }

    5.简单的controller测试

        @PostMapping("/send/topic/hello")
        public String sendHello(String msg) {
            mqSender.sendHello(msg);
            return "success";
        }
    
        @PostMapping("/send/topic/world")
        public String sendWorld(String msg) {
            mqSender.sendWorld(msg);
            return "success";
        }

    第三种交换:Fanout

    :biking_woman: 特点:只要绑定该交换机的消费者都可以接受到消息

    1.配置org.springframework.amqp.core.Queue

    ​ 【MQConfig.java】

        public static final String FANOUT_EXCHANGE = "fanout_exchange";
        public static final String QUEUE_FANOUT = "queue_fanout";
        public static final String QUEUE_FANOUT2 = "queue_fanout2";
    ....
        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange(FANOUT_EXCHANGE);
        }
    
        @Bean
        public Queue queueFanout() {
            return new Queue(QUEUE_FANOUT);
        }
    
        @Bean
        public Queue queueFanout2() {
            return new Queue(QUEUE_FANOUT2);
        }
    
        @Bean
        @Autowired
        public Binding bindingFanout(FanoutExchange fanoutExchange, Queue queueFanout) {
            return BindingBuilder.bind(queueFanout).to(fanoutExchange);
        }
    
        @Bean
        @Autowired
        public Binding bindingFanout2(FanoutExchange fanoutExchange, Queue queueFanout2) {
            return BindingBuilder.bind(queueFanout2).to(fanoutExchange);
        }   

    2.【MQSender.java】消息发送端

        public void sendFanout(String msg) {
            log.info("fanout send messsgae:" + msg);
            amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE, "", msg);
            amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE, "", "自定义消息");
        }

    3.【MQReceiver.java】消息接收端

        @RabbitListener(queues = MQConfig.QUEUE_FANOUT)
        public void receiveFanout(String msg) {
            log.info("springboot rabbitmq recevie message fanout:" + msg);
        }
    
        @RabbitListener(queues = MQConfig.QUEUE_FANOUT2)
        public void receiveFanout2(String msg) {
            log.info("springboot rabbitmq recevie message fanout2:" + msg);
        }

    第四种交换模式:Headers

    1.【MQConfig.java】

    //======================headers交换模式============================
    
        public static final String HEADERS_EXCHANGE = "headers_exchange";
    
        public static final String QUEUE_HEADERS = "queue_headers";
    
        @Bean
        public HeadersExchange headersExchange() {
            return new HeadersExchange(HEADERS_EXCHANGE);
        }
    
        @Bean
        public Queue headersQueue() {
            return new Queue(QUEUE_HEADERS, true);
        }
    
        @Bean
        @Autowired
        public Binding headersBind(HeadersExchange headersExchange, Queue headersQueue ) {
            Map<String, Object> headers = new HashMap<>();
            headers.put("key1", "value1");
            headers.put("key2", "value2");
            return BindingBuilder.bind(headersQueue).to(headersExchange).whereAny(headers).match();

    2.【MQSender.java】消息发送端

        public void sendHeaders(String msg) {
            log.info("headers send messsgae:" + msg);
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setHeader("key1", "value1");
            messageProperties.setHeader("key3", "value2");
            Message message = new Message(msg.getBytes(), messageProperties);
            amqpTemplate.convertAndSend(MQConfig.HEADERS_EXCHANGE, "", message);
        }

    3.【MQReceiver.java】消息接收端

        @RabbitListener(queues = MQConfig.QUEUE_HEADERS)
        public void receiveHeaders(byte [] bytes) {
            log.info("springboot rabbitmq recevie message headers:" + new String(bytes));
        }

    参考博客:

    博客1 博客2

    6. SpringCloud Stream

    1. 创建生产者和消费者

    1. 消息生产者:

    新建【stream】springcloud项目

    pom.xml

    ​ 其他依赖略

            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>
    //注意导包
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.integration.support.MessageBuilder;
    import org.springframework.messaging.MessageChannel;
    
    
    //消息发送端接口
    public interface SendMsg {
    
        public void send(Object obj);
    
    }
    
    //接口实现类
    @EnableBinding(Source.class)
    public class SendMsgImpl implements SendMsg {
    
        @Autowired
        private MessageChannel output;
    
        @Override
        public void send(Object obj) {
            this.output.send(MessageBuilder.withPayload(obj).build());
        }
    
    }

    【application.yml】文件

    server:
      port: 8083
    spring:
      cloud:
        stream:
          binders: 
            defaultRabbit: 
              type: rabbit
              environment: #配置rabbimq连接环境
                spring: 
                  rabbitmq:
                    host: 192.168.186.135
                    username: admin
                    password: admin
                    virtual-host: / 
          bindings: 
            output:
             destination: myExchange  #exchange名称,交换模式默认是topic
             content-type: application/json
             binder: defaultRabbit
    //测试使用的Controller
    @RestController
    public class SendController {
    
        @Autowired
        private SendMsg sendMsg;
    
        @GetMapping("/send")
        public String  msgSend(String msg) {
            this.sendMsg.send(msg);
            return "success";
        }
    }

    2 .消息消费者

    新建**【input】**springlcloud项目

    //接口
    public interface ReceiveMsg {
    
        public void receive(Message<Object> message);
    
    }
    //实现类
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.messaging.Message;
    import org.springframework.stereotype.Component;
    
    @Component
    @EnableBinding(Sink.class)
    public class ReceiveMsgImpl implements ReceiveMsg {
    
        @Override
        @StreamListener(Sink.INPUT) 
        public void receive(Message<Object> message) {
            System.out.println("接收消息" + message.getPayload());
        }
    }

    application.yml

    server:
      port: 8085
    
    spring:
      cloud:
        stream:
          binders:
            defaultRabbit: 
              type: rabbit
              environment:
                spring: 
                  rabbitmq:
                    host: 192.168.186.135
                    username: admin
                    password: admin
                    virtual-host: / 
          bindings: 
            input:
             destination: myExchange
             content-type: application/json
             binder: defaultRabbit
  • 相关阅读:
    nginx第三方模块---nginx-sticky-module的使用(基于cookie的会话保持)
    通过redis的monitor命令排除故障
    redis数据过期策略【转】
    PHP通用分页类page.php[仿google分页]
    简洁php的MVC框架
    Jquery插件开发之图片放大镜效果(仿淘宝)
    PHPCMS V9标签循环嵌套调用数据的方法
    虚拟主机服务器php fsockopen函数被禁用的解决方法
    PHPCMS V9 fsockopen 函数被禁用解决方案
    PHP IN_ARRAY 函数 使用需要注意的地方
  • 原文地址:https://www.cnblogs.com/lykbk/p/etwete23424234324.html
Copyright © 2011-2022 走看看