zoukankan      html  css  js  c++  java
  • RabbitMQ 入门

    RabbitMQ作为应用程序之间通信的工具,越来越受欢迎,下面结合介绍一下RabbitMQ中一些简单的概念。建议初学者可以看一下RabbitMQ官方教程和官方在GitHub上提供的样例代码

    几个重要概念

    生产者

    生产者也叫客户端,不是RabbitMQ的一部分,他创建消息,并将消息发送到给消息代理RabbitMQ。消息主要分为两部分:有效载荷和标签。有效载荷为要传输的数据,标签用于标明一个交换器(下面会讲)的名称和消息的主题。

    消费者

    消费者也叫服务端,同样不是RabbitMQ的一部分,消费者订阅到队列上,消费消息。

    信道

    首先必须连接到RabbitMQ才能发布或消费消息。那么就需要应用程序和Rabbit之间建立一条TCP连接。一旦TCP连接打开,就建立了一条信道。但是频繁的建立和断开TCP连接会对系统造成很重的负担而且操作系统每秒建立的TCP连接是有限的。所以RabbitMQ中的信道是建立在TCP上的虚连接。RabbitMQ为每个信道指派一个ID,发布,订阅和接收消息都是在指定的信道上完成的。

    队列

    RabbitMQ 主要包含三个部分:交换器,队列和绑定。生产者吧消息发布到交换器上(注意是交换器,不是队列),消息最终到达队列(期间有类似路由的过程),并被消费者接收。

    交换器

    交换器类似网络中的路由器,队列通过路由键绑定到交换器,RabbitMQ 根据之前发送消息时定义的路由键——route key,确定将消息投递到哪个队列。如果路由的消息不匹配所有绑定的路由键,消息将会被丢弃。交换器主要有headers,direct,fanout,topic四种。

    headers

    headers交换器时使用的比较少的一种交换器,他是一种忽略route key使用headers的键值对匹配。

    direct

    direct交换器非常简单,如果路由键匹配的话,消息就被投递到对应的队列。使用方法有两种,一种是单一绑定,另一种是多绑定。

    单一绑定

    意思是,不同的队列绑定到同一个交换器上。这样当不同route key 的消息到达交换器时,交换器根据不同的route key 将消息放到不同的队列中。

    多绑定

    将两个不同的队列用相同的route key 绑定在同一个交换器上,这是交换器会向具有相同的route key的队列上广播匹配route key 的消息。这种设置的作用在fanout交换器中介绍。

    fanout

    和 direct的多绑定相同,它会将受到的消息广播给所有绑定的队列上。这用交换器的作用是允许对单条消息作出不同的反应。例如,当你在淘宝下一个订单时,首先淘宝要告诉银行向商家转一笔钱,同时还要告诉商家要准备商品,并发货。在这种情况下,一个下订单的消息发送到RabbitMQ中后,需要作出两个反应:转账和准备商品。而fanout模式的很好的解决了这个问题。rabbit将消息分别发送到银行和商家绑定的两个队列就可完成这个功能。

    topic

    topic,顾名思义就是根据队列在绑定到交换器时,定义了自己感兴趣的话题。他的route key 支持正则匹配方式。例如"stock.used.nyse","nyse.vmw"。你可以根据喜好在route key 中包含多个用“点”隔开的单词(最多255个)。

    • *(star) 表示只替代一个单词,例如"chris.*",可以匹配"chris.name" 但不可以匹配"chris" 或者"chris.age.eight"
    • #(hash) 表示可以匹配0个或多个单词,例如"chris.#",可以匹配"chris",也可以匹配"chris.age.eight",还可以匹配"chris.name"

    绑定

    绑定的意思就是将队列绑定到交换器上,当消息到达交换器后,会根据不同交换器的类型,以及在绑定时定义的route key,将消息分发到不同的队列中取。在java API中,代码如下:

    // only care about the message with bindingKey
    channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); 
    

    代码样例

    下面用RabbitMQ官方提供的代码(文章开头的GitHub)来解释一下Java API的使用。

    • 消息发送者(生产者)
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // 创建连接
        Connection connection = factory.newConnection();
        // 在连接上创建信道
        Channel channel = connection.createChannel();
        // 声明交换器参数意义:交换器名称,交换器类型
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
        // 声明队列 参数的意义依次是:
        // 队列名称: 在往交换器上绑定时使用
        // durable: 如果为true的话,队列中的消息在关闭rabbit服务时会持久化,
        // exclusive: 设置为true的话,队列为私有的只有当前程序才能消费队列消息
        // autoDelete 当最后一个消费者取消订阅是,队列自动删除
        // arguments: 队列的其他属性
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "Hello World!";
        // 发布消息 参数意义如下:
        // exchange: 消息发布到那个交换器上
        // route key: 路由键,exchange 根据其路由消息
        // props: 消息的其他属性,headers 会用到
        // body: 消息内容
        channel.basicPublish(EXCHANGE_NAME, QUEUE_NAME, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");
    
        channel.close();
        connection.close();
    
    • 消息接收者(消费者)
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 消费者端的代码不需要声明交换器
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // 创建消费者
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                        AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        // 队列需要有消费者,消息发送者不需要
        // 第二个参数为auto_ack: 设置为true时,一旦消费者接收了消息,RabbitMQ会自动视为其确认,如果确认了消息将从队列中移除。
        channel.basicConsume(QUEUE_NAME, true, consumer);
    
    I am chris, and what about you?
  • 相关阅读:
    iOS
    iOS
    iOS
    iOS
    iOS(WKWebView)OC与JS交互 之三
    iOS(UIWebView 和WKWebView)OC与JS交互 之二
    CentOS VMware 下SSH配置方法详解
    15个nosql数据库
    MySql 优化
    Elasticsearch 相关名词理解
  • 原文地址:https://www.cnblogs.com/arax/p/9130606.html
Copyright © 2011-2022 走看看