zoukankan      html  css  js  c++  java
  • RabbitMQ

    #region RabbitMQ
                var factory = new ConnectionFactory(); //实例化工厂
                factory.HostName = "localhost";
                factory.UserName = "sa";
                factory.Password = "123321";
                string queue = "Test";
                using (var connection = factory.CreateConnection()) //创建链接
                {
                    using (var channel = connection.CreateModel())  //创建通道
                    {
                        channel.QueueDeclare(
                            queue: queue, //消息队列名称
                            durable: false,//消息队列是否持久化
                            exclusive: false, //消息队列是否被本次连接connection独享。(本次连接connection创建的信道可以共用).排外的queue在当前连接被断开的时候会自动消失(清除)无论是否设置了持久化.
                            autoDelete: false, //消息队列是否自动删除。也就是说queue会清理自己,但是是在最后一个connection断开的时候。
                            arguments: null  //参数
                            ); //创建一个队列
                        channel.BasicQos(0, 1, false); // Qos即服务质量
                        for (int i = 0; i < 10; i++)
                        {
                            string message = i.ToString();
                            var body = Encoding.UTF8.GetBytes(message); //转换为字节序列
                            channel.BasicPublish("", queue, null, body);//注意路由键在用direct交换器时,要指定为队列名
                            Console.WriteLine("set {0}", message);
                        }
                    }
                }
                Console.ReadLine();
                #endregion
    添加生产者
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace ConsoleApplication2
    {
        class Program
        {
            static void Main(string[] args)
            {
                var factory = new ConnectionFactory()
                {
                    HostName = "localhost",
                    UserName = "sa",
                    Password = "123321"
                };
                string queue = "Test";
    
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        channel.QueueDeclare(queue, false, false, false, null);//申明队列
                        var consumer = new EventingBasicConsumer(channel);//申明事件型消费者
    
                        Console.WriteLine("ConsoleApplication2 waiting for message.");
                        channel.BasicConsume(queue, false, consumer);//定义该消费者是否在该队列上为自动应答的
                        channel.BasicQos(0, 1, false); // Qos即服务质量
                        consumer.Received += (sender, ea) =>
                        {
                            int s = GetRandom();
                            Thread.Sleep(s);
                            string message = Encoding.UTF8.GetString(ea.Body);
                            channel.BasicAck(ea.DeliveryTag, false);
                            Console.WriteLine("Received {0} 耗费 {1} 毫秒", message, s);
                        };
                        Console.ReadLine();
                    }
                }
            }
            private static int GetRandom()
            {
                Guid guid = Guid.NewGuid();
                Random rd = new Random(guid.GetHashCode());
                return rd.Next(100, 10000);
            }
        }
    }
    添加消费者1
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace ConsoleApplication3
    {
        class Program
        {
            static void Main(string[] args)
            {
                var factory = new ConnectionFactory()
                {
                    HostName = "localhost",
                    UserName = "sa",
                    Password = "123321"
                };
                string queue = "Test";
    
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        channel.QueueDeclare(queue, false, false, false, null);
                        var consumer = new QueueingBasicConsumer(channel);
                        Console.WriteLine("ConsoleApplication3 waiting for message.");
                        channel.BasicConsume(queue, false, consumer);
                        channel.BasicQos(0, 1, false); // Qos即服务质量
                        while (true)
                        {
                            int s = GetRandom();
                            Thread.Sleep(s);
                            var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                            var message = Encoding.UTF8.GetString(ea.Body);
                            Console.WriteLine("Received {0} 耗费 {1} 毫秒", message, s);
                            channel.BasicAck(ea.DeliveryTag, false);
                        }
                    }
                }
            }
            private static int GetRandom()
            {
                Guid guid = Guid.NewGuid();
                Random rd = new Random(guid.GetHashCode());
                return rd.Next(1000, 10000);
            }
        }
    }
    添加消费者2
    #region RabbitMQ direct
    //try
    //{
    //    var factory = new ConnectionFactory(); //实例化工厂
    //    factory.HostName = "localhost";
    //    factory.UserName = "guest";
    //    factory.Password = "guest";
    //    string ExchangeName = "Exchange1";
    //    using (var connection = factory.CreateConnection()) //创建链接
    //    {
    //        using (var channel = connection.CreateModel())  //创建通道
    //        {
    //            //channel.ExchangeDeclare(ExchangeName, "direct", false, false, null);
    //            //channel.QueueDeclare(
    //            //    queue: queueName, //消息队列名称
    //            //    durable: false,//消息队列是否持久化
    //            //    exclusive: false, //消息队列是否被本次连接connection独享。(本次连接connection创建的信道可以共用).排外的queue在当前连接被断开的时候会自动消失(清除)无论是否设置了持久化.
    //            //    autoDelete: false, //消息队列是否自动删除。也就是说queue会清理自己,但是是在最后一个connection断开的时候。
    //            //    arguments: null  //参数
    //            //    ); //创建一个队列
    //            ////进行绑定
    //            //channel.QueueBind(ExchangeName, queueName, routingKey: queueName);
    //            ////channel.BasicQos(0, 1, false); // Qos即服务质量
    //            channel.ExchangeDelete(ExchangeName);
    //            //channel.QueueDelete(queueName);
    //            channel.ExchangeDeclare(exchange: ExchangeName, type: "direct", durable: true, autoDelete: false, arguments: null);
    
    //            string queueName1 = "Test1";
    //            channel.QueueDeclare(queueName1, durable: true, autoDelete: false, exclusive: false, arguments: null);
    //            channel.QueueBind(queueName1, ExchangeName, routingKey: queueName1);
    //            for (int i = 0; i < 10; i++)
    //            {
    //                string message = i.ToString();
    //                var body = Encoding.UTF8.GetBytes(message); //转换为字节序列
    //                channel.BasicPublish(
    //                    exchange: ExchangeName,
    //                    routingKey: queueName1,
    //                    basicProperties: null, //消息持久化, DeliveryMode=2 (Non-persistent (1) or persistent (2))
    //                    body: body);//注意路由键在用direct交换器时,要指定为队列名
    //                Console.WriteLine("set {0}", message);
    //            }
    
    //            string queueName2 = "Test2";
    //            channel.QueueDeclare(queueName2, durable: true, autoDelete: false, exclusive: false, arguments: null);
    //            channel.QueueBind(queueName2, ExchangeName, routingKey: queueName2);
    //            for (int i = 0; i < 10; i++)
    //            {
    //                string message = i.ToString();
    //                var body = Encoding.UTF8.GetBytes(message); //转换为字节序列
    //                channel.BasicPublish(
    //                    exchange: ExchangeName,
    //                    routingKey: queueName2,
    //                    basicProperties: null, //消息持久化, DeliveryMode=2 (Non-persistent (1) or persistent (2))
    //                    body: body);//注意路由键在用direct交换器时,要指定为队列名
    //                Console.WriteLine("set {0}", message);
    //            }
    //        }
    //    }
    //    Console.ReadLine();
    //}
    //catch (Exception)
    //{
    
    //    throw;
    //}
    #endregion
    
    #region RabbitMQ Fanout
    //try
    //{
    //    var factory = new ConnectionFactory(); //实例化工厂
    //    factory.HostName = "localhost";
    //    factory.UserName = "guest";
    //    factory.Password = "guest";
    //    string ExchangeName = "Exchange1";
    //    using (var connection = factory.CreateConnection()) //创建链接
    //    {
    //        using (var channel = connection.CreateModel())  //创建通道
    //        {
    //            channel.ExchangeDelete(ExchangeName);
    //            //channel.QueueDelete(queueName);
    //            string queueName1 = "Test1";
    
    //            channel.ExchangeDeclare(exchange: ExchangeName, type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);
    //            channel.QueueDeclare(queueName1, durable: true, autoDelete: false, exclusive: false, arguments: null);
    //            channel.QueueBind(queueName1, ExchangeName, routingKey: queueName1);
    //            for (int i = 0; i < 10; i++)
    //            {
    //                string message = i.ToString();
    //                var body = Encoding.UTF8.GetBytes(message); //转换为字节序列
    //                channel.BasicPublish(
    //                    exchange: ExchangeName,
    //                    routingKey: queueName1,
    //                    basicProperties: null, //消息持久化, DeliveryMode=2 (Non-persistent (1) or persistent (2))
    //                    body: body);//注意路由键在用direct交换器时,要指定为队列名
    //                Console.WriteLine("set {0}", message);
    //            } 
    
    //            //string queueName2 = "Test2";
    //            //channel.QueueDeclare(queueName2, durable: true, autoDelete: false, exclusive: false, arguments: null);
    //            //channel.QueueBind(queueName2, ExchangeName, routingKey: queueName2);
    //            //for (int i = 0; i < 10; i++)
    //            //{
    //            //    string message = i.ToString();
    //            //    var body = Encoding.UTF8.GetBytes(message); //转换为字节序列
    //            //    channel.BasicPublish(
    //            //        exchange: ExchangeName,
    //            //        routingKey: queueName2,
    //            //        basicProperties: null, //消息持久化, DeliveryMode=2 (Non-persistent (1) or persistent (2))
    //            //        body: body);//注意路由键在用direct交换器时,要指定为队列名
    //            //    Console.WriteLine("set {0}", message);
    //            //}
    //        }
    //    }
    //    Console.ReadLine();
    //}
    //catch (Exception)
    //{
    
    //    throw;
    //}
    #endregion
    
    #region RabbitMQ Topic
    try
    {
        var factory = new ConnectionFactory(); //实例化工厂
        factory.HostName = "localhost";
        factory.UserName = "guest";
        factory.Password = "guest";
    
        string ExchangeName = "Exchange1";
        string queueName1 = "Test1";
        string queueName2 = "Test2";
        string queueName3 = "Test3";
        using (var connection = factory.CreateConnection()) //创建链接
        {
            using (var channel = connection.CreateModel())  //创建通道
            {
                //channel.ExchangeDelete(ExchangeName);
                //channel.QueueDelete(queueName1);
                //channel.QueueDelete(queueName2);
                //channel.QueueDelete(queueName3);
                IBasicProperties properties = channel.CreateBasicProperties(); //消息持久化
                properties.Persistent = true;
    
    
                channel.ExchangeDeclare(exchange: ExchangeName, type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null);
                channel.QueueDeclare(queueName1, durable: true, autoDelete: false, exclusive: false, arguments: null);
                channel.QueueBind(queueName1, ExchangeName, routingKey: queueName1);
                for (int i = 0; i < 10; i++)
                {
                    string message = queueName1 + i.ToString();
                    var body = Encoding.UTF8.GetBytes(message); //转换为字节序列
                    channel.BasicPublish(
                        exchange: ExchangeName,
                        routingKey: queueName1,
                        basicProperties: properties, //消息持久化, DeliveryMode=2 (Non-persistent (1) or persistent (2))
                        body: body);//注意路由键在用direct交换器时,要指定为队列名
                    Console.WriteLine("set {0}", message);
                }
    
                channel.QueueDeclare(queueName2, durable: true, autoDelete: false, exclusive: false, arguments: null);
                channel.QueueBind(queueName2, ExchangeName, routingKey: queueName2);
                for (int i = 0; i < 10; i++)
                {
                    string message = queueName2 + i.ToString();
                    var body = Encoding.UTF8.GetBytes(message); //转换为字节序列
                    channel.BasicPublish(
                        exchange: ExchangeName,
                        routingKey: queueName2,
                        basicProperties: properties, //消息持久化, DeliveryMode=2 (Non-persistent (1) or persistent (2))
                        body: body);//注意路由键在用direct交换器时,要指定为队列名
                    Console.WriteLine("set {0}", message);
                }
    
                channel.QueueDeclare(queueName3, durable: true, autoDelete: false, exclusive: false, arguments: null);
                channel.QueueBind(queueName3, ExchangeName, routingKey: queueName3);
                for (int i = 0; i < 10; i++)
                {
                    string message = queueName3 + i.ToString();
                    var body = Encoding.UTF8.GetBytes(message); //转换为字节序列
                    channel.BasicPublish(
                        exchange: ExchangeName,
                        routingKey: queueName3,
                        basicProperties: properties, //消息持久化, DeliveryMode=2 (Non-persistent (1) or persistent (2))
                        body: body);//注意路由键在用direct交换器时,要指定为队列名
                    Console.WriteLine("set {0}", message);
                }
            }
        }
        Console.ReadLine();
    }
    catch (Exception)
    {
    
        throw;
    }
    #endregion
    direct、Fanout、Topic 三种交换机
  • 相关阅读:
    Oracle将放弃prometric
    07年博客迁移:回记Oracle的三天培训
    iptv速率实测
    Oracle database 11g r2最新安装体验
    Oracle中dblink所产生远程会话的一些表现
    07年博客迁移:Home desktop migrate to fedora
    Mysql:语法:字符集、排序规则
    Mysql:事务管理——未完待续
    Mysql:SQL语句:用户、权限、信息、状态、设置、复制、会话、prepare sql 等
    Mysql:函数、操作符
  • 原文地址:https://www.cnblogs.com/Jacob-Wu/p/10267771.html
Copyright © 2011-2022 走看看