zoukankan      html  css  js  c++  java
  • .net core集成使用EasyNetQ来使用rabbitmq

      之前有写到一篇介绍EasyNetQ的博文(C# .net 使用rabbitmq消息队列——EasyNetQ插件介绍 ),所以本文从.net core的角度去继承使用EasyNetQ,而用法类似于之前集成使用rabbitmq的博文:.net core使用rabbitmq消息队列 (二)

      国际惯例,先上代码,但是代码比较多,所有又放gitee了:https://gitee.com/shanfeng1000/dotnetcore-demo/tree/master/EasyNetQ

      

      消息发布(AspNetCore.WebApi.Producer)

      Demo中这个项目是消息的发布程序,在Startup中添加服务:    

      
        public void ConfigureServices(IServiceCollection services)
        {
            var connectionString = "host=192.168.209.133;virtualHost=/;username=admin;password=123456;timeout=60";
            string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
            ushort port = 5672;
            string userName = "admin";
            string password = "123456";
            string virtualHost = "/";
            var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
    
            #region 订阅发布
    
            services.AddEasyNetQProducer("Publish", options =>
            {
                //options.ConnectionString = connectionString;
                options.Hosts = hosts;
                options.Port = port;
                options.Password = password;
                options.UserName = userName;
                options.VirtualHost = virtualHost;
    
                options.PersistentMessages = true;
                options.Priority = 1;
            });
    
            #endregion
            #region 请求响应
    
            services.AddEasyNetQProducer("Request", options =>
            {
                //options.ConnectionString = connectionString;
                options.Hosts = hosts;
                options.Port = port;
                options.Password = password;
                options.UserName = userName;
                options.VirtualHost = virtualHost;
    
                options.PersistentMessages = true;
                options.Priority = 3;
            });
    
            #endregion
            #region 发送接收
    
            services.AddEasyNetQProducer("Send", options =>
            {
                //options.ConnectionString = connectionString;
                options.Hosts = hosts;
                options.Port = port;
                options.Password = password;
                options.UserName = userName;
                options.VirtualHost = virtualHost;
    
                options.Priority = 4;
                options.Queue = "send-recieve";
            });
    
            #endregion
            ......
        }
    ConfigureServices

      添加相关服务使用AddEasyNetQProducer方法,可以指定一个名称,在创建生产者时可以提供指定的名称。熟悉EasyNetQ的朋友应该知道它提供三种消息模式:Publish/Subscribe, Request/Response和 Send/Receive,正是上面的三种申明方式。

      使用时,需要先注入IBusClientFactory对象,使用它的Create方法创建生产者对象,然后使用这个对象的方法操作消息(Publish方法、Request方法、Send方法分别对应上面的三种模式)。

      另外,EasyNetQ的消息都是一些自定的实体类,因此我们发送消息需要自定创建实体类,比如发布订阅消息时创建的实体类Subscriber:  

        public class Subscriber
        {
            public string Message { get; set; }
        }

       使用时:  

        /// <summary>
        /// 发布订阅模式
        /// </summary>
        /// <param name="message"></param>
        /// <returns></returns>
        [HttpGet("Publish")]
        public string Publish(string message)
        {
            message = message ?? "";
            var bus = busFactory.Create("Publish");
            bus.Publish(new Subscriber() { Message = message });
    
            return "success";
        }

      

      消息消费(AspNetCore.WebApi.Consumer)

       首先,在Startup中添加服务:  

      
        public void ConfigureServices(IServiceCollection services)
        {
            var connectionString = "host=192.168.209.133;virtualHost=/;username=admin;password=123456;timeout=60";
            string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
            ushort port = 5672;
            string userName = "admin";
            string password = "123456";
            string virtualHost = "/";
            var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
    
            #region 订阅发布
    
            services.AddEasyNetQConsumer(options =>
            {
                //options.ConnectionString = connectionString;
                options.Hosts = hosts;
                options.Port = port;
                options.Password = password;
                options.UserName = userName;
                options.VirtualHost = virtualHost;
    
                options.AutoDelete = true;
                options.Durable = true;
                options.PrefetchCount = 1;
                options.Priority = 2;
            })
            .AddSubscriber("PubSub1",typeof(EasyNetQSubscriber))
            .AddSubscriber<Subscriber>("PubSub2", r =>
            {
                Console.WriteLine("PubSub:" + r.Message);
            });
    
            #endregion
            #region 请求响应
    
            services.AddEasyNetQConsumer(options =>
            {
                //options.ConnectionString = connectionString;
                options.Hosts = hosts;
                options.Port = port;
                options.Password = password;
                options.UserName = userName;
                options.VirtualHost = virtualHost;
    
                options.Durable = true;
                options.PrefetchCount = 2;
            })
            .AddResponder(typeof(EasyNetQResponder))
            .AddResponder<Requester, Responder>(request =>
            {
                Console.WriteLine("Rpc:" + request.Data);
                return new Responder() { Result = "Rpc:" + request.Data };
            });
    
            #endregion
            #region 发送接收
    
            services.AddEasyNetQConsumer(options =>
            {
                //options.ConnectionString = connectionString;
                options.Hosts = hosts;
                options.Port = port;
                options.Password = password;
                options.UserName = userName;
                options.VirtualHost = virtualHost;
    
                options.Priority = 5;
                options.PrefetchCount = 5;
                options.Exclusive = false;
                options.Arguments = arguments;
                options.Queue = "send-recieve";
            })
            .AddReceiver(typeof(EasyNetQReceiver<Reciever1>))
            .AddReceiver(typeof(EasyNetQReceiver<Reciever2>));
            //.AddReceiver<Reciever1>(r =>
            //{
            //    Console.WriteLine("Reciever1:" + r.Message);
            //})
            //.AddReceiver<Reciever2>(r =>
            //{
            //    Console.WriteLine("Reciever2:" + r.Message);
            //});
    
            #endregion
    
            ......
        }
    ConfigureServices

      这里先使用AddEasyNetQConsumer方法获得一个消费者建造者,然后使用它的AddSubscriber方法、AddResponder方法、AddReceiver方法添加消费消息的处理过程,当然这三个方法分别也是对应上面的三种模式。

      另外,这三个方法添加的消息处理程序可以使用Lambda表达式实现,也可以通过响应的接口实现,比如AddSubscriber方法添加的处理程序可通过实现了IEasyNetQSubscriber<T>接口的类来替代,比如Demo中的EasyNetQSubscriber:  

        public class EasyNetQSubscriber : IEasyNetQSubscriber<Subscriber>
        {
            public void Subscribe(Subscriber message)
            {
                Console.WriteLine("EasyNetQSubscriber:" + message.Message);
            }
        }
  • 相关阅读:
    1118诗名,诗词形式,类别实体导入
    1119飞花令句子,好友关系导入
    1116五言诗生成&古今地名标注与展示
    1120地点实体与事件实体导入
    1121实体导入总结
    1111诗人生平信息提取
    1114诗词收集&藏头诗生成&Snownlp正负情感分析
    1112全体诗人个人生平提取
    1113七言诗词收集与LSTM自动写诗
    WPF ListView DataGrid日期时间类型格式转换
  • 原文地址:https://www.cnblogs.com/shanfeng1000/p/13035758.html
Copyright © 2011-2022 走看看