zoukankan      html  css  js  c++  java
  • rabbitmq系列——(6 消息队列集群)

       RabbitMQ 有两种集群模式:普通模式和镜像队列模式,推荐使用后者。

      普通模式介绍:

      重点在于元数据:普通模式下,各节点之间只同步队列的元数据,消息实体只保存在队列所在的节点(例如 node1)上。node1 故障后,其上的消息实体就无法被消费;node1 恢复后可以继续消费;如果消息没有持久化,就会产生消息丢失。

       

      镜像队列模式介绍: https://www.rabbitmq.com/ha.html

      每一个节点都保存完整的队列镜像(队列结构 + 消息实体 + 元数据),因此某个节点故障后,其余节点仍然可以继续提供消息;

      

      配置安装教程,请参考 https://www.linuxidc.com/linux/2016-10/136492.htm

    1. 生产者

    using RabbitMQMsgProducer.MessageProducer;
    using Microsoft.Extensions.Configuration;
    using System;
    using System.IO;
    using RabbitMQMsgProducer.ExchangeDemo;
    using RabbitMQMsgProducer.MessageConfirm;
    
    namespace RabbitMQMsgProducer
    {
        /// <summary>
        /// Console entry point for the producer side of the cluster demo.
        /// </summary>
        class Program
        {
            /// <summary>
            /// Runs the cluster publishing demo, then waits for a line on standard input
            /// before returning. Any exception is reported by printing its message.
            /// </summary>
            /// <param name="args">Command-line arguments (not used).</param>
            static void Main(string[] args)
            {
                try
                {
                    // Message cluster demo.
                    ProducerClustersMsgConfirm.Send();

                    // Hold the process open until Enter is pressed.
                    Console.ReadLine();
                }
                catch (Exception error)
                {
                    Console.WriteLine(error.Message);
                }
            }
        }
    }
    using RabbitMQ.Client;
    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading;
    
    namespace RabbitMQMsgProducer.MessageConfirm
    {
        /// <summary>
        /// Publishes persistent messages to a RabbitMQ cluster and checks the broker's
        /// publisher confirm for every message.
        /// </summary>
        public class ProducerClustersMsgConfirm
        {
            /// <summary>
            /// Opens a connection using the listed cluster node addresses, declares a durable
            /// queue, a durable direct exchange and the binding between them, then publishes
            /// 100 persistent messages. Each publish is followed by a wait for the broker's
            /// confirm, and the outcome is written to the console. Blocks on console input
            /// before the channel and connection are disposed.
            /// </summary>
            public static void Send()
            {
                // NOTE(review): credentials are hard-coded for the demo; load them from
                // configuration before using this outside a test environment.
                ConnectionFactory factory = new ConnectionFactory();
                factory.VirtualHost = "/";
                factory.Port = 5672; // RabbitMQ cluster port
                factory.UserName = "fletcher"; // user name
                factory.Password = "fletcher"; // password
                //factory.AutomaticRecoveryEnabled = true;  // whether to reconnect if the connection drops
                //factory.TopologyRecoveryEnabled = true;   // whether exchanges, queues, etc. are restored after the connection recovers
                string queueName = "ClustersMsgQueue";
                string exchangeName = "ClustersMsgExchange";
                string routingKeyName = "ClustersMsgRoutingKey";

                // Upper bound on how long to wait for the broker to confirm one message.
                TimeSpan confirmTimeout = TimeSpan.FromSeconds(5);

                using (IConnection connection = factory.CreateConnection(new List<string>(){
                    "192.168.1.110",
                    "192.168.1.120",
                }))
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        // Declare the queue.
                        channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                        // Declare the exchange.
                        channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                        // Bind the queue to the exchange.
                        channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKeyName, arguments: null);

                        // Put the channel into publisher-confirm mode so that every publish is
                        // acknowledged (or rejected) by the broker; without this a message can be
                        // dropped silently, e.g. while a cluster node is failing over.
                        channel.ConfirmSelect();

                        // The properties are identical for every message, so build them once.
                        IBasicProperties basicProperties = channel.CreateBasicProperties();
                        basicProperties.Persistent = true;

                        Console.WriteLine("the producer is ready . GO !");
                        for (int i = 1; i <= 100; i++)
                        {
                            string message = $"the message is : {i} .";
                            channel.BasicPublish(exchange: exchangeName, routingKey: routingKeyName, basicProperties: basicProperties, body: Encoding.UTF8.GetBytes(message));

                            // Use the overload that reports a timeout explicitly, so that a
                            // timeout is never mistaken for a positive confirm.
                            bool timedOut;
                            bool onlyAcks = channel.WaitForConfirms(confirmTimeout, out timedOut);

                            Thread.Sleep(300);
                            if (onlyAcks && !timedOut)
                            {
                                Console.WriteLine($"the message is : {i} . is send .");
                            }
                            else
                            {
                                Console.WriteLine($"the message is : {i} . was not confirmed by the broker .");
                            }
                        }
                        Console.Read();
                    }
                }
            }
        }
    }

    2. 消费者

    using RabbitMQMsgConsumer001.ExchangeDemo;
    using RabbitMQMsgConsumer001.MessageConfirm;
    using RabbitMQMsgConsumer001.MessageConsumer;
    using System;
    using System.Threading.Tasks;
    
    namespace RabbitMQMsgConsumer001
    {
        /// <summary>
        /// Console entry point for the consumer side of the cluster demo.
        /// </summary>
        class Program
        {
            /// <summary>
            /// Runs the cluster consuming demo, then waits for a line on standard input
            /// before returning. Any exception is reported by printing its message.
            /// </summary>
            /// <param name="args">Command-line arguments (not used).</param>
            static void Main(string[] args)
            {
                try
                {
                    // Message cluster demo.
                    ConsumerClustersMsgConfirm.Receive();

                    // Hold the process open until Enter is pressed.
                    Console.ReadLine();
                }
                catch (Exception error)
                {
                    Console.WriteLine(error.Message);
                }
            }
        }
    }
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading;
    
    namespace RabbitMQMsgConsumer001.MessageConfirm
    {
        /// <summary>
        /// Consumes messages from the cluster demo queue using manual acknowledgements.
        /// </summary>
        public class ConsumerClustersMsgConfirm
        {
            /// <summary>
            /// Opens a connection using the listed cluster node addresses, declares the same
            /// durable queue, durable direct exchange and binding as the producer, then
            /// subscribes with <c>autoAck: false</c>. Each delivery is acknowledged only after
            /// it has been handled; a delivery whose handling throws is negatively acknowledged.
            /// Blocks on console input before the channel and connection are disposed.
            /// </summary>
            public static void Receive()
            {
                // NOTE(review): credentials are hard-coded for the demo; load them from
                // configuration before using this outside a test environment.
                ConnectionFactory factory = new ConnectionFactory();
                factory.VirtualHost = "/";
                factory.Port = 5672; // RabbitMQ cluster port
                factory.UserName = "fletcher"; // user name
                factory.Password = "fletcher"; // password
                //factory.AutomaticRecoveryEnabled = true;  // whether to reconnect if the connection drops
                //factory.TopologyRecoveryEnabled = true;   // whether exchanges, queues, etc. are restored after the connection recovers
                string queueName = "ClustersMsgQueue";
                string exchangeName = "ClustersMsgExchange";
                string routingKeyName = "ClustersMsgRoutingKey";
                using (IConnection connection = factory.CreateConnection(new List<string>(){
                    "192.168.1.110",
                    "192.168.1.120",
                }))
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        // Declare the queue.
                        channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                        // Declare the exchange.
                        channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                        // Bind the queue to the exchange.
                        channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKeyName, arguments: null);

                        Console.WriteLine("the consumer is ready . GO !");

                        EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            try
                            {
                                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                                Console.WriteLine($"received : {message}");
                                Thread.Sleep(300);
                            }
                            catch (Exception ex)
                            {
                                Console.WriteLine($"failed to process message : {ex.Message}");
                                // Give the message one more chance: requeue on the first failure,
                                // but not when it was already redelivered, to avoid an endless
                                // redelivery loop.
                                channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: !ea.Redelivered);
                                return;
                            }

                            // Manual ack, sent only after the message was handled successfully;
                            // this tells the broker it may remove the message.
                            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                        };
                        // autoAck: false means deliveries must be acknowledged manually.
                        channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

                        Console.Read();
                    }
                }
            }
        }
    }

     3. 结果

  • 相关阅读:
    C#写文本文件,如何换行(添加换行符)
    C#使用oledb操作excel文件的方法
    winform之combobox
    vs2010快捷键
    可以下载一些书籍代码的网站
    对php和java里面的static函数和static的一些理解
    10.4-CMake find 模块
    6.25-Git 技巧
    6.4-Git Command
    2.25-CMake Tutorial
  • 原文地址:https://www.cnblogs.com/Fletcher/p/14262381.html
Copyright © 2011-2022 走看看