zoukankan      html  css  js  c++  java
  • .NET Core 使用RabbitMQ(转)

     

    原文地址:https://www.cnblogs.com/stulzq/p/7551819.html

    RabbitMQ简介

    AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

    AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
    RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

    RabbitMQ提供了可靠的消息机制、跟踪机制和灵活的消息路由,支持消息集群和分布式部署。适用于排队算法、秒杀活动、消息分发、异步处理、数据同步、处理耗时任务、CQRS等应用场景。

    RabbitMQ安装

    RabbitMQ安装,网上已经有许多教程了,这里简单介绍一下在CentOS下安装RabbitMQ。使用的版本为3.6.12最新版。

    1.首先安装erlang

    rpm -Uvh https://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm

    2.然后安装socat

    yum install socat

    3.最后安装RabbitMQ

    rpm -Uvh https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.12/rabbitmq-server-3.6.12-1.el7.noarch.rpm

    RabbitMQ常用命令

    启用Web控制台

    rabbitmq-plugins enable rabbitmq_management

    开启服务

    systemctl start rabbitmq-server.service

    停止服务

    systemctl stop rabbitmq-server.service

    查看服务状态

    systemctl status rabbitmq-server.service

    查看RabbitMQ状态

    rabbitmqctl status

    添加用户赋予管理员权限

    rabbitmqctl  add_user  username  password
    rabbitmqctl  set_user_tags  username  administrator
    

    查看用户列表

    rabbitmqctl list_users

    删除用户

    rabbitmqctl delete_user username

    修改用户密码

    rabbitmqctl oldPassword Username newPassword

    访问Web控制台

    http://服务器ip:15672/ 注意配置防火墙,默认用户名密码都是guest,若新建用户一定要记得配置权限。

    .NET Core 使用RabbitMQ

    通过nuget安装:https://www.nuget.org/packages/RabbitMQ.Client/

    定义生产者
    //创建连接工厂
    ConnectionFactory factory = new ConnectionFactory
    {
    	UserName = "admin",//用户名
    	Password = "admin",//密码
    	HostName = "192.168.157.130"//rabbitmq ip
    };
    
    //创建连接
    var connection = factory.CreateConnection();
    //创建通道
    var channel = connection.CreateModel();
    //声明一个队列
    channel.QueueDeclare("hello", false, false, false, null);
    
    Console.WriteLine("
    RabbitMQ连接成功,请输入消息,输入exit退出!");
    
    string input;
    do
    {
    	input = Console.ReadLine();
    
    	var sendBytes = Encoding.UTF8.GetBytes(input);
    	//发布消息
    	channel.BasicPublish("", "hello", null, sendBytes);
    
    } while (input.Trim().ToLower()!="exit");
    channel.Close();
    connection.Close();
    
    定义消费者
    			//创建连接工厂
    	        ConnectionFactory factory = new ConnectionFactory
    	        {
    		        UserName = "admin",//用户名
    		        Password = "admin",//密码
    		        HostName = "192.168.157.130"//rabbitmq ip
    	        };
    
    	        //创建连接
    	        var connection = factory.CreateConnection();
    	        //创建通道
    	        var channel = connection.CreateModel();
    
    			//事件基本消费者
    			EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
    
    			//接收到消息事件
    	        consumer.Received += (ch, ea) =>
    	        {
    		        var message = Encoding.UTF8.GetString(ea.Body);
    		        Console.WriteLine($"收到消息: {message}");
    				//确认该消息已被消费
    		        channel.BasicAck(ea.DeliveryTag, false);
    			};
    			//启动消费者 设置为手动应答消息
    			channel.BasicConsume("hello", false, consumer);
    	        Console.WriteLine("消费者已启动");
    	        Console.ReadKey();
    	        channel.Dispose();
    	        connection.Close();
    
    运行

    启动了一个生产者,两个消费者,可以看见两个消费者都能收到消息,消息投递到哪个消费者是由RabbitMQ决定的。

    RabbitMQ消费失败的处理

    RabbitMQ采用消息应答机制,即消费者收到一个消息之后,需要发送一个应答,然后RabbitMQ才会将这个消息从队列中删除,如果消费者在消费过程中出现异常,断开连接切没有发送应答,那么RabbitMQ会将这个消息重新投递。

    修改一下消费者的代码:

    //接收到消息事件
    consumer.Received += (ch, ea) =>
    {
    	var message = Encoding.UTF8.GetString(ea.Body);
    
    	Console.WriteLine($"收到消息: {message}");
    
    	Console.WriteLine($"收到该消息[{ea.DeliveryTag}] 延迟10s发送回执");
    	Thread.Sleep(10000);
    	//确认该消息已被消费
    	channel.BasicAck(ea.DeliveryTag, false);
    	Console.WriteLine($"已发送回执[{ea.DeliveryTag}]");
    };
    

    演示:

    从图中可以看出,设置了消息应答延迟10s,如果在这10s中,该消费者断开了连接,那么消息会被RabbitMQ重新投递。

    使用RabbitMQ的Exchange

    前面我们可以看到生产者将消息投递到Queue中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的情况是,生产者将消息发送到Exchange(交换器),由Exchange将消息路由到一个或多个Queue中(或者丢弃)

    AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储。同理,消费者也是如此。Exchange 就类似于一个交换机,转发各个消息分发到相应的队列中。

    RabbitMQ提供了四种Exchange模式:direct,fanout,topic,header 。但是 header模式在实际使用中较少,所以这里只介绍前三种模式。

    Exchange不是消费者关心的,所以消费者的代码完全不用变,用上面的消费者就行了。
    由于避免文章过长,影响阅读,所以只贴了部分代码,但是demo里面是完整可运行的,详细代码请查看demo。

    Direct Exchange

    所有发送到Direct Exchange的消息被转发到具有指定RouteKey的Queue。

    Direct模式,可以使用rabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。

    //创建连接
    var connection = factory.CreateConnection();
    //创建通道
    var channel = connection.CreateModel();
    
    //定义一个Direct类型交换机
    channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null);
    
    //定义一个队列
    channel.QueueDeclare(queueName, false, false, false, null);
    
    //将队列绑定到交换机
    channel.QueueBind(queueName, exchangeName, routeKey, null);
    

    运行:

    Fanout Exchange

    所有发送到Fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。

    Fanout Exchange 不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。

    所以,Fanout Exchange 转发消息是最快的。

    为了演示效果,定义了两个队列,分别为hello1,hello2,每个队列都拥有一个消费者。

    static void Main(string[] args)
    {
    	string exchangeName = "TestFanoutChange";
    	string queueName1 = "hello1";
    	string queueName2 = "hello2";
    	string routeKey = "";
    
    	//创建连接工厂
    	ConnectionFactory factory = new ConnectionFactory
    	{
    		UserName = "admin",//用户名
    		Password = "admin",//密码
    		HostName = "192.168.157.130"//rabbitmq ip
    	};
    
    	//创建连接
    	var connection = factory.CreateConnection();
    	//创建通道
    	var channel = connection.CreateModel();
    
    	//定义一个Direct类型交换机
    	channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false, null);
    
    	//定义队列1
    	channel.QueueDeclare(queueName1, false, false, false, null);
    	//定义队列2
    	channel.QueueDeclare(queueName2, false, false, false, null);
    
    	//将队列绑定到交换机
    	channel.QueueBind(queueName1, exchangeName, routeKey, null);
    	channel.QueueBind(queueName2, exchangeName, routeKey, null);
    
    	//生成两个队列的消费者
    	ConsumerGenerator(queueName1);
    	ConsumerGenerator(queueName2);
    
    
    	Console.WriteLine($"
    RabbitMQ连接成功,
    
    请输入消息,输入exit退出!");
    
    	string input;
    	do
    	{
    		input = Console.ReadLine();
    
    		var sendBytes = Encoding.UTF8.GetBytes(input);
    		//发布消息
    		channel.BasicPublish(exchangeName, routeKey, null, sendBytes);
    
    	} while (input.Trim().ToLower() != "exit");
    	channel.Close();
    	connection.Close();
    }
    
    /// <summary>
    /// 根据队列名称生成消费者
    /// </summary>
    /// <param name="queueName"></param>
    static void ConsumerGenerator(string queueName)
    {
    	//创建连接工厂
    	ConnectionFactory factory = new ConnectionFactory
    	{
    		UserName = "admin",//用户名
    		Password = "admin",//密码
    		HostName = "192.168.157.130"//rabbitmq ip
    	};
    
    	//创建连接
    	var connection = factory.CreateConnection();
    	//创建通道
    	var channel = connection.CreateModel();
    
    	//事件基本消费者
    	EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
    
    	//接收到消息事件
    	consumer.Received += (ch, ea) =>
    	{
    		var message = Encoding.UTF8.GetString(ea.Body);
    
    		Console.WriteLine($"Queue:{queueName}收到消息: {message}");
    		//确认该消息已被消费
    		channel.BasicAck(ea.DeliveryTag, false);
    	};
    	//启动消费者 设置为手动应答消息
    	channel.BasicConsume(queueName, false, consumer);
    	Console.WriteLine($"Queue:{queueName},消费者已启动");
    }
    

    运行:

    Topic Exchange

    所有发送到Topic Exchange的消息被转发到能和Topic匹配的Queue上,

    Exchange 将路由进行模糊匹配。可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“XiaoChen.#”能够匹配到“XiaoChen.pets.cat”,但是“XiaoChen.*” 只会匹配到“XiaoChen.money”。

    所以,Topic Exchange 使用非常灵活。

    string exchangeName = "TestTopicChange";
    string queueName = "hello";
    string routeKey = "TestRouteKey.*";
    
    //创建连接工厂
    ConnectionFactory factory = new ConnectionFactory
    {
    	UserName = "admin",//用户名
    	Password = "admin",//密码
    	HostName = "192.168.157.130"//rabbitmq ip
    };
    
    //创建连接
    var connection = factory.CreateConnection();
    //创建通道
    var channel = connection.CreateModel();
    
    //定义一个Direct类型交换机
    channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, false, false, null);
    
    //定义队列1
    channel.QueueDeclare(queueName, false, false, false, null);
    
    //将队列绑定到交换机
    channel.QueueBind(queueName, exchangeName, routeKey, null);
    
    
    
    Console.WriteLine($"
    RabbitMQ连接成功,
    
    请输入消息,输入exit退出!");
    
    string input;
    do
    {
    	input = Console.ReadLine();
    
    	var sendBytes = Encoding.UTF8.GetBytes(input);
    	//发布消息
    	channel.BasicPublish(exchangeName, "TestRouteKey.one", null, sendBytes);
    
    } while (input.Trim().ToLower() != "exit");
    channel.Close();
    connection.Close();
    

    运行

    Demo下载:DotNetCore.RabbitMQ

    最后:欢迎加入 .net core 交流群一起学习,群号:4656606 加入QQ群

    目前学习.NET Core 最好的教程 .NET Core 官方教程
    .NET Core 交流群:923036995 或 4656606  欢迎加群交流
    如果您认为这篇文章还不错或者有所收获,您可以点击右下角的【推荐】支持,或请我喝杯咖啡【赞赏】,这将是我继续写作,分享的最大动力!
    声明:原创博客请在转载时保留原文链接或者在文章开头加上本人博客地址,如发现错误,欢迎批评指正。凡是转载于本人的文章,不能设置打赏功能,如有特殊需求请与本人联系!
     
    好好学习,天天向上。
  • 相关阅读:
    mysql 时间函数
    Excel名称管理
    Unicode中文和特殊字符的编码范围
    带有历史数据置顶的id列表查询
    汉字表示范围
    ASP.NET模拟http进行GET/POST请求
    ASP.NET AES-128-CBC加密解密(与php通讯)
    dapper.net 获取分页存储过程返回的多结果集
    微信网页版抓包登录
    js添加/移除/阻止事件
  • 原文地址:https://www.cnblogs.com/Zhengxue/p/13336495.html
Copyright © 2011-2022 走看看