zoukankan      html  css  js  c++  java
  • RabbitMQ

    /// <summary>
    /// 根据队列名称获取消息
    /// </summary>
    /// <param name="queueName">队列名称</param>
    /// <param name="exchangeName">交换机名称</param>
    /// <param name="exchangeType">模式fanout(广播)</param>
    public void ReadRabbitMQ(string queueName, string exchangeName, string exchangeType = "fanout")
    {
    using (var connection = _connectionFactory.CreateConnection())
    {
    using (var channel = connection.CreateModel())
    {
    int i = 1;
    channel.ExchangeDeclare(exchangeName, exchangeType);
    channel.QueueDeclare(queueName, true, false, false, null);
    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);//告诉broker同一时间只处理一个消息
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
    var msgBody = Encoding.UTF8.GetString(ea.Body);
    Console.WriteLine(" [读取队列:{0}] {1}",queueName,msgBody + "---读取第:" + i + " 条");
    int dots = msgBody.Split('.').Length - 1;
    Console.WriteLine(" 消息读取完成..................");
    /********************************处理业务数据*******************************/
    switch (queueName)
    {

    case "SALE_ORDER":
    try
    {
    this.onSaveChangeBusiness += new SaveChangeBusiness(ORDER.ReceiveSaleOrder);
    var res = this.ActionEvent(msgBody);//触发事件
    Console.WriteLine(DateTime.Now.ToString()+"-执行结果:" + (res > 0 ? "[Success]" : "[Error]") + " ");
    //channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); //临时测试方法
    MessageDispose(channel, ea, res, msgBody);

    }
    catch (Exception ex)
    {
    //消息的拒收并重回队列--重要标识
    Console.WriteLine(ex.Message);
    channel.BasicReject(ea.DeliveryTag, true);
    throw;
    }
    break;


    }
    Thread.Sleep(500);
    /********************************处理业务数据*******************************/
    i++;
    };

    //noAck设置false,告诉broker,发送消息之后,消息暂时不要删除,等消费者处理完成再说
    channel.BasicConsume(queueName, noAck: false, consumer: consumer);
    //Console.WriteLine("MQ消息监听程序已运行....");
    Console.ReadLine();
    }
    }
    }

    /// <summary>
    /// 生产MQ--写入
    /// </summary>
    /// <param name="message">消息</param>
    /// <param name="queueName">队列名称</param>
    /// <param name="exchangeName">交换器名称</param>
    /// <param name="exchangeType">交换器类型</param>
    /// <param name="routingKey">路由</param>
    public int ProducersMQ(string message, string queueName, string exchangeName, string exchangeType, string routingKey = "*")
    {
    int ret = -1;
    try
    {
    using (var connection = _connectionFactory.CreateConnection())
    {
    using (var channel = connection.CreateModel())
    {
    lock (channel)
    {
    channel.ExchangeDeclare(exchangeName, exchangeType, true, false, null);
    channel.QueueDeclare(queueName, true, false, false, null);
    //绑定消息队列,交换器,routingkey
    channel.QueueBind(queueName, exchangeName, routingKey);
    //写入消息编码格式化
    var body = Encoding.UTF8.GetBytes(message);
    IBasicProperties properties = channel.CreateBasicProperties();
    properties.Persistent = true;//持久化
    channel.BasicPublish(exchangeName, queueName, properties, body);
    }

    }
    }
    ret = 1;
    }
    catch (Exception)
    {
    ret = -1;
    }
    return ret;

    }

  • 相关阅读:
    棋盘格渲染
    openvino踩坑之Data type is unsupported
    [video super resolution] ESPCN论文笔记
    tensorflow fp16训练
    openvino安装踩坑记
    python numpy中astype使用不当导致图像出现artifact
    Python~字典
    Django~待解决的问题
    正则表达式应用
    GitLab使用
  • 原文地址:https://www.cnblogs.com/iplaycode/p/10154552.html
Copyright © 2011-2022 走看看