zoukankan      html  css  js  c++  java
  • RabbitMQ 五种工作模式

    官网介绍:https://www.rabbitmq.com/getstarted.html

    五种工作模式的主要特点

    1. 简单模式:一个生产者,一个消费者
    2. work模式:一个生产者,多个消费者,每个消费者获取到的消息唯一(消费者彼此竞争成为接收者)。
    3. 订阅模式:一个生产者发送的消息会被多个消费者获取。
    4. 路由模式:发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定路由key
    5. topic模式:将路由键和某模式进行匹配,此时队列需要绑定在一个模式上,“#”匹配一个词或多个词,“*”只匹配一个词。

    简单模式(一个生产者,一个消费者)

    这种模式下不需要将Exchange进行任何绑定(binding)操作

        public static final String QUEUE_NAME= "myqueue";
    
        public static void test() throws Exception {
            //定义连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            //设置Virtual Host
            factory.setVirtualHost("/ld");
            factory.setUsername("ld");
            factory.setPassword("aaa");
            //通过工厂获取连接
            Connection connection = factory.newConnection();
    
    
            //创建队列,发送消息
            public void producer () {
                //创建通道
                Channel channel = connection.createChannel();
                //声明创建队列
                /** 
                    队列名
                    是否持久化
                    是否排外  即只允许该channel访问该队列   一般等于true的话用于一个队列只能有一个消费者来消费的场景
                    是否自动删除(自动删除的前提是: 至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会 自动删除。)
                    其他属性
                */
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                //消息内容
                String message = "Hello World!";
                //发布消息
                /**
                    交换机
                    队列名
                    其他属性  路由
                    消息body
                */
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                //关闭连接和通道
                channel.close();
                connection.close();
            }
            //消费者消费消息
            public void consumer () {
                //创建通道
                Channel channel = connection.createChannel();
                //声明通道
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                //定义消费者
                QueueingConsumer consumer = new QueueingConsumer(channel);
                //监听队列
                //autoAck 是否自动确认消息,true自动确认
                channel.basicConsume(QUEUE_NAME, true, consumer);
                while (true) {
                    //这个方法会阻塞住,直到获取到消息
                    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                    String message = new String(delivery.getBody());
                    System.out.println("接收到消息:" + message);
                }
            }
        }
    

    work模式(一个生产者,一个队列,多个消费者,每个消费者获取到的消息唯一)

        public static final String QUEUE_NAME= "myqueue";
    
        //消息生产者
        public void producer{
            //获取连接和通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            //声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "";
            for (int i = 0; i < 100; i++) {
                message = "" + i;
                channel.basicPublish("",QUEUE_NAME, null, message.getBytes());
                Thread.sleep(i);
            }
            channel.close();
            connection.close();
        }
    
    
        //消费者1
        public void consumer1{
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //同一时刻服务器只发送一条消息给消费端
            channel.basicQos(1);
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //false:手动确认
            channel.basicConsume(QUEUE_NAME, false, consumer);
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println("recive1:" + message);
                Thread.sleep(100);
                //消息消费完给服务器返回确认状态,表示该消息已被消费
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    

    channel.basicPublish

    channel.basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
    
    mandatory:
        true:如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,
        那么会调用basic.return方法将消息返还给生产者。
    
        false:出现上述情形broker会直接将消息扔掉
    
    immediate:
        true:如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。
        当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。
    
    BasicProperties :
        需要注意的是BasicProperties.deliveryMode:
            0:不持久化 1:持久化 
        这里指的是消息的持久化,配合channel(durable=true),queue(durable)可以实现,即使服务器宕机,消息仍然保留
    

    fanout订阅模式(一个生产者,多个队列,多个消费者)

    这种模式需要提前将Exchange与Queue进行绑定,
    一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定。

    ​ 一个生产者发送的消息会被多个消费者获取。
    
    生产者:可以将消息发送到队列或者是交换机。
    消费者:只能从队列中获取消息。
    
    如果消息发送到没有队列绑定的交换机上,那么消息将丢失。
    
        public static final String EXCHANGE_NAME = "exchange_fanout";
    
        //生产者
        public void producer() {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            //声明交换机 fanout
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    
            String message = "Hello World!";
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
    
            channel.close();
            connection.close();
        }
    
    
        //消费者1
        public final static String QUEUE_NAME = "queue_fanout_1";
        public void consumer1() {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //绑定队列到交换机上
            channel.queueBind(QUEUE_NAME, Send.EXCHANGE_NAME, "");
            channel.basicQos(1);
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(QUEUE_NAME, true, consumer);
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(message);
            }
        }
    

    direct路由模式(完全匹配、单播的模式)

    1. 发送消息到交换机并且要指定路由key
    
    2. 消费者将队列绑定到交换机时需要指定路由key
    
    3. 完全匹配,只有匹配到的消费者才能消费消息
    
    4. 一个队列可以绑定多个路由
    
        public static final String EXCHANGE_NAME = "exchange_direct";
    
        //生产者
        public void producer() {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            //声明交换机 direct
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            String message = "Hello World!";
            channel.basicPublish(EXCHANGE_NAME, "key", null, message.getBytes()); 
            channel.close();
            connection.close();
        }
    
        //消费者1
        public final static String QUEUE_NAME = "queue_direct_1";
    
        public void consumer1() {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //绑定队列到交换机上,并制定路由键为"key"
            channel.queueBind(QUEUE_NAME, com.bw.rabbitmq.routing.Send.EXCHANGE_NAME, "key");
            channel.basicQos(1);
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(QUEUE_NAME, true, consumer);
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(message);
            }
        }
    

    topic通配符模式

    两个通配符:符号“#”和符号“*”。#匹配0个或多个单词,*匹配一个单词
    
        //生产者
        public static final String EXCHANGE_NAME = "exchange_topic";
    
        public void producer() {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            //声明交换机 topic:交换机类型
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            String message = "Hello World!";
            channel.basicPublish(EXCHANGE_NAME, "key.1", null, message.getBytes());
            System.out.println(message);
            channel.close();
            connection.close();
        }
    
        //消费者1
        public final static String QUEUE_NAME = "queue_topic_1";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //绑定队列到交换机上,并制定路由键匹配规则为"key.*"
            channel.queueBind(QUEUE_NAME, com.bw.rabbitmq.topics.Send.EXCHANGE_NAME, "key.*");
            channel.basicQos(1);
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(QUEUE_NAME, true, consumer);
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(message);
            }
        }
    
  • 相关阅读:
    TCP源码—连接建立
    TCP系列02—连接管理—1、三次握手与四次挥手
    TCP系列01—概述及协议头格式
    ubuntu软件管理apt与dpkg
    318. Maximum Product of Word Lengths
    317. Shortest Distance from All Buildings
    316. Remove Duplicate Letters
    315. Count of Smaller Numbers After Self
    314. Binary Tree Vertical Order Traversal
    313. Super Ugly Number
  • 原文地址:https://www.cnblogs.com/loveer/p/11423173.html
Copyright © 2011-2022 走看看