zoukankan      html  css  js  c++  java
  • C# 使用Topshelf 构建 基于 window 服务的 RabbitMQ消费端

    using ServiceStack;
    using System;
    using System.Collections.Generic;
    using System.Configuration;
    using System.IO;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    using Topshelf;
    
    namespace By56.RabbitMQService
    {
        /// <summary>
        ///安装:By56.RabbitMQService.exe install
        ///启动:By56.RabbitMQService.exe start
        ///卸载:By56.RabbitMQService.exe uninstall
        /// </summary>
        class Program
        {
            static void Main(string[] args)
            {
                log4net.Config.XmlConfigurator.Configure(new FileInfo(AppDomain.CurrentDomain.BaseDirectory + @"Config\Log4Net.config"));
    
                var globalConfig = System.IO.File.ReadAllText(AppDomain.CurrentDomain.BaseDirectory + @"GlobalConfig.json");
    
                AppConfigInfo.GlobalConfig = globalConfig.FromJson<GlobalConfigModel>();
                
                HostFactory.Run(x =>
                {
                    x.Service<RabbitMQClient>(s =>
                    {
                        s.ConstructUsing(name => new RabbitMQClient());
                        s.WhenStarted(sv => sv.Start());
                        s.WhenStopped(sv => sv.Stop());
                    });
                    x.RunAsLocalSystem();
                    
                    x.SetDescription(AppConfigInfo.GlobalConfig.ServiceConfig.ServiceDesc);
                    x.SetDisplayName(AppConfigInfo.GlobalConfig.ServiceConfig.DisplayName);
                    x.SetServiceName(AppConfigInfo.GlobalConfig.ServiceConfig.ServiceName);
                });
            }
        }
    }
    

      客户端代码:

    using By56.Tools.Common;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using RestSharp;
    using ServiceStack;
    using ServiceStack.Caching;
    using ServiceStack.Redis;
    using System;
    using System.Collections.Generic;
    using System.Configuration;
    using System.Linq;
    using System.Text;
    using System.Threading;
    
    namespace By56.RabbitMQService
    {
        public class RabbitMQClient
        {
            ConnectionFactory factory;
            IConnection connection;
            IModel channel;
    
    
            ICacheClient cacheClient;
    
    
            public RabbitMQClient()
            {
    
                if (AppConfigInfo.GlobalConfig.RabbitMQConfig.Port != 0)
                {
                    factory.Port = AppConfigInfo.GlobalConfig.RabbitMQConfig.Port;
                }
    
                if (AppConfigInfo.GlobalConfig.CacheConfig.IsRedis)
                {
                    cacheClient = new ServiceStack.Redis.RedisClient(AppConfigInfo.GlobalConfig.CacheConfig.Host, AppConfigInfo.GlobalConfig.CacheConfig.Port, AppConfigInfo.GlobalConfig.CacheConfig.Password);
                }
                else
                {
                    cacheClient = new MemoryCacheClient();
                }
    
            }
    
            /// <summary>
            /// 消息处理
            /// </summary>
            public void RabbitMQRecieve()
            {
                this.channel.QueueDeclare(queue: AppConfigInfo.GlobalConfig.RabbitMQConfig.QueueName,
                                     durable: true,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
    
                this.channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
    
                Console.WriteLine(" [*] Waiting for messages.");
    
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    string message = string.Empty;
    
                    try
                    {
                        var body = ea.Body;
                        message = Encoding.UTF8.GetString(body);
    
                        LogHelper.Info("处理消息内容:" + message);
    
                        //to do
                        RequestModel requestModel = new RequestModel()
                        {
                            Data = message
                        };
    
                        //BaseClientHelper clientHelper = new BaseClientHelper(AppConfigInfo.GlobalConfig.UrlConfig.BaseUrl);
                        //if (AppConfigInfo.GlobalConfig.UrlConfig.Timeout != 0)
                        //{
                        //    clientHelper.Client.Timeout = AppConfigInfo.GlobalConfig.UrlConfig.Timeout;
                        //}
    
                        //BaseResponse<bool> res = null;
                        //switch (AppConfigInfo.GlobalConfig.UrlConfig.Method.ToUpper())
                        //{
                        //    case "GET":
                        //        res = clientHelper.ExecuteGetSingle<bool>(AppConfigInfo.GlobalConfig.UrlConfig.Resource, requestModel);
                        //        break;
                        //    case "POST":
                        //        res = clientHelper.ExecutePostSingle<bool>(AppConfigInfo.GlobalConfig.UrlConfig.Resource, requestModel);
                        //        break;
                        //}
                        BaseResponse<bool> res = CallInterfaceInfo(requestModel);
    
                        if (res == null)
                        {
                            LogHelper.Info("处理失败。接口返回null值。内容:" + message);
                            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                        }
                        else
                        {
                            if (res.IsOK)
                            {
                                LogHelper.Info("处理成功。内容:" + message);
                                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                            }
                            else
                            {
                                LogHelper.Error("处理失败。内容:" + message + " 返回消息:" + res.ToJson());
                                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        LogHelper.Error("处理异常。内容:" + message + Environment.NewLine +
                            ex.Message + Environment.NewLine + ex.Source + Environment.NewLine + ex.StackTrace);
    
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    }
    
                };
                channel.BasicConsume(queue: AppConfigInfo.GlobalConfig.RabbitMQConfig.QueueName,
                                     autoAck: false,
                                     consumer: consumer);
    
    
            }
    
            /// <summary>
            /// 调用远程接口处理消息
            /// </summary>
            /// <param name="requestModel"></param>
            /// <returns></returns>
            protected BaseResponse<bool> CallInterfaceInfo(RequestModel requestModel)
            {
                BaseResponse<bool> res = null;
    
                BaseClientHelper clientHelper = new BaseClientHelper(AppConfigInfo.GlobalConfig.UrlConfig.BaseUrl);
                if (AppConfigInfo.GlobalConfig.UrlConfig.Timeout != 0)
                {
                    clientHelper.Client.Timeout = AppConfigInfo.GlobalConfig.UrlConfig.Timeout;
                }
    
                IDictionary<string, string> headerObj = new Dictionary<string, string>();
                headerObj.Add("X-USER-LOGINNAME", "0");
                switch (AppConfigInfo.GlobalConfig.UrlConfig.Method.ToUpper())
                {
                    case "GET":
                        res = clientHelper.ExecuteGetSingle<bool>(AppConfigInfo.GlobalConfig.UrlConfig.Resource, requestModel, headerObj);
                        break;
                    case "POST":
                        res = clientHelper.ExecutePostSingle<bool>(AppConfigInfo.GlobalConfig.UrlConfig.Resource, requestModel, headerObj);
                        break;
                }
    
                return res;
            }
    
            public void Start()
            {
                //to do Start
                this.factory = new ConnectionFactory()
                {
                    HostName = AppConfigInfo.GlobalConfig.RabbitMQConfig.HostName,
                    UserName = AppConfigInfo.GlobalConfig.RabbitMQConfig.UserName,
                    Password = AppConfigInfo.GlobalConfig.RabbitMQConfig.Password
                };
                this.connection = factory.CreateConnection();
                this.channel = connection.CreateModel();
    
                RabbitMQRecieve();
    
                LogHelper.Info("服务启动");
            }
            public void Stop()
            {
                //to do Stop
                if (this.channel != null)
                {
                    this.channel.Dispose();
                    this.channel = null;
                }
                if (this.connection != null)
                {
                    this.connection.Dispose();
                }
                if (this.factory != null)
                {
                    this.factory = null;
                }
    
                LogHelper.Info("服务停止");
            }
        }
    }
  • 相关阅读:
    Oracle 的merge into 用法
    个人博客作业——结课总结
    个人博客作业week7
    结对项目总结博客
    #个人博客作业week3——微软必应词典的使用
    #个人博客作业week2——结对编程伙伴代码复审
    #个人博客作业week2——关于代码规范的个人观点
    #个人博客作业——目前流行的源程序版本管理软件和项目管理软件优缺点
    个人项目——四则运算题目的随机生成
    #个人博客作业Week1——浏览教材后提出的六个问题及软件与软件工程的提出。
  • 原文地址:https://www.cnblogs.com/TallkingIsEasying/p/15181389.html
Copyright © 2011-2022 走看看