zoukankan      html  css  js  c++  java
  • .Net EasyNetQ的使用

    一.简介

      EasyNetQ是一个容易使用,针对RabbitMQ的.Net Api,它提供了一个尽可能简洁的和适用于RabbitMQ的.Net类库。在EasyNetQ中,消息必须使用.Net class来定义。想发送不同的消息类型需要用不同的class来表示,这个类必须是public的,且带一个默认构造函数和可读属性。

      它通过消息的类型来路由。当你发布一个消息,EasyNetQ会检查消息类型,然后给它一个基于类型名称、命名空间和程序集的路由键。在消费者端,消费者去订阅这个类型。订阅之后,消费者就可以得到这个类型的消息。

      默认情况下,EasyNetQ使用Newtonsoft.Json来序列化.Net 类为Json,这样的好处是消息对于人类的可读性好。因此可以通过类似RabbitMQ管理端应用来调试消息问题。

      EasyNetQ是一个在RabbitMQ.Client类库上提供服务的组件集合。做了序列化、错误处理、线程管理、连接管理等功能。通过一个Mini-Ioc容器组织在一起,你可以很容易地用自己的方法去替换这些组件。下面的是官方给的结构图,能够很好地解释这个组件的结构。

     

     二.EasyNetQ的使用

      自行安装RabbitMQ环境。在.Net项目中安装EasyNetQ包。

       

       定义一个消息类。

    public class TextMessage
    {
        public string Text { get; set; }
    }

    1.创建连接

      使用EasyNetQ连接RabbitMQ,是在应用程序启动时创建一个IBus对象,并且在应用关闭的时候释放该对象。

      RabbitMQ的连接是基于IBus接口,当IBus接口的方法被调用,连接才会开启。创建一个IBus对象方法如下:

    var connStr = "host=127.0.0.1;virtualHost=EDCVHOST;username=admin;password=123456";
    var bus = RabbitHutch.CreateBus(connStr)

      与RabbitMQ服务器的延迟连接由IBus接口表示,创建连接的连接字符串格式为key=value的键值对组成,每一个用分号(;)分割。

      host:例子host=localhost或者host=www.xx.com,如果用到集群配置,那么可以用逗号将服务地址分割开来,如host=a.com,b.com,c.com。

      virtualHost:虚拟主机,默认为“/”。

      username:用户登录名。

      password:用户登录密码。

      requestedHeartbeat:心跳设置,默认是10秒。

      prefetchcount:默认是50秒。

      publisherConfirms:默认是false。

      persistentMessages:消息持久化。默认是true。

      product:产品名。

      platform:平台。

      timeout:默认为10秒。

     2.关闭连接

    bus.Dispose();

      要关闭连接,只需要简单地处理总线,这将关闭EasyNetQ使用的连接、渠道、消费者和其它所有资源。

    3.发布消息和订阅消息

      EasyNetQ支持最简单的消息模式是发布和订阅。发布消息后,任意消费者可以订阅该消息,也可以多个消费者订阅,不需要额外配置。创建一个IBus对象,然后创建一个可序列化的.Net class,调用publish方法即可。

    bus.PubSub.Publish(new TextMessage { Text = "hello world"});

      EasyNetQ提供了消息订阅,当调用Subscribe方法的时候,EasyNetQ会创建一个用于接受消息的队列,不过与消息发布不同的是,消息订阅增加了一个参数,subscribe_id。

    bus.PubSub.Subscribe<TextMessage>("my_test_subscriptionid", HandleTextMessage);
    
    public static void HandleTextMessage(TextMessage textMessage)
    {
        Console.ForegroundColor = ConsoleColor.Red;
        Console.WriteLine("Got message: {0}", textMessage.Text);
        Console.ResetColor();
    }

       第一个参数是订阅id,另外一个是delegate参数,用于处理接收到的消息。这里要注意的是,subscribe_id参数很重要,假如开发者用同一个subscribe_id订阅了同一种消息类型两次或者多次,RabbitMQ会以轮询的方式给每个订阅的队列发送消息。接收到之后,其它队列就接收不到该消息。如果是用不同的subscribe_id订阅同一种消息类型,那么生成的每一个队列都会收到该消息。

      需要注意的是,在处理收到的消息时,不要占用太多时间,否则会影响消息的处理效率,所以,遇到占用长时间的处理方法,最好异步处理。

    4.Request和Response

      发送请求,调用IBus中Rpc的Request方法。

    using (var bus = RabbitHutch.CreateBus(connStr))
    {
        var input = "";
        Console.WriteLine("Please enter a message. 'Quit' to quit.");
        while ((input = Console.ReadLine()) != "Quit")
        {
            var myRequest = new MyRequest { Text = input };
            var response = bus.Rpc.Request<MyRequest, MyResponse>(myRequest);
            Console.WriteLine(response.Text);
        }
    }

      这里我们创建了一个MyRequest的类型的请求,调用了Request方法,用这个消息作为参数。当response得到响应,就将response的内容输出。

      写一个响应请求的服务。

    using (var bus = RabbitHutch.CreateBus(connStr))
    {
        bus.Rpc.Respond<MyRequest, MyResponse>(request => new MyResponse { Text = "Responding to " + request.Text });
        Console.WriteLine("Listening for messages. Hit <return> to quit.");
        Console.ReadLine();
    }

      Response带有参数,一个delegate参数,它能接收请求并返回响应。

     5.Send和Recevie

       Send/Receive模式是特别针对通过命名队列来设计的。它不用假定什么样的消息类型才能够发送到这个队列。这意味着可以发送不同类型的消息到相同的队列中。

      使用IBus对象中SendReceive的Send方法,指定队列名和消息自身。

    using (var bus = RabbitHutch.CreateBus(connStr))
    {
        var input = "";
        Console.WriteLine("Please enter a message. 'Quit' to quit.");
        while ((input = Console.ReadLine()) != "Quit")
        {
            //发送类型为TextMessage的消息
            bus.SendReceive.Send("my.queue", new TextMessage { Text = input });
    
            //发送类型为TextMessage2的消息
            bus.SendReceive.Send("my.queue", new TextMessage2 { Text = input + "233" });
        }
    }

      消息接收,使用IBus对象中SendReceive的Receive方法。在同一个队列中,可以接收一种类型的消息,也可以接收多种类型的消息。

    using (var bus = RabbitHutch.CreateBus(connStr))
    {
        //单一接收类型为TextMessage的send
        bus.SendReceive.Receive<TextMessage>("my.queue", message => Console.WriteLine("Message:{0}", message.Text));
    
        //同时接收类型为TextMessage、TextMessage2的send
        bus.SendReceive.Receive("my.queue", x => x
                .Add<TextMessage>(message => Console.WriteLine("Message:{0}", message.Text))
                .Add<TextMessage2>(message => Console.WriteLine("Message2:{0}", message.Text)));
      Console.WriteLine("Listening for messages. Hit <return> to quit.");
      Console.ReadLine(); }

      这个例子重复接收类型为TextMessage的消息,消费过了就没得再消费了。

     三.总结

      1.EasyNetQ的消息模式有PublisherSubscribe,Request/Response,Send/Receive。

      2.PublisherSubscribe是发布和订阅,生产者发布了某类型消息,消费者订阅了就会实时监听这个类型的消息,然后获取。

      3.Request/Response是请求和返回。生产者和消费者同时定义请求类型和返回类型。当生产者发送了一条请求类型的消息,消费者就会获取这条请求消息并发送一条返回消息,生产者就得到这条返回消息。感觉这就像是发布和订阅,只不过是类似相互发布和订阅。

      4.Send/Receive是发送和接收。它与另外两种模式不同的是,它不是以消息类型为消息生产和消费的依据,而是以队列名为依据,这就可以让它在同一队列中发送和接收不同类型的消息。

  • 相关阅读:
    B+树的Copy-on-Write设计
    so库链接和运行时选择哪个路径下的库?
    Xapian索引-文档检索过程分析之匹配百分比
    Xapian索引-文档检索过程分析
    Xapian的内存索引-添加文档
    Xapian的内存索引
    Xapian使用入门
    一个std::sort 自定义比较排序函数 crash的分析过程
    编译GCC4.8.2
    使用C++11的一点总结
  • 原文地址:https://www.cnblogs.com/shadoll/p/14600337.html
Copyright © 2011-2022 走看看