zoukankan      html  css  js  c++  java
  • RabbitMQ引入

    引入MQ话题

      可能很多人有疑惑:MQ到底是什么?哪些场景下要使用MQ?
    前段时间安装了RabbitMQ,现在就记录下自己的学习心得吧。首先看段程序:

    class Program
        {
            static void Main(string[] args)
            {
                new Thread(Write).Start();
                new Thread(Write).Start();
                new Thread(Write).Start();
                new Thread(Write).Start();
            }
            
            public static void WriteLog(int i)
            {
                using (FileStream f = new FileStream(@"d:\test.txt", FileMode.Append))
                {
                    using (StreamWriter sw = new StreamWriter(f, Encoding.Default))
                    {
                        sw.Write(i);
                    }
                }
            }
    
            public static void Write()
            {
                for (int i = 0; i < 10000; i++)
                {
                    WriteLog(i);
                }
            }
    }
    View Code

    仅仅从代码上看,没有觉得任何问题对吧?编译也是通过的,但是执行时,出现一个问题:

     

    当然,这仅仅是一个小的案例,类似这种多线程写文件造成的问题, 就应该使用MQ了

    MQ的使用场景大概包括解耦,提高峰值处理能力,送达和排序保证,缓冲等

    MQ概述

      消息队列技术是分布式应用间交换信息的一种技术。

      消息队列可驻留在内存或磁盘上,队列存储消息直到它们被应用程序读走。

      通过消息队列,应用程序可独立地执行--它们不需要知道彼此的位置、或在继续执行前不需要等待接收程序接收此消息。

      MQ主要作用是接受和转发消息。你可以想想在生活中的一种场景:当你把信件的投进邮筒,邮递员肯定最终会将信件送给收件人。我们可以把MQ比作 邮局和邮递员。

      MQ和邮局的主要区别是,它不处理消息,但是,它会接受数据、存储消息数据、转发消息。

    RabbitMQ术语

    生产者消息发送者,在MQ中被称为生产者(producer),一个发送消息的应用也被叫做生产者,用P表示

    消费者:生产者“生产”出消息后,最终由谁消费呢?等待接受消息的应用程序,我们称之为消费者(Consuming ),用C表示

    队列:消息只能存储在队列(queue )中。尽管消息在rabbitMQ和应用程序间流通,但是队列却是存在于RabbitMQ内部。

    一个队列不受任何限制,它可以存储你想要存储的消息量,它本质上是一个无限的缓冲区。多个生产者可以向同一个队列发送消息,多个消费者可以尝试从同一个消息队列中接收数据

    一个队列像下面这样(上面是它的队列名称)

     

    注意:生产者、消费者、中间件不必在一台机器上,实际应用中也是绝大多数不在一起的。我们可以用一张图表示RabbitMQ的构造:

     

    使用RabbitMQ解决多线程写入文件问题

      多线程写入,产生消息的也就是一个程序(一个生产者P),消费消息的也是一个程序,它的模型应该是:

     

    编写代码

      引入RabbitMQ client DLL     程序包管理nuget:PM> Install-Package RabbitMQ.Client

    生产者

      首先,创建一个 connection 通过socket连接 去和服务器连接起来(需要传目的服务器的IP、用户名、密码等)。

      接着 创建一个 channel ,这是大部分的要做的事情所在。要发送消息,我们必须声明一个队列,然后我们可以向队列发布消息。执行一次BasicPublish方法,推送一个消息。

    class Program
        {
            static void Main(string[] args)
            {
                new Thread(Write).Start();
                new Thread(Write).Start();
                new Thread(Write).Start();
            }
    
            public static void Write()
            {
                var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", };
                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
    // QueueDeclare参数含义: https://blog.csdn.net/vbirdbest/article/details/78670550
                    channel.QueueDeclare(queue: "writeLog", durable: false, exclusive: false, autoDelete: false, arguments: null);
                    for (int i = 0; i < 8000; i++)
                    {
                        string message = i.ToString();
                        var body = Encoding.UTF8.GetBytes(message);
    
                        channel.BasicPublish(exchange: "", routingKey: "writeLog", basicProperties: null, body: body);
                        Console.WriteLine("Program Sent {0}", message);
                    }
                }
            }
        }
    View Code

      声明的队列,在服务器中如果不存在了,会自动创建。而消息的内容是字节数组,在使用时,注意编码问题。

    消费者

      当队列里有消息时,消费者能够从队列里获取消息

      就像我们打篮球进行传球,需要事先确认要传给的那个队友位置一样,生产者要发送消息,一定要事先知道消费消息的程序的对列是哪个。

    由此,声明对列,就应该在消费者程序中完成。

    class Program
        {
            public static void Main()
            {
                var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", VirtualHost ="/"};
                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: "writeLog",
                                         durable: false,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);
    //提供一个回调事件EventingBasicConsumer,用于处理接收到的消息。
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        ExcuateWriteFile(message);
                        Console.WriteLine(" Receiver Received {0}", message);
                    };
    //启动一个基本的内容类消费者
                    channel.BasicConsume(queue: "writeLog",
                                         noAck: true,
                                         consumer: consumer);
    
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                }
            }
            public static void ExcuateWriteFile(string i)
            {
                using (FileStream fs = new FileStream(@"d:\test.txt", FileMode.Append))
                {
                    using (StreamWriter sw = new StreamWriter(fs, Encoding.Unicode))
                    {
                        sw.Write(i);
                    }
                }
            }
    }
    View Code

    执行程序

    先执行 消费者程序,让它一直保持监听。

    1、未启动消费者前

    2、消费者启动后,可以看到界面处

     

    然后运行 生产者程序。我们先开着 Receive ,当生产者运行时

     消费者自动触发执行 :

    直到所有的 指定的 queue 里面的消息完全消费完为止。(此时消费者程序仍然在监听中)

    异常

    1、None of the specified endpoints were reachable

      参考:https://www.cnblogs.com/gossip/p/4573056.html

    这个异常在创建连接时抛出(CreateConnection()),原因一般是ConnectionFactory参数设置不对,比如HostName、UserName、Password

    标准设置:

    var factory = new ConnectionFactory();
    
    factory.UserName = QueueSetttiong.UserName;   //用户名
    
    factory.Password = QueueSetttiong.Password;      //密码
    
    factory.HostName = QueueSetttiong.HostName;  //Rabbitmq服务IP,不包含端口
    
    factory.Port = AmqpTcpEndpoint.UseDefaultPort;
    
    factory.VirtualHost = QueueSetttiong.VirtualHost;  //默认为"/"
    
    factory.Protocol = Protocols.DefaultProtocol;
    View Code

    部署生产后,factory配置都ok,但是还是抛异常None of the specified endpoints were reachable,最后发现原因是生产机器防火墙未打开RabbitMQ的端口,RabbitMQ的默认端口是:5672

    另外一个可能的原因:未设置VirtualHost的权限

    设置方法:RabbitmqWeb管理网站-->Admin。未设置权限时:

     

    设置权限:(点击admin进入设置页面)

     

     

  • 相关阅读:
    java技术用ssh从linux服务器下载数据
    linux 常见操作命令
    IP地址归属地查询
    【转】Java检测字符串是否有乱码
    maven install 跳过test方法
    echarts实现动态传入数据刷新【可执行】
    echarts报错Cannot read property 'features' of undefined
    【java web】Caused by: java.lang.ClassNotFoundException: org.apache.commons.logging.LogFactory
    程序员,我们都是夜归人【转】
    程序员你为什么这么忙?【转】
  • 原文地址:https://www.cnblogs.com/peterYong/p/10176007.html
Copyright © 2011-2022 走看看