zoukankan      html  css  js  c++  java
  • RabbitMq初探——发布与订阅

    publish and subscribe

    前言


    前面的例子 我们都是用到的都是消息单一消费,即一条消息被单个消费者消费。像微博系统的消息推送,是一条消息推送给所有订阅到该频道的用户。

    这里我们就需要用到rabbitmq的发布与订阅(publish and subscribe)

    原理


    前面我们弱化rabbitmq,只抽象出了 生产者、队列、消费者三个概念。

    现在需要介绍rabbitmq的整体数据流转过程。

    数据由生产者发送给交换机,交换机接收数据并把它发送给与自己绑定好的队列,队列接收消息并且把它发送给消费者。

    事实上,生产者根本不会知道消息是发送给谁的,也不需要关心。who cares?!

     exchange的类型

    移步 RabbitMQ各种交换机类型Exchange Types介绍

    发布与订阅

    一个中心生产者,多个消费者。

    生产者生产消息给类型为fanout的exchange,多个queue与该exchange绑定,消费者从queue中获取消息。

    代码


     

    1. 生产者、消费者声明类型为fanout、名称为logs的交换机

    2. 消费者进程声明名称随机的queue(用于每new 一个进程就会产生一个队列),将queue与logs exchange绑定

    整体代码如下

    fanout_sender.php

    <?php
    /**
     * Created by PhpStorm.
     * User: wangdaxi
     * Date: 2017/10/20
     * Time: 14:20
     */
    require_once __DIR__ . '/vendor/autoload.php';
    use PhpAmqpLibConnectionAMQPStreamConnection;
    use PhpAmqpLibMessageAMQPMessage;
    
    $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
    $channel = $connection->channel();
    
    $channel->exchange_declare('logs', 'fanout', false, false, false);
    
    $data = implode(' ', array_slice($argv, 1));
    empty($data) && $data = 'Hello World';
    $msg = new AMQPMessage($data);
    
    $channel->basic_publish($msg, 'logs');
    echo " [x] Sent $data 
    ";
    
    $channel->close();
    $connection->close();

    fanout_receive.php

    <?php
    /**
     * Created by PhpStorm.
     * User: wangdaxi
     * Date: 2017/10/20
     * Time: 14:32
     */
    require_once __DIR__ . '/vendor/autoload.php';
    use PhpAmqpLibConnectionAMQPStreamConnection;
    
    $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
    $channel = $connection->channel();
    
    $channel->exchange_declare('logs', 'fanout', false, false, false);
    
    list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
    
    $channel->queue_bind($queue_name, 'logs');
    
    echo ' [*] Waiting for logs. To exit press CTRL+C', "
    ";
    
    $callback = function($msg){
        echo "
     [x] " . $msg->body;
    };
    
    //消费,关闭ack
    $channel->basic_consume($queue_name, '', false, true, false, false, $callback);
    
    while(count($channel->callbacks)) {
        $channel->wait();
    }
    
    $channel->close();
    $connection->close();

    测试


    开启一个终端作为生产者P,两个消费者作为消费者C1,C2。

    生产者生产消息,会发现每个消费者都会收到同样的消息。很简单,不上图。

    以上。

  • 相关阅读:
    【团队作业冲刺'十日谈'】第七天——端侧部署6、记录保存
    团队冲刺第六天端侧部署5,模型下载功能2
    团队冲刺第五天端侧部署4,模型下载
    冲刺第四天 端侧部署3,登陆页面2
    冲刺第三天 端侧部署2,登录功能
    冲刺第二天模型训练2+端侧部署
    每日总结4.27
    每日总结4.26
    Jenkins基于https的k8s配置
    快速搭建私有gitlab
  • 原文地址:https://www.cnblogs.com/hejun695/p/7700089.html
Copyright © 2011-2022 走看看