zoukankan      html  css  js  c++  java
  • 安装 RabbitMQ C#使用-摘自网络(包括RabbitMQ的配置)

    1、什么是RabbitMQ。详见 http://www.rabbitmq.com/ 。

    作用就是提高系统的并发性,将一些不需要及时响应客户端且占用较多资源的操作,放入队列,再由另外一个线程,去异步处理这些队列,可极大的提高系统的并发能力。

    2、安装

    RabbitMQ服务: http://www.rabbitmq.com/download.html 。

    (安装完RabbitMQ服务后,会在Windows服务中看到。如果没有Erlang运行环境,在安装过程中会提醒先安装Erlang环境。)

    .net客户端类库: http://www.rabbitmq.com/dotnet.html

    3、插件

    RabbitMQ提供了很多好用的插件,最常用的就是web管理工具,启动此插件。

    CMD中运行命令:rabbitmq-plugins enable rabbitmq_management

    注:rabbitmq-plugins 所在路径为:D:Program FilesRabbitMQ Server abbitmq_server-3.4.0sbin

    修改plugins db log路径

    需要建立rabbitmq-env.conf:

    RABBITMQ_MNESIA_BASE=e:/rabbitmq/db
    RABBITMQ_LOG_BASE=e:/rabbitmq/log
    RABBITMQ_PLUGINS_DIR=e:/rabbitmq/plugins

    4、配置

    配置文件地址为:C:Documents and SettingsAdministratorApplication DataRabbitMQ abbitmq.config,默认没有rabbit.config文件,需要手工新建(默认会有rabbitmq.config.example 作为参考)。基于安全,做了两个配置,如下:

    [
    {rabbit,
    [
    {loopback_users, [<<"guest">>]},
    {tcp_listeners, [{"127.0.0.1", 1234},
    {"172.31.26.24", 8009}]},
    {vm_memory_high_watermark, 0.5},
    {disk_free_limit,1000000000}
    ]}
    ].

     
    • The web UI is located at: http://server-name:15672/
     

    loopback_users:设置只能在与RabbitMq服务同一台机器上访问服务的用户。

    tcp_listeners:设置RabbitMQ监听的IP地址与端口。只监听局域网内网iP、修改默认端口,防止被入侵攻击。

    设置完后,别忘记了以下操作,否则配置不起作用。

    • 停止RabbitMQ服务;
    • 重新安装服务使配置生效:rabbitmq-service.bat install

    此命令要切换到路径:D:Program FilesRabbitMQ Server abbitmq_server-3.4.0sbin

    • 启动RabbitMQ服务;

    5、Demo练习。

    消息生产者:
    
    class Program
      {
        static void Main(string[] args)
        {
          try
          {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = Constants.MqHost;
            factory.Port = Constants.MqPort;
            factory.UserName = Constants.MqUserName;
            factory.Password = Constants.MqPwd;
            using (IConnection conn = factory.CreateConnection())
            {
              using (IModel channel = conn.CreateModel())
              {
                //在MQ上定义一个持久化队列,如果名称相同不会重复创建
                channel.QueueDeclare("MyFirstQueue", true, false, false, null);
                while (true)
                {
                  string customStr = Console.ReadLine();
                  RequestMsg requestMsg = new RequestMsg();
                  requestMsg.Name = string.Format("Name_{0}", customStr);
                  requestMsg.Code = string.Format("Code_{0}", customStr);
                  string jsonStr = JsonConvert.SerializeObject(requestMsg);
                  byte[] bytes = Encoding.UTF8.GetBytes(jsonStr);
                  
                  //设置消息持久化
                  IBasicProperties properties = channel.CreateBasicProperties();
                  properties.DeliveryMode = 2;
                  channel.BasicPublish("", "MyFirstQueue", properties, bytes);
    
                  //channel.BasicPublish("", "MyFirstQueue", null, bytes);
    
                  Console.WriteLine("消息已发送:" + requestMsg.ToString());
                }
              }
            }
          }
          catch (Exception e1)
          {
            Console.WriteLine(e1.ToString());
          }
          Console.ReadLine();
        }
      }
    class Program
      {
        static void Main(string[] args)
        {
          try
          {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = Constants.MqHost;
            factory.Port = Constants.MqPort;
            factory.UserName = Constants.MqUserName;
            factory.Password = Constants.MqPwd;
            using (IConnection conn = factory.CreateConnection())
            {
              using (IModel channel = conn.CreateModel())
              {
                //在MQ上定义一个持久化队列,如果名称相同不会重复创建
                channel.QueueDeclare("MyFirstQueue", true, false, false, null);
    
                //输入1,那如果接收一个消息,但是没有应答,则客户端不会收到下一个消息
                channel.BasicQos(0, 1, false);
                
                Console.WriteLine("Listening...");
    
                //在队列上定义一个消费者
                QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
                //消费队列,并设置应答模式为程序主动应答
                channel.BasicConsume("MyFirstQueue", false, consumer);
    
                while (true)
                {
                  //阻塞函数,获取队列中的消息
                  BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                  byte[] bytes = ea.Body;
                  string str = Encoding.UTF8.GetString(bytes);
                  RequestMsg msg = JsonConvert.DeserializeObject<RequestMsg>(str);
                  Console.WriteLine("HandleMsg:" + msg.ToString());
                  //回复确认
                  channel.BasicAck(ea.DeliveryTag, false);
                }
              }
            }
          }
          catch (Exception e1)
          {
            Console.WriteLine(e1.ToString());
          }
          Console.ReadLine();
        }
      }
  • 相关阅读:
    Centos6.10-FastDFS-存储器Http配置
    Centos6.10-FastDFS-Storage.conf配置示例
    Centos6.10-FastDFS-Tracker.conf示例配置
    sublime不支持ascill编码办法
    VMware虚拟磁盘修复
    Centos610安装Nexus
    JavaWeb项目用浏览器打开网页出现Session Error提示的解决办法
    Ext里dialog弹窗关闭与父窗口刷新问题总结
    js里常见的三种请求方式$.ajax、$.post、$.get分析
    formValidation单个输入框值改变时校验
  • 原文地址:https://www.cnblogs.com/haoliansheng/p/4398555.html
Copyright © 2011-2022 走看看