zoukankan      html  css  js  c++  java
  • .net中RabbitMQ生产者/消费者

     
    
    #region 服务器配置信息是配置再webconfig中
    private static string _HostName = System.Configuration.ConfigurationManager.AppSettings["RabbitMQHostName"].ToString();
    private static string _UserName = System.Configuration.ConfigurationManager.AppSettings["RabbitMQUserName"].ToString();
    private static string _Pass = System.Configuration.ConfigurationManager.AppSettings["RabbitMQPassword"].ToString();
    //声明交换器
    private static string EXCHANGE_NAME = System.Configuration.ConfigurationManager.AppSettings["RabbitMQExchange"].ToString();
    #endregion
    
    //往MQ推送消息---生产者
    
    private StandardResult<string> AccountBalance(BalanceAction BalanceAction)
    {
    string ROUTKEY = "querybalancereq.key";
    string _QueueName = "queue.fclouds.querybalancereq";
    StandardResult<string> result = new StandardResult<string>();
    try
    {
    /*首先,需要创建一个ConnectionFactory,设置目标,由于是在本机,所以设置为localhost,如果RabbitMQ不在本机
    * ,只需要设置目标机器的IP地址或者机器名称即可,然后设置前面创建的用户名和密码。*/
    var factory = new ConnectionFactory();
    factory.HostName = _HostName;//RabbitMQ服务在本地运行
    factory.UserName = _UserName;//用户名
    factory.Password = _Pass;//密码
    //要启用自动连接恢复
    factory.AutomaticRecoveryEnabled = true;
    using (var connection = factory.CreateConnection())
    {
    using (var channel = connection.CreateModel())
    {
    //在MQ上定义一个持久化队列,如果名称相同不会重复创建
    channel.QueueDeclare(_QueueName, true, false, false, null);
    try
    {
    InterfaceLog log = new InterfaceLog();
    log.BatchNumber = "";
    log.InterfaceCode = "ACCOUNTBALANCE";
    log.RecordDate = DateTime.Now;
    log.ID = Generator.GenerateGuid();
    //发送数据
    string xmlStr = ModelSerializer.SerializerToString<BalanceAction>(BalanceAction, true, "", "");
    string message = xmlStr;
    var body = Encoding.UTF8.GetBytes(message);
    var properties = channel.CreateBasicProperties();
    properties.ContentType = "text/plain";
    properties.Persistent = true;
    //开始传递
    channel.BasicPublish(EXCHANGE_NAME, ROUTKEY, properties, body);
    result.Code = StandardStatus.success;
    result.Msg = "操作成功";
    log.RequestContent = message;
    log.InterfaceCode = InterfaceCodeEnum.ACCOUNTBALANCE.ToString();
    log.StatusCode = LogStatus.Success;
    _projectAPIService.AddInterfaceLog(log);
    }
    catch (Exception e)
    {
    result.Code = StandardStatus.fail;
    result.Msg = "操作失败";
    InterfaceLog log = new InterfaceLog();
    log.BatchNumber = "";
    log.InterfaceCode = InterfaceCodeEnum.ACCOUNTBALANCE.ToString();
    log.RecordDate = DateTime.Now;
    log.ID = Generator.GenerateGuid();
    log.ErrorMessage = e.Message;
    log.StatusCode = LogStatus.Fail;
    SendInfo("交易明细查询时推送至MQ时异常", log);
    _projectAPIService.AddInterfaceLog(log);
    }
    }
    }
    }
    catch (Exception ex)
    {
    result.Code = StandardStatus.fail;
    result.Msg = "操作失败";
    InterfaceLog log = new InterfaceLog();
    log.BatchNumber = "";
    log.InterfaceCode = InterfaceCodeEnum.ACCOUNTBALANCE.ToString();
    log.RecordDate = DateTime.Now;
    log.ID = Generator.GenerateGuid();
    log.ErrorMessage = ex.Message;
    log.StatusCode = LogStatus.Fail;
    SendInfo("交易明细查询时推送至MQ时异常", log);
    _projectAPIService.AddInterfaceLog(log);
    }
    return result;
    }
    
    /// <summary>
    /// 账户余额查询响应----消费者
    /// </summary>
    public void AccountBalanceResponse()
    {
    string str = string.Empty;
    
    var factory = new ConnectionFactory();
    factory.HostName = _HostName;
    factory.UserName = _UserName;
    factory.Password = _Pass;
    //要启用自动连接恢复
    factory.AutomaticRecoveryEnabled = true;
    string ROUTKEY = "queryhistoryres.key";
    string _QueueName = "queue.fclouds.querybalanceres";//队列名
    try
    {
    using (var connection = factory.CreateConnection())
    {
    try
    {
    using (var channel = connection.CreateModel())
    {
    channel.QueueDeclare(_QueueName, true, false, false, null);
    
    //绑定队列,通过键 ROUTKEY将队列和交换器绑定起来
    channel.QueueBind(_QueueName, EXCHANGE_NAME, ROUTKEY);
    
    //公平分发 同一时间只处理一个消息
    channel.BasicQos(0, 1, false);
    
    #region 执行消费
    //在队列上定义一个消费者
    var consumer = new QueueingBasicConsumer(channel);
    
    //消费队列,并设置应答模式为程序主动应答
    channel.BasicConsume(_QueueName, false, consumer);
    
    try
    {
    try
    {
    //获取信息
    BasicDeliverEventArgs ea = new BasicDeliverEventArgs();
    bool IsSuccess = consumer.Queue.Dequeue(2000, out ea);
    //var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
    if (IsSuccess && ea != null)
    {
    byte[] bytes = ea.Body;
    
    str = Encoding.UTF8.GetString(bytes);
    
    try
    {
    
    //将响应报文转成model
    TransactionBalance transBalance = new TransactionBalance();
    var tranBlance = ModelSerializer.DeserializerWithXmlString<TransactionBalance>(transBalance, str);
    if (tranBlance != null)
    {
    if (tranBlance.TransactionBody != null)
    {
    if (tranBlance.TransactionBody.response != null)
    {
    try
    {
    //业务操作
    
    //写日志
    var logInfo = AddLog(InterfaceCodeEnum.ACCOUNTBCPONSE.ToString(), true, "", "", str);
    }
    catch (Exception ex)
    {
    //写日志
    var logInfo = AddLog(InterfaceCodeEnum.ACCOUNTBCPONSE.ToString(), false, ex.Message, "", str);
    SendInfo("账户余额查询响应时", logInfo);
    }
    }
    else
    {
    //写日志
    var logInfo = AddLog(InterfaceCodeEnum.ACCOUNTBCPONSE.ToString(), false, "响应报文response节点为空", "", str);
    SendInfo("账户余额查询响应时", logInfo);
    }
    }
    else
    {
    //写日志
    var logInfo = AddLog(InterfaceCodeEnum.ACCOUNTBCPONSE.ToString(), false, "响应报文TransactionBody节点为空", "", str);
    SendInfo("账户余额查询响应时", logInfo);
    }
    
    }
    else
    {
    //写日志
    var logInfo = AddLog(InterfaceCodeEnum.ACCOUNTBCPONSE.ToString(), false, "账户余额查询响应报文为空", "", str);
    SendInfo("账户余额查询响应时", logInfo);
    }
    
    channel.BasicAck(ea.DeliveryTag, false);
    }
    catch (Exception ex)
    {
    //写日志
    var logInfo = AddLog(InterfaceCodeEnum.ACCOUNTBCPONSE.ToString(), false, ex.Message, "", str);
    SendInfo("账户余额查询响应时", logInfo);
    }
    }
    }
    catch (Exception ex)
    {
    //写日志
    var logInfo = AddLog(InterfaceCodeEnum.ACCOUNTBCPONSE.ToString(), false, ex.Message, "", str);
    SendInfo("账户余额查询响应时", logInfo);
    }
    }
    catch (Exception ex)
    {
    //写日志
    var logInfo = AddLog(InterfaceCodeEnum.ACCOUNTBCPONSE.ToString(), false, ex.Message, "", str);
    SendInfo("账户余额查询响应时", logInfo);
    }
    #endregion
    }
    }
    catch (Exception ex)
    {
    //写日志
    var logInfo = AddLog(InterfaceCodeEnum.ACCOUNTBCPONSE.ToString(), false, ex.Message, "", str);
    SendInfo("账户余额查询响应时", logInfo);
    }
    }
    }
    catch (Exception ex)
    {
    //写日志
    var logInfo = AddLog(InterfaceCodeEnum.ACCOUNTBCPONSE.ToString(), false, ex.Message, "", str);
    SendInfo("账户余额查询响应时", logInfo);
    }
    }
  • 相关阅读:
    汉字词组换行
    C#中获取Excel文件的第一个表名
    SQL查找某一条记录的方法
    C#数据库连接字符大全
    整理的asp.net资料!(不得不收藏)
    母版页的优点,及母版页与内容页中相互访问方法
    13范式
    使用 Jackson 树连接线形状
    word2007,取消显示回车符
    三张表之间相互的多对多关系
  • 原文地址:https://www.cnblogs.com/nayilvyangguang/p/10078941.html
Copyright © 2011-2022 走看看