zoukankan      html  css  js  c++  java
  • (1)RabbitMQ简介与安装

    1.RabbitMQ简介

    因为RabbitMQ是基于开源的AMQP协议来实现的,所以在了解MQ时候,首先我们来了解下AMQP协议。AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端或者中间件不同产品、不同的开发语言等条件的限制,也就是说消息生产者无需知道消费者如何处理消息结果,反之亦然,解耦了组件跟组件依赖。RabbitMQ服务器端用Erlang语言编写,同时也支持多种客户端来开发跨语言消息传递,例如:Java,.NET,PHP,Python,JavaScript,Ruby,Go等。RabbitMQ还支持多种消息传递协议、消息排队、传递确认、队列的灵活路由、多种交换类型。还支持分布式集群以实现高可用性和吞吐量。适用于排队算法、秒杀活动、消息分发、异步处理、数据同步、处理耗时任务、CQRS等应用场景。还可以通过HTTP-API命令行工具和用于管理和监视RabbitMQ的UI。

    2.RabbitMQ在CentOS 7安装

    因为我对Linux运维知识面比较薄弱,所以在Linux上部署RabbitMQ这块暂时不想耗太多时间在这上面去(后续有时间再深入了解),这里我完全是跟着园区Net大神晓晨大佬这篇文章(https://www.cnblogs.com/stulzq/p/7551819.html)去部署的。网上也有很多RabbitMQ在Linux部署文章参考,大家也可以自行度娘。

    //安装服务端erlang语言
    rpm -Uvh https://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm
    //安装socat
    yum install socat
    //安装服务端RabbitMQ
    rpm -Uvh https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.12/rabbitmq-server-3.6.12-1.el7.noarch.rpm

    3.RabbitMQ服务端常用命令

    //启用Web管理平台
    rabbitmq-plugins enable rabbitmq_management
    //开启服务
    systemctl start rabbitmq-server.service
    //停止服务
    systemctl stop rabbitmq-server.service
    //查看服务状态
    systemctl status rabbitmq-server.service
    //查看RabbitMQ状态
    rabbitmqctl status
    //添加用户赋予管理员权限
    rabbitmqctl  add_user  tom  12345
    rabbitmqctl  set_user_tags  tom  administrator
    //查看用户列表
    rabbitmqctl list_users
    //删除用户
    rabbitmqctl delete_user username
    //修改用户密码
    rabbitmqctl oldPassword Username newPassword

    4.访问RabbitMQ Web管理平台

    当启用RabbitMQ Web管理平台,我们根据部署CentOS 7系统的IP在浏览器上打开http://IP:15672,如果新增了用户,一定要设置新增用户的VirtualHost的权限,不然客户端调用RabbitMQ时候会报错!具体处理方法如下截图:
    //未设置权限时


    点解设置权限即可。
    如果访问显示404,则是防火墙把通讯给过滤掉了,请执行命令把防火墙关闭掉再打开,以下我列出所有CentOS 7关于防火墙命令:

    //查看防火状态
    systemctl status firewalld
    //暂时关闭防火墙
    systemctl stop firewalld
    //永久关闭防火墙
    systemctl disable firewalld
    //重启防火墙
    systemctl enable firewalld
    //永久关闭后重启
    chkconfig iptables on 

    关闭防火墙之后,在浏览器上就会看到下面管理平台界面:

    5.NET Core使用RabbitMQ

    通过nuget安装:https://www.nuget.org/packages/RabbitMQ.Client/

    5.1定义生产者

    class Program
    {
        static void Main(string[] args)
        {
            string queueName = "DirectExchangeQueueName";
            string routeKey = "DirectExchangeQueueName";
            //创建连接工厂
            var factory = new ConnectionFactory
            {
                UserName = "dengwu",//用户名
                Password = "123456",//密码
                HostName = "192.168.112.133",//rabbitmq ip
            };
    
            //创建连接
            var connection = factory.CreateConnection();
            //创建通道
            var channel = connection.CreateModel();
            //声明一个队列
            channel.QueueDeclare(queueName, false, false, false, null);
    
            Console.WriteLine("
    RabbitMQ连接成功,请输入消息,输入exit退出!");
    
            string input;
            do
            {
                input = Console.ReadLine();
    
                var sendBytes = Encoding.UTF8.GetBytes(input);
                //发布消息
                channel.BasicPublish("", routeKey, null, sendBytes);
    
            } while (input.Trim().ToLower() != "exit");
            channel.Close();
            connection.Close();
        }
    }

    5.2定义消费者

    class Program
    {
        static void Main(string[] args)
        {
            string queueName = "DirectExchangeQueueName";
            //创建连接工厂
            var factory = new ConnectionFactory
            {
                UserName = "dengwu",//用户名
                Password = "123456",//密码
                HostName = "192.168.112.133",//rabbitmq ip
            };
    
            //创建连接
            var connection = factory.CreateConnection();
            //创建通道
            var channel = connection.CreateModel();
            //事件基本消费者
            var consumer = new EventingBasicConsumer(channel);
    
            //接收到消息事件
            consumer.Received += (ch, ea) =>
            {
                var boby = ea.Body;
                var message = Encoding.UTF8.GetString(boby.ToArray());
    
                Console.WriteLine($"收到消息: {message}");
                //确认该消息已被消费
                channel.BasicAck(ea.DeliveryTag, false);
                //Console.WriteLine($"收到该消息[{ea.DeliveryTag}] 延迟10s发送回执");
                //Thread.Sleep(10000);
                //Console.WriteLine($"已发送回执[{ea.DeliveryTag}]");
            };
            //启动消费者 设置为手动应答消息
            channel.BasicConsume(queueName, false, consumer);
            Console.WriteLine("消费者已启动");
            Console.ReadKey();
            channel.Dispose();
            connection.Close();
        }
    }

    运行:

    通过启动一个生产者,一个消费者,我们可以看到,生产者通过RabbitMQ决定投递消息给对应消费者。

    5.3RabbitMQ消费失败的处理

    RabbitMQ采用消息应答机制,即消费者收到一个消息之后,需要发送一个应答,然后RabbitMQ才会将这个消息从队列中删除,如果消费者在消费过程中出现异常,断开连接没有发送应答,那么RabbitMQ会将这个消息重新投递,下面我们将消费者接收到消息事件代码修改如下:

    //接收到消息事件
    consumer.Received += (ch, ea) =>
    {
        var boby = ea.Body;
        var message = Encoding.UTF8.GetString(boby.ToArray());
    
        Console.WriteLine($"收到该消息[{ea.DeliveryTag}] 延迟10s发送回执");
        Thread.Sleep(10000);
        Console.WriteLine($"已发送回执[{ea.DeliveryTag}]");
    };

    先在生产者里面预先传递三个消息:


    如果我们设置了消息应答延迟10s,如果在这10s中,该消费者断开了连接,那么消息会被RabbitMQ重新投递的。具体大家可以自行测试。

    6.总结

    该章节主要简单介绍RabbitMQ概念在Linux上简单部署,接下来章节,我会陆续介绍AMQP Messaging中的基本概念跟Exchange(交换机)。

    参考文献:
    RabbitMQ官网
    .NET Core 使用RabbitMQ

  • 相关阅读:
    android 多线程
    android调用 .net webService
    android apk程序升级
    android连数据库
    android事件
    android 服务
    android 活动
    (12)android控件-Advanced
    (11)android控件-Transitions
    (10) android控件-date
  • 原文地址:https://www.cnblogs.com/wzk153/p/13219273.html
Copyright © 2011-2022 走看看