zoukankan      html  css  js  c++  java
  • .net core集成使用EasyNetQ来使用rabbitmq

      之前有写到一篇介绍EasyNetQ的博文(C# .net 使用rabbitmq消息队列——EasyNetQ插件介绍 ),所以本文从.net core的角度去继承使用EasyNetQ,而用法类似于之前集成使用rabbitmq的博文:.net core使用rabbitmq消息队列 (二)

      国际惯例,先上代码,但是代码比较多,所有又放gitee了:https://gitee.com/shanfeng1000/dotnetcore-demo/tree/master/EasyNetQ

      

      消息发布(AspNetCore.WebApi.Producer)

      Demo中这个项目是消息的发布程序,在Startup中添加服务:    

      
        public void ConfigureServices(IServiceCollection services)
        {
            var connectionString = "host=192.168.209.133;virtualHost=/;username=admin;password=123456;timeout=60";
            string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
            ushort port = 5672;
            string userName = "admin";
            string password = "123456";
            string virtualHost = "/";
            var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
    
            #region 订阅发布
    
            services.AddEasyNetQProducer("Publish", options =>
            {
                //options.ConnectionString = connectionString;
                options.Hosts = hosts;
                options.Port = port;
                options.Password = password;
                options.UserName = userName;
                options.VirtualHost = virtualHost;
    
                options.PersistentMessages = true;
                options.Priority = 1;
            });
    
            #endregion
            #region 请求响应
    
            services.AddEasyNetQProducer("Request", options =>
            {
                //options.ConnectionString = connectionString;
                options.Hosts = hosts;
                options.Port = port;
                options.Password = password;
                options.UserName = userName;
                options.VirtualHost = virtualHost;
    
                options.PersistentMessages = true;
                options.Priority = 3;
            });
    
            #endregion
            #region 发送接收
    
            services.AddEasyNetQProducer("Send", options =>
            {
                //options.ConnectionString = connectionString;
                options.Hosts = hosts;
                options.Port = port;
                options.Password = password;
                options.UserName = userName;
                options.VirtualHost = virtualHost;
    
                options.Priority = 4;
                options.Queue = "send-recieve";
            });
    
            #endregion
            ......
        }
    ConfigureServices

      添加相关服务使用AddEasyNetQProducer方法,可以指定一个名称,在创建生产者时可以提供指定的名称。熟悉EasyNetQ的朋友应该知道它提供三种消息模式:Publish/Subscribe, Request/Response和 Send/Receive,正是上面的三种申明方式。

      使用时,需要先注入IBusClientFactory对象,使用它的Create方法创建生产者对象,然后使用这个对象的方法操作消息(Publish方法、Request方法、Send方法分别对应上面的三种模式)。

      另外,EasyNetQ的消息都是一些自定的实体类,因此我们发送消息需要自定创建实体类,比如发布订阅消息时创建的实体类Subscriber:  

        public class Subscriber
        {
            public string Message { get; set; }
        }

       使用时:  

        /// <summary>
        /// 发布订阅模式
        /// </summary>
        /// <param name="message"></param>
        /// <returns></returns>
        [HttpGet("Publish")]
        public string Publish(string message)
        {
            message = message ?? "";
            var bus = busFactory.Create("Publish");
            bus.Publish(new Subscriber() { Message = message });
    
            return "success";
        }

      

      消息消费(AspNetCore.WebApi.Consumer)

       首先,在Startup中添加服务:  

      
        public void ConfigureServices(IServiceCollection services)
        {
            var connectionString = "host=192.168.209.133;virtualHost=/;username=admin;password=123456;timeout=60";
            string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
            ushort port = 5672;
            string userName = "admin";
            string password = "123456";
            string virtualHost = "/";
            var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
    
            #region 订阅发布
    
            services.AddEasyNetQConsumer(options =>
            {
                //options.ConnectionString = connectionString;
                options.Hosts = hosts;
                options.Port = port;
                options.Password = password;
                options.UserName = userName;
                options.VirtualHost = virtualHost;
    
                options.AutoDelete = true;
                options.Durable = true;
                options.PrefetchCount = 1;
                options.Priority = 2;
            })
            .AddSubscriber("PubSub1",typeof(EasyNetQSubscriber))
            .AddSubscriber<Subscriber>("PubSub2", r =>
            {
                Console.WriteLine("PubSub:" + r.Message);
            });
    
            #endregion
            #region 请求响应
    
            services.AddEasyNetQConsumer(options =>
            {
                //options.ConnectionString = connectionString;
                options.Hosts = hosts;
                options.Port = port;
                options.Password = password;
                options.UserName = userName;
                options.VirtualHost = virtualHost;
    
                options.Durable = true;
                options.PrefetchCount = 2;
            })
            .AddResponder(typeof(EasyNetQResponder))
            .AddResponder<Requester, Responder>(request =>
            {
                Console.WriteLine("Rpc:" + request.Data);
                return new Responder() { Result = "Rpc:" + request.Data };
            });
    
            #endregion
            #region 发送接收
    
            services.AddEasyNetQConsumer(options =>
            {
                //options.ConnectionString = connectionString;
                options.Hosts = hosts;
                options.Port = port;
                options.Password = password;
                options.UserName = userName;
                options.VirtualHost = virtualHost;
    
                options.Priority = 5;
                options.PrefetchCount = 5;
                options.Exclusive = false;
                options.Arguments = arguments;
                options.Queue = "send-recieve";
            })
            .AddReceiver(typeof(EasyNetQReceiver<Reciever1>))
            .AddReceiver(typeof(EasyNetQReceiver<Reciever2>));
            //.AddReceiver<Reciever1>(r =>
            //{
            //    Console.WriteLine("Reciever1:" + r.Message);
            //})
            //.AddReceiver<Reciever2>(r =>
            //{
            //    Console.WriteLine("Reciever2:" + r.Message);
            //});
    
            #endregion
    
            ......
        }
    ConfigureServices

      这里先使用AddEasyNetQConsumer方法获得一个消费者建造者,然后使用它的AddSubscriber方法、AddResponder方法、AddReceiver方法添加消费消息的处理过程,当然这三个方法分别也是对应上面的三种模式。

      另外,这三个方法添加的消息处理程序可以使用Lambda表达式实现,也可以通过响应的接口实现,比如AddSubscriber方法添加的处理程序可通过实现了IEasyNetQSubscriber<T>接口的类来替代,比如Demo中的EasyNetQSubscriber:  

        public class EasyNetQSubscriber : IEasyNetQSubscriber<Subscriber>
        {
            public void Subscribe(Subscriber message)
            {
                Console.WriteLine("EasyNetQSubscriber:" + message.Message);
            }
        }
  • 相关阅读:
    How to change hostname on SLE
    How to install starDIct on suse OS?
    python logging usage
    How to reset password for unknow root
    How to use wget ?
    How to only capute sub-matched character by grep
    How to inspect who is caller of func and who is the class of instance
    How to use groovy script on jenkins
    Vim ide for shell development
    linux高性能服务器编程 (二) --IP协议详解
  • 原文地址:https://www.cnblogs.com/shanfeng1000/p/13035758.html
Copyright © 2011-2022 走看看