zoukankan      html  css  js  c++  java
  • RabbitMQ系列之---初识RabbitMQ

    为什么要使用RabbitMQ?

    消息队列的作用

    • 异步调用
    • 系统解耦
    • 削峰限流
    • 消息通讯

    消息队列的缺点

    • 系统可用性降低
    • 系统稳定性降低
    • 分布式一致性问题(可靠消息最终一致性的分布式事务方案解决)

    RabbitMQ的优势

    • 支持高并发、高吞吐、性能好
    • 有完善的后台管理界面
    • 它还支持集群化、高可用部署架构、消息高可靠支持
    • RabbitMQ的开源社区很活跃,较高频率的迭代版本,来修复发现的bug以及进行各种优化
    • 最重要的是它是开源免费的。

    RabbitMQ的缺点

    • 它是基于erlang语言开发的,所以导致较为难以分析里面的源码,也较难进行深层次的源码定制和改造,毕竟需要较为扎实的erlang语言功底才可以。

    核心概念

    • Server:又称Broker,接受客户端的连接,实现AMQP实体服务;
    • Connection:连接,应用程序与Broker的网络连接;
    • Channel:网络通道,也称信道,几乎所有的通道都在Channel中进行的,Channel是进行消息读写的通道。客户端可建立多个Channel,每个Channel代表一个会话任务;是建立在“真实的”TCP连接内的虚拟连接。AMQP命令都是通过信道发送出去的。那么我们为什么需要信道呢?为什么不直接通过TCP连接发送AMQP命令呢?因为操作系统建立和销毁TCP会话是非常昂贵的开销,而且操作系统只能建立数量不多的TCP连接,很快就达到性能瓶颈,无法满足高性能的需求。
    • Message:消息,服务器和应用程序之间传送的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、等高级特性;Body则就是消息体的内容;
    • Virtual host:虚拟地址,用于进行逻辑隔离,最上层的消息路由,并非物理概念。一个Virtual Host里面有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange或者Queue;
    • Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列;
      交换机的类型:direct:直连,fanout:广播,headers:以...开头,topic:主题
      参考:Spring Boot RabbitMQ 四种交换器
    • Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing key;
    • Routing Key:一个路由规则,虚拟机可用它来确定如何路由一个特定的消息;
    • Queue:也称为Message Queue,消息队列,保存消息并将它们转发给消费者。

    几种交换机

    • Direct:直连交换机
      RabbitMQ-direct交换机.jpg-13.4kB
      所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue
      注意:Direct模式可以直接使用RabbitMQ自带的Exchange:default,并以队列名作为路由键。 Exchange,所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃。
    public class Producer4DirectExchange {
        public static void main(String[] args) throws IOException, TimeoutException {
            ...
            Channel channel = connection.createChannel();
            
            String exchangeName = "test_direct_exchange";
            String routingKey = "test.direct";
            String msg = "Hello World RabbitMQ 4 Direct Exchange Message...";
    
            channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
            ...
        }
    }
    
    public class Consumer4DirectExchange {
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
           ...
            Channel channel = connection.createChannel();
    
            String exchangeName = "test_direct_exchange";
            String exchangeType = "direct";
            String queueName = "test_direct_queue";
            String routingKey = "test.direct";
            
            // 声明交换机
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            // 声明队列
            channel.queueDeclare(queueName, false, false, false, null);
            // 交换机与队列绑定
            channel.queueBind(queueName, exchangeName, routingKey);
    
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    
            channel.basicConsume(queueName,true,queueingConsumer);
    
            while (true) {
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.out.println("收到消息: " + msg);
            }
        }
    }
    
    • Topic:主题交换机
      RabbitMQ-topic交换机.jpg-16.4kB
      所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定的Queue上,Exchange将RouteKey和某个Topic进行模糊匹配,此时队列需要绑定一个Topic。
      注意:可以使用通配符进行模糊匹配
    "#" 匹配一个或多个词
    "*" 匹配一个词
    "." 是单词的分隔符
    
    例如:
    "log.#":能够匹配到"log.info.oa
    "log.*":只会匹配到"log.error"
    
    public class Producer4TopicExchange {
        public static void main(String[] args) throws IOException, TimeoutException {
            ...
            Channel channel = connection.createChannel();
    
            String exchangeName = "test_topic_exchange";
            String routingKey1 = "user.save";
            String routingKey2 = "user.update";
            String routingKey3 = "user.delete.ok";
    
            String msg = "Hello World RabbitMQ 4 Topic Exchange Message...";
    
            channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
            channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
            channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());
            ...
        }
    }
    
    public class Consumer4TopicExchange {
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            ...
            Channel channel = connection.createChannel();
    
            String exchangeName = "test_topic_exchange";
            String exchangeType = "topic";
            String queueName = "test_topic_queue";
            String routingKey = "user.*";
    
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, null);
            channel.queueDeclare(queueName, false, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
    
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    
            channel.basicConsume(queueName, true, queueingConsumer);
    
            while (true) {
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.out.println("收到消息: " + msg);
            }
        }
    }
    
    • Fanout Exchange:广播交换机
      RabbitMQ-fanout交换机.jpg-17kB
      不处理路由键,只需要简单的将队列绑定到交换机;
      发送到交换机的消息都会被转发到与该交换机绑定的所有队列上;
      Fanout交换机转发消息是最快的。
    public class Producer4FanoutExchange {
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            ...
            Channel channel = connection.createChannel();
    
            String exchangeName = "test_fanout_exchange";
            String routingKey = "";
    
            String msg = "Hello World RabbitMQ 4 Fanout Exchange Message...";
    
            channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
            ...
        }
    }
    
    public class Consumer4FanoutExchange {
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            ...
            Channel channel = connection.createChannel();
    
            String exchangeName = "test_fanout_exchange";
            String exchangeType = "fanout";
            String queueName = "test_fanout_queue";
            String routingKey = "";
    
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, null);
            channel.queueDeclare(queueName, false, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
    
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    
            channel.basicConsume(queueName, true, queueingConsumer);
    
            while (true) {
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.out.println("收到消息: " + msg);
            }
        }
    }
    

    参考资料:

    1. 石杉大神消息中间件系列文章
    2. 慕课网《RabbitMQ消息中间件技术精讲》
    3. 书《RabbitMQ实战-高效部署分布式消息队列》
  • 相关阅读:
    surfaceView和Camera配合进行摄像头的预览
    Android中SurfaceView的使用详解
    Java 图片与byte数组互相转换
    Android的GridView和Gallery结合Demo
    AlertDialog dismiss 和 cancel方法的区别
    HttpClient4.0
    IntentFilter
    ViewPagerindicator 源码解析
    android背景平铺方式 tileMode
    android requestWindowFeature使用详解
  • 原文地址:https://www.cnblogs.com/-Marksman/p/10838693.html
Copyright © 2011-2022 走看看