zoukankan      html  css  js  c++  java
  • 在C#中使用消息队列RabbitMQ

    参考文章:http://www.cnblogs.com/qy1141/p/4054135.html

    开发环境&工具: VS2017 RabbitMq Erlang运行环境

    先安装Erlang运行环境然后再安装RabbitMq

    安装和配置就不说了

    默认安装路径:C:Program FilesRabbitMQ Server,在rabbitmq_server-3.6.11sbin文件夹下有bat文件

    默认配置文件路径: C:UserswangshibangAppDataRoamingRabbitMQ abbitmq.config

    下面直接说代码

    客户端Client把发送的消息储存到RabbitMq中,服务器开启的时候会从Rabbitmq中读取储存的消息

    项目结构图

    Clinet端代码(记得添加 RabbitMQ.Client的Nuget包)

     1 using Newtonsoft.Json;
     2 using RabbitMQ.Client;
     3 using System;
     4 using System.Text;
     5 using RabbitMqLib;
     6 
     7 namespace RabbitMqClient
     8 {
     9     class Program
    10     {
    11         static void Main(string[] args)
    12         {
    13             try
    14             {
    15                 ConnectionFactory factory = new ConnectionFactory();
    16                 factory.HostName = Constants.MqHost;
    17                 factory.Port = Constants.MqPort;
    18                 factory.UserName = Constants.MqPwd;
    19                 using (IConnection conn = factory.CreateConnection())
    20                 {
    21                     using (IModel channel = conn.CreateModel())
    22                     {
    23                         channel.QueueDeclare("MyFirstQueue", true, false, false, null);
    24                         while (true)
    25                         {
    26                             string customStr = Console.ReadLine();
    27                             RequestMsg requestMsg = new RequestMsg
    28                             {
    29                                 Name = $"Name_{customStr}",
    30                                 Code = $"Code_{customStr}"
    31                             };
    32                             string jsonStr = JsonConvert.SerializeObject(requestMsg);
    33                             byte[] bytes = Encoding.UTF8.GetBytes(jsonStr);
    34 
    35                             //设置消息持久化
    36                             IBasicProperties properties = channel.CreateBasicProperties();
    37                             properties.DeliveryMode = 2;
    38                             channel.BasicPublish("", "MyFirstQueue", properties, bytes);
    39 
    40                             Console.WriteLine("消息已经发送:" + requestMsg.ToString());
    41                         }
    42                     }
    43                 }
    44             }
    45             catch (Exception ex)
    46             {
    47                 Console.WriteLine(ex.ToString());
    48             }
    49             Console.ReadLine();
    50         }
    51     }
    52 }

    Server端代码(记得添加 RabbitMQ.Client Nuget包)

     1 using Newtonsoft.Json;
     2 using RabbitMQ.Client;
     3 using RabbitMQ.Client.Events;
     4 using RabbitMqLib;
     5 using System;
     6 using System.Text;
     7 
     8 namespace RabbitMqServer
     9 {
    10     class Program
    11     {
    12         static void Main(string[] args)
    13         {
    14             try
    15             {
    16                 ConnectionFactory factory = new ConnectionFactory();
    17                 factory.HostName = Constants.MqHost;
    18                 factory.Port = Constants.MqPort;
    19                 factory.UserName = Constants.MqUserName;
    20                 factory.Password = Constants.MqPwd;
    21                 using (IConnection conn = factory.CreateConnection())
    22                 {
    23                     using (IModel channel = conn.CreateModel())
    24                     {
    25                         //在MQ上定义一个持久化队列,如果名称相同不会重复创建
    26                         channel.QueueDeclare("MyFirstQueue", true, false, false, null);
    27                         //输入1,那如果接收一个消息,但是没有应答,则客户端不会收到下一个消息
    28                         channel.BasicQos(0, 1, false);
    29                         Console.WriteLine("Listening...");
    30                         //在队列上定义一个消费者
    31                         QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
    32                         //消费队列,并设置应答模式为程序主动应答
    33                         channel.BasicConsume("MyFirstQueue", false, consumer);
    34 
    35                         while (true)
    36                         {
    37                             //阻塞函数,获取队列中的消息
    38                             BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
    39                             byte[] bytes = ea.Body;
    40                             string str = Encoding.UTF8.GetString(bytes);
    41                             RequestMsg msg = JsonConvert.DeserializeObject<RequestMsg>(str);
    42                             Console.WriteLine("HandleMsg:" + msg.ToString());
    43                             //回复确认
    44                             channel.BasicAck(ea.DeliveryTag, false);
    45                         }
    46                     }
    47                 }
    48             }
    49             catch (Exception ex)
    50             {
    51                 Console.WriteLine(ex.ToString());
    52             }
    53             Console.ReadLine();
    54         }
    55     }
    56 }

    客户端显示效果

    服务端显示效果

    最后还有类库的两个类

     1 using System;
     2 using System.Collections.Generic;
     3 using System.Linq;
     4 using System.Text;
     5 using System.Threading.Tasks;
     6 
     7 namespace RabbitMqLib
     8 {
     9     public static class Constants
    10     {
    11         public static string MqHost { get; } = "localhost";
    12 
    13         public static int MqPort { get; } = 5672;
    14 
    15         public static string MqPwd { get; } = "guest";
    16 
    17         public static string MqUserName { get; } = "guest";
    18     }
    19 }
     1 namespace RabbitMqLib
     2 {
     3     public class RequestMsg
     4     {
     5         public string Name { get; set; }
     6 
     7         public string Code { get; set; }
     8 
     9         public override string ToString()
    10         {
    11             return $"Name: {Name}, Code: {Code}";
    12         }
    13     }
    14 }

    Constants的具体配置可以参考这篇文章

    http://www.rabbitmq.com/dotnet-api-guide.html

    poster: 由于QueueingBasicConsumer已经过时,所以Server的代码需要修改

     1           try
     2             {
     3                 ConnectionFactory factory = new ConnectionFactory
     4                 {
     5                     HostName = Constants.MqHost,
     6                     Port = Constants.MqPort,
     7                     UserName = Constants.MqUserName,
     8                     //Password = Constants.MqPwd
     9                 };
    10                 using (IConnection conn = factory.CreateConnection())
    11                 using (IModel channel = conn.CreateModel())
    12                 {
    13 
    14                     //在MQ上定义一个持久化队列,如果名称相同不会重复创建
    15                     channel.QueueDeclare("MyFirstQueue", true, false, false, null);
    16                     //输入1,那如果接收一个消息,但是没有应答,则客户端不会收到下一个消息
    17                     channel.BasicQos(0, 1, false);
    18                     Console.WriteLine("Listening...");
    19 
    20                     //该代码为原始程序
    21                     //在队列上定义一个消费者
    22                     EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
    23                     consumer.Received += (sender, e) =>
    24                     {
    25                         var body = e.Body;
    26                         string message = Encoding.UTF8.GetString(body);
    27                         //RequestMsg msg = JsonConvert.DeserializeObject<RequestMsg>(message);
    28                         Console.WriteLine("HandleMsg:" + message);
    29                         //Thread.Sleep(1000);
    30                         //回复确认
    31                         channel.BasicAck(e.DeliveryTag, false);
    32                     };
    33                     //进行消费
    34                     channel.BasicConsume("MyFirstQueue", false, consumer);
    35                     Console.WriteLine("Press Enter to Exit");
    36                     Console.ReadLine();
    37                     /*该代码为原始程序,正常。但QueueingBasicConsumer已经过时
    38                     //在队列上定义一个消费者
    39                     QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
    40                     //消费队列,并设置应答模式为程序主动应答
    41                     channel.BasicConsume("MyFirstQueue", false, consumer);
    42                     while (true)
    43                     {
    44                         //阻塞函数,获取队列中的消息
    45                         BasicDeliverEventArgs ea = consumer.Queue.Dequeue();
    46                         byte[] bytes = ea.Body;
    47                         string str = Encoding.UTF8.GetString(bytes);
    48                         RequestMsg msg = JsonConvert.DeserializeObject<RequestMsg>(str);
    49                         Console.WriteLine("HandleMsg:" + msg.ToString());
    50                         //回复确认
    51                         channel.BasicAck(ea.DeliveryTag, false);
    52                     }
    53                     */
    54                 }
    55             }
    56             catch (Exception ex)
    57             {
    58                 Console.WriteLine(ex.ToString());
    59             }
  • 相关阅读:
    CentOS 7.3 系统安装配置图解教程
    图床神器:七牛云 + Mpic + FScapture
    Markdown 使用教程
    Python小游戏、小程序
    深入理解Python中的yield和send
    替代crontab,任务计划统一集中管理系统cronsun简介
    变量命名神器Codelf
    Spring Aspect实现AOP切面
    SpringCloud之注册中心Eureka搭建
    SpringCloud中eureka配置心跳和剔除下线的服务的时间
  • 原文地址:https://www.cnblogs.com/wangyulong/p/7452465.html
Copyright © 2011-2022 走看看