zoukankan      html  css  js  c++  java
  • RabbitMQ使用详解



    RabbitMQ中Exchange的类型

    类型有4种,direct,fanout,topic,headers。其中headers不常用,本篇不做介绍,其他三种类型,会做详细介绍。

    那么这些类型是什么意思呢?就是Exchange与队列进行绑定后,消息根据exchang的类型,按照不同的绑定规则分发消息到消息队列中,可以是一个消息被分发给多个消息队列,也可以是一个消息分发到一个消息队列。具体请看下文。

    介绍之初还要说下RoutingKey,这是个什么玩意呢?他是exchange与消息队列绑定中的一个标识。有些路由类型会按照标识对应消息队列,有些路由类型忽略routingkey。具体看下文。

    1、Exchange类型direct

    他是根据交换器名称与routingkey来找队列的。

    Note:消息从client发出,传送给交换器ChangeA,RoutingKey为routingkey.ZLH,那么不管你发送给Queue1,还是Queue2一个消息都会保存在Queue1,Queue2,Queue3,三个队列中。这就是交换器的direct类型的路由规则。只要找到路由器与routingkey绑定的队列,那么他有多少队列,他就分发给多少队列。

    2、Exchange类型fanout

    这个类型忽略Routingkey,他为广播模式。


    Note:消息从客户端发出,只要queue与exchange有绑定,那么他不管你的Routingkey是什么他都会将消息分发给所有与该exchang绑定的队列中。

    3、Exchange类型topic

    这个类型的路由规则如果你掌握啦,那是相当的好用,与灵活。他是根据RoutingKey的设置,来做匹配的,其中这里还有两个通配符为:

    *,代表任意的一个词。例如topic.zlh.*,他能够匹配到,topic.zlh.one ,topic.zlh.two ,topic.zlh.abc, ....

    #,代表任意多个词。例如topic.#,他能够匹配到,topic.zlh.one ,topic.zlh.two ,topic.zlh.abc, ....

    Note:这个图看上去很乱,但是他是根据匹配符做匹配的。
    1. public static void Producer(int value)
    2. {
    3. try
    4. {
    5. var qName = "lhtest1";
    6. var exchangeName = "fanoutchange1";
    7. var exchangeType = "fanout";//topic、fanout
    8. var routingKey = "*";
    9. var uri = new Uri("amqp://192.168.10.121:5672/");
    10. var factory = new ConnectionFactory
    11. {
    12. UserName = "123",
    13. Password = "123",
    14. RequestedHeartbeat = 0,
    15. Endpoint = new AmqpTcpEndpoint(uri)
    16. };
    17. using (var connection = factory.CreateConnection())
    18. {
    19. using (var channel = connection.CreateModel())
    20. {
    21. //设置交换器的类型
    22. channel.ExchangeDeclare(exchangeName, exchangeType);
    23. //声明一个队列,设置队列是否持久化,排他性,与自动删除
    24. channel.QueueDeclare(qName, true, false, false, null);
    25. //绑定消息队列,交换器,routingkey
    26. channel.QueueBind(qName, exchangeName, routingKey);
    27. var properties = channel.CreateBasicProperties();
    28. //队列持久化
    29. properties.Persistent = true;
    30. var m = new QMessage(DateTime.Now, value+"");
    31. var body = Encoding.UTF8.GetBytes(DoJson.ModelToJson<QMessage>(m));
    32. //发送信息
    33. channel.BasicPublish(exchangeName, routingKey, properties, body);
    34. }
    35. }
    36. }
    37. catch (Exception ex)
    38. {
    39. Console.WriteLine(ex.Message);
    40. }
    41. }

    消息队列的消费与消息确认Ack

    1、消息队列的消费


    Note:如果一个消息队列中有大量消息等待操作时,我们可以用多个客户端来处理消息,这里的分发机制是采用负载均衡算法中的轮询。第一个消息给A,下一个消息给B,下下一个消息给A,下下下一个消息给B......以此类推。

    2、为啦保证消息的安全性,保证此消息被正确处理后才能在服务端的消息队列中删除。那么rabbitmq提供啦ack应答机制,来实现这一功能。

    ack应答有两种方式:1、自动应答,2、手动应答。具体实现如下。

    1. public static void Consumer()
    2. {
    3. try
    4. {
    5. var qName = "lhtest1";
    6. var exchangeName = "fanoutchange1";
    7. var exchangeType = "fanout";//topic、fanout
    8. var routingKey = "*";
    9. var uri = new Uri("amqp://192.168.10.121:5672/");
    10. var factory = new ConnectionFactory
    11. {
    12. UserName = "123",
    13. Password = "123",
    14. RequestedHeartbeat = 0,
    15. Endpoint = new AmqpTcpEndpoint(uri)
    16. };
    17. using (var connection = factory.CreateConnection())
    18. {
    19. using (var channel = connection.CreateModel())
    20. {
    21. channel.ExchangeDeclare(exchangeName, exchangeType);
    22. channel.QueueDeclare(qName, true, false, false, null);
    23. channel.QueueBind(qName, exchangeName, routingKey);
    24. //定义这个队列的消费者
    25. QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
    26. //false为手动应答,true为自动应答
    27. channel.BasicConsume(qName, false, consumer);
    28. while (true)
    29. {
    30. BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
    31. byte[] bytes = ea.Body;
    32. var messageStr = Encoding.UTF8.GetString(bytes);
    33. var message = DoJson.JsonToModel<QMessage>(messageStr);
    34. Console.WriteLine("Receive a Message, DateTime:" + message.DateTime.ToString("yyyy-MM-dd HH:mm:ss") + " Title:" + message.Title);
    35. //如果是自动应答,下下面这句代码不用写啦。
    36. if ((Convert.ToInt32(message.Title) % 2) == 1)
    37. {
    38. channel.BasicAck(ea.DeliveryTag, false);
    39. }
    40. }
    41. }
    42. }
    43. }
    44. catch (Exception ex)
    45. {
    46. Console.WriteLine(ex.Message);
    47. }
    48. }






  • 相关阅读:
    操作datetable 里面查出来的某个字段
    C# 字符串去重 还有 去除最后一位逗号。
    C# .net 调用ERP接口
    视图下拉列表接收控制器传来的值,并选中下拉类表中该值相对应的选项(新手笔记,请各位大神指教)
    MVC5控制器传值的三种方式(ViewData,ViewBag,TempData),刚刚学习MVC5的新手,希望各位大神多多指教
    c++模板之SFINAE
    c++头文件包含问题
    成员函数指针有多态的效果吗?
    emacs基本操作
    在c++中用function与bind实现委托
  • 原文地址:https://www.cnblogs.com/hsyzero/p/6650135.html
Copyright © 2011-2022 走看看