zoukankan      html  css  js  c++  java
  • 基于EasyNetQ的RabbitMQ封装类

    最近在捣鼓RabbitMQ,为了方便使用,自己基于EasyNetQ封装了一个类,现在贴出来还望各路大佬神明指点,共同学习。

      1     /// <summary>
      2     /// RabbitMQ客户端封装类,基于EasyNetQ,使用时需要从nuget安装EasyNetQ。
      3     /// <para>
      4     /// <example>
      5     /// 使用方法:
      6     /// <code>
      7     /// using(var mq = new RabbitMqClient('rabbitmq连接字符串'))
      8     /// { ...
      9     /// }
     10     /// </code>
     11     /// </example>
     12     /// </para>
     13     /// </summary>
     14     public class RabbitMqClient : IDisposable
     15     {
     16         private readonly IBus bus;
     17 
     18         /// <summary>
     19         /// 构造函数
     20         /// </summary>
     21         /// <param name="connectionString">rabbitmq连接字符串</param>
     22         public RabbitMqClient(string connectionString)
     23         {
     24             if (string.IsNullOrEmpty(connectionString))
     25                 throw new ArgumentNullException(nameof(connectionString));
     26             bus = RabbitHutch.CreateBus(connectionString);
     27         }
     28         /// <summary>
     29         /// 发布一条消息(广播)
     30         /// </summary>
     31         /// <param name="message"></param>
     32         public void Publish<TMessage>(TMessage message) where TMessage:class 
     33         {
     34             bus.PublishAsync(message);
     35         }
     36 
     37         /// <summary>
     38         /// 指定Topic,发布一条消息
     39         /// </summary>
     40         /// <param name="message"></param>
     41         /// <param name="topic"></param>
     42         public void PublishWithTopic<TMessage>(TMessage message, string topic) where TMessage : class
     43         {
     44             if(string.IsNullOrEmpty(topic))
     45                 Publish(message);
     46             else
     47                 bus.PublishAsync(message, x=>x.WithTopic(topic));
     48         }
     49 
     50         /// <summary>
     51         /// 发布消息。一次性发布多条
     52         /// </summary>
     53         /// <param name="messages"></param>
     54         public void PublishMany<TMessage>(List<TMessage> messages) where TMessage : class 
     55         {
     56             foreach (var message in messages)
     57             {
     58                 Publish(message);
     59                 Thread.Sleep(50);//必须加上,以防消息阻塞
     60             }
     61         }
     62 
     63         /// <summary>
     64         /// 发布消息。一次性发布多条
     65         /// </summary>
     66         /// <param name="messages"></param>
     67         /// <param name="topic"></param>
     68         public void PublishManyWithTopic<TMessage>(List<TMessage> messages, string topic) where TMessage : class
     69         {
     70             foreach (var message in messages)
     71             {
     72                 PublishWithTopic(message, topic);
     73                 Thread.Sleep(50);//必须加上,以防消息阻塞
     74             }
     75         }
     76 
     77         /// <summary>
     78         /// 给指定队列发送一条信息
     79         /// </summary>
     80         /// <param name="queue">队列名称</param>
     81         /// <param name="message">消息</param>
     82         public void Send<TMessage>(string queue, TMessage message) where TMessage : class
     83         {
     84             bus.Send(queue, message);
     85         }
     86 
     87         /// <summary>
     88         /// 给指定队列批量发送信息
     89         /// </summary>
     90         /// <param name="queue">队列名称</param>
     91         /// <param name="messages">消息</param>
     92         public void SendMany<TMessage>(string queue, IList<TMessage> messages) where TMessage : class
     93         {
     94             foreach (var message in messages)
     95             {
     96                 SendAsync(queue, message);
     97                 Thread.Sleep(50);//必须加上,以防消息阻塞
     98             }
     99         }
    100 
    101         /// <summary>
    102         /// 给指定队列发送一条信息(异步)
    103         /// </summary>
    104         /// <param name="queue">队列名称</param>
    105         /// <param name="message">消息</param>
    106         /// <returns></returns>
    107         public async void SendAsync<TMessage>(string queue, TMessage message) where TMessage:class 
    108         {
    109             await bus.SendAsync(queue, message);
    110         }
    111 
    112         /// <summary>
    113         /// 从指定队列接收一天信息,并做相关处理。
    114         /// </summary>
    115         /// <param name="queue">队列名称</param>
    116         /// <param name="process">
    117         /// 消息处理委托方法
    118         /// <para>
    119         /// <example>
    120         /// 例如:
    121         /// <code>
    122         /// message=>Task.Factory.StartNew(()=>{
    123         ///     Console.WriteLine(message);
    124         /// })
    125         /// </code>
    126         /// </example>
    127         /// </para>
    128         /// </param>
    129         public void Receive<TMessage>(string queue, Func<TMessage, Task> process) where TMessage:class 
    130         {
    131             bus.Receive(queue, process);
    132         }
    133 
    134         /// <summary>
    135         /// 消息订阅
    136         /// </summary>
    137         /// <param name="subscriptionId">消息订阅标识</param>
    138         /// <param name="process">
    139         /// 消息处理委托方法
    140         /// <para>
    141         /// <example>
    142         /// 例如:
    143         /// <code>
    144         /// message=>Task.Factory.StartNew(()=>{
    145         ///     Console.WriteLine(message);
    146         /// })
    147         /// </code>
    148         /// </example>
    149         /// </para>
    150         /// </param>
    151         public void Subscribe<TMessage>(string subscriptionId, Func<TMessage, Task> process) where TMessage:class 
    152         {
    153             bus.Subscribe<TMessage>(subscriptionId, message => process(message));
    154         }
    155 
    156         /// <summary>
    157         /// 消息订阅
    158         /// </summary>
    159         /// <param name="subscriptionId">消息订阅标识</param>
    160         /// <param name="process">
    161         /// 消息处理委托方法
    162         /// <para>
    163         /// <example>
    164         /// 例如:
    165         /// <code>
    166         /// message=>Task.Factory.StartNew(()=>{
    167         ///     Console.WriteLine(message);
    168         /// })
    169         /// </code>
    170         /// </example>
    171         /// </para>
    172         /// </param>
    173         /// <param name="topic">topic</param>
    174         public void SubscribeWithTopic<TMessage>(string subscriptionId, Func<TMessage, Task> process, string topic) where TMessage:class 
    175         {
    176             bus.Subscribe<TMessage>(subscriptionId, message => process(message), x=>x.WithTopic(topic));
    177         }
    178 
    179         /// <summary>
    180         /// 自动订阅
    181         /// </summary>
    182         /// <param name="assemblyName"></param>
    183         /// <param name="subscriptionIdPrefix"></param>
    184         /// <param name="topic"></param>
    185         public void AutoSubscribe(string assemblyName, string subscriptionIdPrefix, string topic)
    186         {
    187             var subscriber = new AutoSubscriber(bus, subscriptionIdPrefix);
    188             if (!string.IsNullOrEmpty(topic))
    189                 subscriber.ConfigureSubscriptionConfiguration = x => x.WithTopic(topic);
    190             subscriber.Subscribe(Assembly.Load(assemblyName));
    191         }
    192 
    193         /// <summary>
    194         /// 资源释放
    195         /// </summary>
    196         public void Dispose()
    197         {
    198             if (bus != null) bus.Dispose();
    199         }
    200     }
    View Code
  • 相关阅读:
    SQL键值约束、索引使用
    C#字符串的四舍五入
    VB中字符串操作函数
    C#文本选中及ContextMenuStrip菜单使用
    C#关于new的用法
    C#有关日期的使用方法
    break,continue的区别
    在Lua中使用数字的时候有个坑
    关于自动寻径和图、邻接表的学习和启发
    关于在Cocos2dx引擎中手动绑定C++到Lua
  • 原文地址:https://www.cnblogs.com/tongyinaocan/p/6950772.html
Copyright © 2011-2022 走看看