zoukankan      html  css  js  c++  java
  • RabbitMQ学习笔记

    一、开发环境搭建

    1.安装Erlang环境

    下载地址:OTP 18.3 Windows 64-bit Binary File

    2.安装RabbitMQ服务端

    下载地址:Windows

    打开命令行工具,进入RabbitMQ目录下的sbin文件夹下,输入以下命令:

    以服务的形式安装RabbitMQ

    rabbitmq-service install

    启动RabbitMQ服务

    rabbitmq-service start

    RabbitMQ所在的路径不能存在空格,否则会出现莫名其妙的错误。

    3.下载RabbitMQ的客户端程序集。

    下载地址:rabbitmq-dotnet-client-3.6.1-dotnet-4.5.zip

    解压得到的RabbitMQ.Client.dll就是客户端的dll

    4.启用管理界面工具

    rabbitmq-plugins enable rabbitmq_management

    在浏览器中输入地址 http://localhost:15672/#/ 即可访问管理页面。

     

    二、消息队列中消息的整体处理流程及名词解析

    RabbitMQ处理消息流程图

    三、Exchange三种常用的模式

    Fanout模式

        /// <summary>
        /// Producer (fanout): declares the fanout exchange "exchange_Fanout" and
        /// publishes a single UTF-8 encoded text message to it.
        /// </summary>
        class Program
        {
            static void Main(string[] args)
            {
                // NOTE(review): broker credentials are embedded in this URI literal.
                var connectionFactory = new ConnectionFactory();
                connectionFactory.Uri = "amqp://admin:123456789@localhost:5672/Foo";

                using (var connection = connectionFactory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    const string exchangeName = "exchange_Fanout";
                    const string message = "Hello,World";

                    channel.ExchangeDeclare(exchangeName, "fanout");

                    // Published with an empty routing key and no message properties.
                    byte[] payload = Encoding.UTF8.GetBytes(message);
                    channel.BasicPublish(exchangeName, "", null, payload);
                    Console.WriteLine("消息{0}被发送", message);
                }

                // Wait for a key press before the program exits.
                Console.ReadKey();
            }
        }
        /// <summary>
        /// Consumer (fanout): declares a queue without arguments, binds it to the
        /// fanout exchange "exchange_Fanout" and prints the body of every received
        /// message as UTF-8 text. Messages are consumed with noAck set to true.
        /// </summary>
        class Program
        {
            static void Main(string[] args)
            {
                // NOTE(review): broker credentials are embedded in this URI literal.
                var factory = new ConnectionFactory() { Uri = "amqp://admin:123456789@localhost:5672/Foo" };

                // The connection is disposed together with the channel; previously only
                // the channel was wrapped in a using block and the connection leaked.
                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    // The queue name is taken from the result of the argument-less declare.
                    var queueName = channel.QueueDeclare().QueueName;
                    channel.ExchangeDeclare(exchange: "exchange_Fanout", type: "fanout");
                    channel.QueueBind(queueName, "exchange_Fanout", "");

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        Console.WriteLine("接受到消息:{0}", Encoding.UTF8.GetString(ea.Body));
                    };
                    channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer);

                    // Block the main thread; otherwise the using block ends and the
                    // channel is disposed before any message can be handled.
                    Console.ReadLine();
                }
                Console.ReadKey();
            }
        }

    Direct模式 

        /// <summary>
        /// Producer (direct): declares the queue "queue_direct" and the direct
        /// exchange "exchange_direct", binds them with routing key "Hello", and
        /// publishes one message with DeliveryMode set to 2.
        /// </summary>
        class Program
        {
            private static readonly string queueName = "queue_direct";
            private static readonly string exchangeName = "exchange_direct";
            private static readonly string routingKey = "Hello";

            static void Main(string[] args)
            {
                // NOTE(review): broker credentials are embedded in this URI literal.
                var connectionFactory = new ConnectionFactory();
                connectionFactory.Uri = "amqp://admin:123456789@localhost:5672/Foo";

                using (var connection = connectionFactory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    // Declare a durable queue.
                    channel.QueueDeclare(queueName, true, false, false, null);
                    // Declare a durable exchange.
                    channel.ExchangeDeclare(exchangeName, "direct", true, false, null);
                    // Bind the queue to the exchange.
                    channel.QueueBind(queueName, exchangeName, routingKey, null);

                    // Mark the message as persistent.
                    var messageProperties = channel.CreateBasicProperties();
                    messageProperties.DeliveryMode = 2;

                    const string message = "Hello,World";
                    byte[] payload = Encoding.UTF8.GetBytes(message);
                    channel.BasicPublish(exchangeName, routingKey, messageProperties, payload);
                    Console.WriteLine("消息{0}被发送", message);
                }

                // Wait for a key press before the program exits.
                Console.ReadKey();
            }
        }
        /// <summary>
        /// Consumer (direct): declares the queue "queue_direct", prints the body of
        /// every received message as UTF-8 text and acknowledges each message
        /// explicitly (noAck is false).
        /// </summary>
        class Program
        {
            static string queueName = "queue_direct";
            static void Main(string[] args)
            {
                // NOTE(review): broker credentials are embedded in this URI literal.
                var factory = new ConnectionFactory() { Uri = "amqp://admin:123456789@localhost:5672/Foo" };

                // The connection is disposed together with the channel; previously only
                // the channel was wrapped in a using block and the connection leaked.
                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    // Declare a durable queue.
                    channel.QueueDeclare(queueName, true, false, false, null);
                    // Limit how many messages the server forwards at a time (prefetch count 1).
                    channel.BasicQos(0, 1, false);

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        Console.WriteLine("接受到消息:{0}", Encoding.UTF8.GetString(ea.Body));
                        // Acknowledge the message.
                        channel.BasicAck(ea.DeliveryTag, false);
                    };
                    channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer);

                    // Block the main thread so the channel stays open while consuming.
                    Console.ReadLine();
                }
                Console.ReadKey();
            }
        }

    Topic模式 

        /// <summary>
        /// Producer (topic): declares the topic exchange "exchange_topic" and the
        /// queue "queue_topic", binds them with the pattern "Hello.Topic.*", and
        /// publishes one message with routing key "Hello.Topic.World" and
        /// DeliveryMode set to 2.
        /// </summary>
        class Program
        {
            private static readonly string exchangeName = "exchange_topic";
            private static readonly string queueName = "queue_topic";

            static void Main(string[] args)
            {
                // NOTE(review): broker credentials are embedded in this URI literal.
                var connectionFactory = new ConnectionFactory();
                connectionFactory.Uri = "amqp://admin:123456789@localhost:5672/Foo";

                using (var connection = connectionFactory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchangeName, "topic", true, false, null);
                    channel.QueueDeclare(queueName, true, false, false, null);
                    channel.QueueBind(queueName, exchangeName, "Hello.Topic.*");

                    // Mark the message as persistent.
                    var messageProperties = channel.CreateBasicProperties();
                    messageProperties.DeliveryMode = 2;

                    const string message = "Hello,World";
                    byte[] payload = Encoding.UTF8.GetBytes(message);
                    channel.BasicPublish(exchangeName, "Hello.Topic.World", messageProperties, payload);
                    Console.WriteLine("消息{0}被发送", message);
                }

                // Wait for a key press before the program exits.
                Console.ReadKey();
            }
        }
        /// <summary>
        /// Consumer (topic): declares the queue "queue_topic", prints the body of
        /// every received message as UTF-8 text and acknowledges each message
        /// explicitly (noAck is false). The queue is not bound to an exchange here.
        /// </summary>
        class Program
        {
            static string queueName = "queue_topic";
            static void Main(string[] args)
            {
                // NOTE(review): broker credentials are embedded in this URI literal.
                var factory = new ConnectionFactory() { Uri = "amqp://admin:123456789@localhost:5672/Foo" };

                // The connection is disposed together with the channel; previously only
                // the channel was wrapped in a using block and the connection leaked.
                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queueName, true, false, false, null);
                    // Prefetch count 1, applied with global set to false.
                    channel.BasicQos(0, 1, false);

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        Console.WriteLine("接受到消息:{0}", Encoding.UTF8.GetString(ea.Body));
                        // Acknowledge only this delivery (multiple = false).
                        channel.BasicAck(ea.DeliveryTag, false);
                    };
                    channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer);

                    // Block the main thread so the channel stays open while consuming.
                    Console.ReadLine();
                }
                Console.ReadKey();
            }
        }
  • 相关阅读:
    centos 网络开启
    ubuntu server 服务器部署(二) mysql 安装配置手记
    ubuntu server 服务器部署(一) jdk 安装配置手记
    网络管理
    磁盘配额
    创建raid0
    创建raid5
    逻辑卷快照
    LVM(逻辑卷)
    1.计算机基础
  • 原文地址:https://www.cnblogs.com/Jabben_Yi/p/5334340.html
Copyright © 2011-2022 走看看