zoukankan      html  css  js  c++  java
  • 初识RabbitMQ系列之三:.net 如何使用RabbitMQ

    话不多说,直接上代码!

    一:搭建一个解决方案框架:RabbitMQ_Demo

    其中包含4个部分:

    1:RabbitMQ 公用类库项目

    2:一个生产者控制台项目

    3:两个消费者控制台项目

    项目结构如图:

    二:开发之前,需要引用RabbitMQ包

    安装对应的Nuget包,或者下载相关dll也可以,不过建议在线安装nuget,更方便

    搜索:RabbitMQ.Client   安装最新版即可,不知道怎么安装nuget,请移步 http://www.cnblogs.com/Dlonghow/archive/2012/03/16/2399993.html

    安装好以后,开始激动人心的写代码啦!

    三:实现RabbitMQ基本收发消息的功能

    1:在RabbitMQ_Lib类库中新建类:MyRabbitMQ.cs

    public class MyRabbitMQ
        {
            //连接工厂
            private ConnectionFactory factory { get; set; } = new ConnectionFactory
            {
                HostName = "localhost",
                Port = 5672,
                UserName = "guest",
                Password = "guest"
            };
    
            private IConnection connection { get; set; }
    
            private IModel channel { get; set; }
    
            //生产方 构造函数
            public MyRabbitMQ(string exchangeName = "",string exchangeType = ExchangeType.Direct)
            {
                //创建一个连接
                connection = factory.CreateConnection();
                channel = connection.CreateModel();
                
                //创建一个转发器
                channel.ExchangeDeclare(exchangeName, exchangeType);
            }
    
            //消费方 构造函数
            public MyRabbitMQ(string exchangeName = "", string queueName = "", string routingKey = "")
            {
                //创建一个连接
                connection = factory.CreateConnection();
                channel = connection.CreateModel();
    
                //创建一个队列
                channel.QueueDeclare(queueName, true, false, false);
    
                //队列绑定
                channel.QueueBind(queueName, exchangeName, routingKey);
            }
    
            public void SendMessage(string message="", string exchangeName = "", string routingKey = "")
            {
                channel.BasicPublish(exchangeName, routingKey, null, Encoding.UTF8.GetBytes(message));
            }
    
            public QueueingBasicConsumer ReceiveMessage(string queueName = "")
            {
                //EventingBasicConsumer
                var consumer = new QueueingBasicConsumer(channel);
                channel.BasicConsume(queueName, true, consumer);
                return consumer;
            }
        }
    View Code

    可以看到,首先有两个构造函数,一个是给生产者使用,一个是给消费者使用,注意参数有所不同,可以看出生产者与消费者关注的点是不一样的。

    无论生产还是消费,都是一个客户端,都需要创建一个RabbitMQ连接并创建一个channel,才可以进行相关的操作,这些操作都是由channel发起的,这样说应该比较白话了。

    构造生产者的时候,主要是创建一个转发器,转发器的名字及类型需要定义,

    转发器常用类型包括三种:direct、fanout、topic,

    这几种类型这里说的更清楚:http://www.cnblogs.com/zhangweizhong/p/5713874.html

    本例子中是以topic为例子的

    2:MQ_Producter项目中发送消息(生产者中发送消息)

    class Program
        {
            static void Main(string[] args)
            {
                string exchangeName = "07281616_exchange_topic";
                string routingkeya = "0728.a.c.routingkey";
                string routingkeyb = "0728.b.routingkey";
                MyRabbitMQ myMQ = new MyRabbitMQ(exchangeName, ExchangeType.Topic);
                for (int i = 0; i < 3600; i++)
                {
                    System.Threading.Thread.Sleep(1000);
                    if (i % 2 == 0)
                    {
                        var message = $"{routingkeyb} -- {DateTime.Now.ToLongTimeString()}";
                        Console.WriteLine($"auto send: {message}  for {routingkeyb}");
                        myMQ.SendMessage(message, exchangeName, routingkeyb);
                    }
                    else
                    {
                        var message = $"{routingkeya} -- {DateTime.Now.ToLongTimeString()}";
                        Console.WriteLine($"auto send: {message}  for {routingkeya}");
                        myMQ.SendMessage(message, exchangeName, routingkeya);
                    }
                    
                }
            }
        }
    View Code

    这里发送了3600次,每秒发一次消息,奇数和偶数发送的路由规则不一样,会有两个不同的客户端来接收,这样方便我们测试消息是否被分发到了不同的队列上

    3:两个消费者项目进行消息的接收

    消费者一:

    class Program
        {
            static void Main(string[] args)
            {
                string queueName = "07281616_queue";
                string exchangeName = "07281616_exchange_topic";
                var routingRule = "0728.*.routingkey";
                MyRabbitMQ myMQ = new MyRabbitMQ(exchangeName, queueName, routingRule);
                var consumer = myMQ.ReceiveMessage(queueName);
                while (true)
                {
                    //BasicConsume 方法是可阻塞的,比较好
                    var msgResponse = consumer.Queue.Dequeue();
                    //这种方法不好,没有阻塞等待
                    //var msgResponse = channel.BasicGet("zzs_queue", true);
                    var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
                    Console.WriteLine($"Received: {msgBody}  (only for {routingRule})");
                }
            }
        }
    View Code

    消费者二:

    class Program
        {
            static void Main(string[] args)
            {
                string queueName = "07281626_queue";
                string exchangeName = "07281616_exchange_topic";
                var routingRule = "0728.a.*.routingkey";
                MyRabbitMQ myMQ = new MyRabbitMQ(exchangeName, queueName, routingRule);
                var consumer = myMQ.ReceiveMessage(queueName);
                while (true)
                {
                    //BasicConsume 方法是可阻塞的,比较好
                    var msgResponse = consumer.Queue.Dequeue();
                    //这种方法不好,没有阻塞等待
                    //var msgResponse = channel.BasicGet("zzs_queue", true);
                    var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
                    Console.WriteLine($"Received: {msgBody} (only for {routingRule})");
                }
            }
        }
    View Code

    两个消费者分别接收不同队列上的消息

    4:运行!

    先编译一下,到bin目录下先运行生产者,在运行两个消费者

    也可以先关掉消费端,过7秒再关掉生产端,在web 管理界面可以看到现在有2个队列里有消息,一个3条一个4条

     四:总结

    完整例子的所有代码都在这里了,代码里相关注释也很清楚,是我自己实现的第一个RabbitMQ收发功能,实际运用中肯定可以有很多扩展,新手们有疑惑或者我理解的有不对的地方,烦请评论处指出哈,大家共同进步!

  • 相关阅读:
    poj2926
    poj2917
    新手入门:PHP网站开发中常见问题汇总
    js字符串转换成数字,数字转换成字符串
    !! jquery主要资料
    IT公司面试手册.htmhttp://www.mianwww.com/html/2009/02/2931.html
    sql语句 返回最后的insert操作所产生的自动增长的id
    eclipse 无法正常保存js
    javascript深入理解js闭包
    如何用javascript操作本地文件(读写txt文件)
  • 原文地址:https://www.cnblogs.com/wolfworker/p/7267542.html
Copyright © 2011-2022 走看看