zoukankan      html  css  js  c++  java
  • 在C#中使用消息队列RabbitMQ

    1、什么是RabbitMQ。详见 http://www.rabbitmq.com/

        作用就是提高系统的并发性,将一些不需要及时响应客户端且占用较多资源的操作,放入队列,再由另外一个线程,去异步处理这些队列,可极大的提高系统的并发能力。

    2、安装

        RabbitMQ服务:http://www.rabbitmq.com/download.html
        (安装完RabbitMQ服务后,会在Windows服务中看到。如果没有Erlang运行环境,在安装过程中会提醒先安装Erlang环境。http://www.erlang.org/downloads)

        .net客户端类库:http://www.rabbitmq.com/dotnet.html

    下载Erlang环境并安装

    地址:http://www.erlang.org/downloads

    然后安装RabbitMQ,安装成功后会在服务中看到该服务。

    3、插件

         RabbitMQ提供了很多好用的插件,最常用的就是web管理工具,启动此插件。

         CMD中运行命令:rabbitmq-plugins enable rabbitmq_management

         注:rabbitmq-plugins 所在路径为:D:Program FilesRabbitMQ Server abbitmq_server-3.4.0sbin

         web管理工具的地址是:http://localhost:15672,初始用户名:guest 初始密码:guest

    4、配置

        配置文件地址为:C:Documents and SettingsAdministratorApplication DataRabbitMQ abbitmq.config,默认没有rabbit.config文件,需要手工新建(默认会有rabbitmq.config.example 作为参考)。基于安全,做了两个配置,如下:

        

    复制代码
    [
    {rabbit,
    [
    {loopback_users, [<<"guest">>]},
    {tcp_listeners, [{"127.0.0.1", 1234},
    {"10.121.1.48", 8009}]}
    
    ]}
    ].
    复制代码

    loopback_users:设置只能在与RabbitMq服务同一台机器上访问服务的用户。

    tcp_listeners:设置RabbitMQ监听的IP地址与端口。只监听局域网内网iP、修改默认端口,防止被入侵攻击。

    设置完后,别忘记了以下操作,否则配置不起作用。

    • 停止RabbitMQ服务;
    • 重新安装服务使配置生效:rabbitmq-service.bat install

            此命令要切换到路径:D:Program FilesRabbitMQ Server abbitmq_server-3.4.0sbin

    • 启动RabbitMQ服务;

    5、Demo练习。

    复制代码
    消息生产者:
    
    class Program
        {
            static void Main(string[] args)
            {
                try
                {
                    ConnectionFactory factory = new ConnectionFactory();
                    factory.HostName = Constants.MqHost;
                    factory.Port = Constants.MqPort;
                    factory.UserName = Constants.MqUserName;
                    factory.Password = Constants.MqPwd;
                    using (IConnection conn = factory.CreateConnection())
                    {
                        using (IModel channel = conn.CreateModel())
                        {
                            //在MQ上定义一个持久化队列,如果名称相同不会重复创建
                            channel.QueueDeclare("MyFirstQueue", true, false, false, null);
                            while (true)
                            {
                                string customStr = Console.ReadLine();
                                RequestMsg requestMsg = new RequestMsg();
                                requestMsg.Name = string.Format("Name_{0}", customStr);
                                requestMsg.Code = string.Format("Code_{0}", customStr);
                                string jsonStr = JsonConvert.SerializeObject(requestMsg);
                                byte[] bytes = Encoding.UTF8.GetBytes(jsonStr);
                                
                                //设置消息持久化
                                IBasicProperties properties = channel.CreateBasicProperties();
                                properties.DeliveryMode = 2;
                                channel.BasicPublish("", "MyFirstQueue", properties, bytes);
    
                                //channel.BasicPublish("", "MyFirstQueue", null, bytes);
    
                                Console.WriteLine("消息已发送:" + requestMsg.ToString());
                            }
                        }
                    }
                }
                catch (Exception e1)
                {
                    Console.WriteLine(e1.ToString());
                }
                Console.ReadLine();
            }
        }
    复制代码
    复制代码
    class Program
        {
            static void Main(string[] args)
            {
                try
                {
                    ConnectionFactory factory = new ConnectionFactory();
                    factory.HostName = Constants.MqHost;
                    factory.Port = Constants.MqPort;
                    factory.UserName = Constants.MqUserName;
                    factory.Password = Constants.MqPwd;
                    using (IConnection conn = factory.CreateConnection())
                    {
                        using (IModel channel = conn.CreateModel())
                        {
                            //在MQ上定义一个持久化队列,如果名称相同不会重复创建
                            channel.QueueDeclare("MyFirstQueue", true, false, false, null);
    
                            //输入1,那如果接收一个消息,但是没有应答,则客户端不会收到下一个消息
                            channel.BasicQos(0, 1, false);
                            
                            Console.WriteLine("Listening...");
    
                            //在队列上定义一个消费者
                            QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
                            //消费队列,并设置应答模式为程序主动应答
                            channel.BasicConsume("MyFirstQueue", false, consumer);
    
                            while (true)
                            {
                                //阻塞函数,获取队列中的消息
                                BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                                byte[] bytes = ea.Body;
                                string str = Encoding.UTF8.GetString(bytes);
                                RequestMsg msg = JsonConvert.DeserializeObject<RequestMsg>(str);
                                Console.WriteLine("HandleMsg:" + msg.ToString());
                                //回复确认
                                channel.BasicAck(ea.DeliveryTag, false);
                            }
                        }
                    }
                }
                catch (Exception e1)
                {
                    Console.WriteLine(e1.ToString());
                }
                Console.ReadLine();
            }
        }
    复制代码

  • 相关阅读:
    ZOJ 2158 Truck History
    Knight Moves (zoj 1091 poj2243)BFS
    poj 1270 Following Orders
    poj 2935 Basic Wall Maze (BFS)
    Holedox Moving (zoj 1361 poj 1324)bfs
    ZOJ 1083 Frame Stacking
    zoj 2193 Window Pains
    hdu1412{A} + {B}
    hdu2031进制转换
    openjudge最长单词
  • 原文地址:https://www.cnblogs.com/Alex80/p/10847768.html
Copyright © 2011-2022 走看看