zoukankan      html  css  js  c++  java
  • RabbitMQ与消息总线

    Windows环境安装RabbitMQ,https://www.cnblogs.com/xibei666/p/5931267.html

    1、消息发送流程

    using System;
    using System.Collections.Generic;
    using System.ComponentModel;
    using System.Data;
    using System.Drawing;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    using System.Windows.Forms;
    using RabbitMQ;
    using RabbitMQ.Client;
    
    namespace Practice_MQClient
    {
        public partial class Form1 : Form
        {
            public Form1()
            {
                InitializeComponent();
                this.label3.Text = "";
            }
    
            private void button1_Click(object sender, EventArgs e)
            {
                var factory = new ConnectionFactory
                {
                    HostName = "localhost",
                    VirtualHost = "/", //一个Host中可以设置多个虚拟主机
                    UserName = "guest",
                    Password = "guest",
                };
    
                //factory.Uri = new Uri("amqp://guest:guest@localhost:5672/"); //基于连接串的写法
                var queueName = "order-message-queue";
                var exchangeName = "order-exchange";
                var routeKey = "order-message-routeKey";
                var cn = factory.CreateConnection();
                var channel = cn.CreateModel();
    
                channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, true, false, null); //创建消息交换机,用于分发消息到队列
                channel.QueueDeclare(queueName, true, false, false, null); //创建一个消息队列,用来存储消息
                channel.QueueBind(queueName, exchangeName, routeKey, null); //创建队列、消息交换机、路由Key三者的绑定关系
    
                var msg = this.txtContent.Text;
                var body = Encoding.UTF8.GetBytes(msg);
                channel.BasicPublish(exchangeName, routeKey, null, body); //将消息发送给指定消息交换机,并设置使用的路由Key,后续的消息分发
                                                                          //工作将由交换机根据路由Key匹配到指定的队列,并进行分发。
    
                channel.Close();
                cn.Close();
                this.label3.ForeColor = Color.Chartreuse;
                this.label3.Text = "发送成功";
            }
        }
    }

     2、消息消费过程

    using System;
    using System.Collections.Generic;
    using System.ComponentModel;
    using System.Data;
    using System.Drawing;
    using System.Linq;
    using System.Runtime.CompilerServices;
    using System.Text;
    using System.Threading.Tasks;
    using System.Windows.Forms;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    
    namespace Practice.MQShow
    {
        //参考
        //https://www.cnblogs.com/PatrickLiu/p/7193578.html
        //https://www.cnblogs.com/leocook/p/mq_rabbitmq_4.html
    
        public partial class Form1 : Form
        {
            private delegate void ChangeText(string text);
            private readonly ChangeText _changeText;
    
            public Form1()
            {
                InitializeComponent();
                this.label1.Text = "";
                this._changeText = SetText;
            }
    
            private void Form1_Load(object sender, EventArgs e)
            {
                var factory = new ConnectionFactory
                {
                    HostName = "localhost",
                    VirtualHost = "/",
                    UserName = "guest",
                    Password = "guest",
                };
    
                //var rs = channel.BasicGet(queueName, true); //消息获取方式一:主动拉取队列中的消息
                //if (rs != null)
                //{
                //    var body = rs.Body;
                //    Console.WriteLine(Encoding.UTF8.GetString(body));
                //}
    
                var queueName = "order-message-queue"; //获取消息时只需要连上主机,通过队列名直接获取消息即可
                var cn = factory.CreateConnection();
                var channel = cn.CreateModel();
                var consumer = new EventingBasicConsumer(channel); //消息获取方式二:基于推送的消息获取方式
                consumer.Received += consumer_Received;
                channel.BasicConsume(queueName, true, consumer);
            }
    
            private void SetText(string text)
            {
               this.label1.Text += text + "
    ";
            }
    
            void consumer_Received(object sender, BasicDeliverEventArgs e)
            {
                this.label1.Invoke(_changeText, new object[1] { Encoding.UTF8.GetString(e.Body) });
            }
        }
    }

    3、发送消息时消息确认

    1、单个确认模式
    //
    开启发送方确认模式 channel.confirmSelect(); String message = String.format("时间 => %s", new Date().getTime()); channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8")); if (channel.waitForConfirms()) { System.out.println("消息发送成功" ); }

    看代码可以知道,我们只需要在推送消息之前,channel.confirmSelect()声明开启发送方确认模式,再使用channel.waitForConfirms()等待消息被服务器确认即可。
    2、批量确认模式
    
    // 开启发送方确认模式
    channel.confirmSelect();
    for (int i = 0; i < 10; i++) {
        String message = String.format("时间 => %s", new Date().getTime());
        channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
    }
    channel.waitForConfirmsOrDie(); //直到所有信息都发布,只要有一个未确认就会IOException
    System.out.println("全部执行完成");
    以上代码可以看出来channel.waitForConfirmsOrDie(),使用同步方式等所有的消息发送之后才会执行后面代码,只要有一个消息未被确认就会抛出IOException异常。
    3、异步确认模式
    
    // 开启发送方确认模式
    channel.confirmSelect();
    for (int i = 0; i < 10; i++) {
        String message = String.format("时间 => %s", new Date().getTime());
        channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
    }
    //异步监听确认和未确认的消息
    channel.addConfirmListener(new ConfirmListener() {
        @Override
        public void handleNack(long deliveryTag, boolean multiple) throws IOException {
            System.out.println("未确认消息,标识:" + deliveryTag);
        }
        @Override
        public void handleAck(long deliveryTag, boolean multiple) throws IOException {
            System.out.println(String.format("已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));
        }
    });

    异步模式的优点,就是执行效率高,不需要等待消息执行完,只需要监听消息即可。

    4、消息消费时手动确认

    private void Form1_Load(object sender, EventArgs e)
            {
                var factory = new ConnectionFactory
                {
                    HostName = "localhost",
                    VirtualHost = "/",
                    UserName = "guest",
                    Password = "guest",
                };
    
                var queueName = "order-message-queue"; //获取消息时只需要连上主机,通过队列名直接获取消息即可
                var cn = factory.CreateConnection();
                var channel = cn.CreateModel();
                var consumer = new EventingBasicConsumer(channel); //消息获取方式二:基于推送的消息获取方式
                consumer.Received += (_sender, _e) =>
                {
                    string msg = Encoding.UTF8.GetString(_e.Body);
                    this.label1.Invoke(_changeText, new object[1] { msg });
                    if (msg != "3")
                    {
                        channel.BasicAck(_e.DeliveryTag, false); //模拟消息消费手动确认,等于3的时候不确认消息
                    } 
                };
    
                channel.BasicConsume(queueName, false, consumer); //设置消息确认方式为手动
            }

     5、死信队列使用

     private void button4_Click(object sender, EventArgs e)
            {
                var factory = new ConnectionFactory();
                factory.Uri = new Uri("amqp://guest:guest@localhost:5672/"); //基于连接串的写法
                var exchangeName = "order-ttl-exchange";
                var routeKey = "my-key";
    
                var cn = factory.CreateConnection();
                var channel = cn.CreateModel();
    
                channel.ExchangeDeclare("dlx_exchange", ExchangeType.Direct, true, false, null);
                channel.QueueDeclare("dlx_queue", true, false, false, null);
                channel.QueueBind("dlx_queue", "dlx_exchange", routeKey, null);
    
                var args = new Dictionary<string, object>();
                args.Add("x-message-ttl", 15000);
                args.Add("x-dead-letter-exchange", "dlx_exchange"); //设定死信队列所使用的exchange
                args.Add("x-dead-letter-routing-key", routeKey); //设定死信队列所使用的routeKey
    
                channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, true, false, null);
                channel.QueueDeclare("order-ttl-canceled", true, false, false, args);
                channel.QueueBind("order-ttl-canceled", exchangeName, "", null);
    
                var msg = this.textBox1.Text;
                var body = Encoding.UTF8.GetBytes(msg);
    
                channel.ConfirmSelect();//启用消息发送确认机制
                channel.BasicPublish(exchangeName, "", null, body);
    
                var result = channel.WaitForConfirms();
                if (result)
                {
                    this.label1.ForeColor = Color.Chartreuse;
                    this.label1.Text = "发送成功";
                }
                else
                {
                    this.label1.ForeColor = Color.Red;
                    this.label1.Text = "发送失败";
                }
    
                channel.Close();
                cn.Close();
            }

     5.1 断线重连 https://www.cnblogs.com/weschen/p/10847842.html

    6、关于事务处理,参考 https://www.cnblogs.com/vipstone/p/9350075.htmlhttps://www.jianshu.com/p/1ee6be549fda

    7、 使用Masstransit开发基于消息传递的分布式应用

    https://www.cnblogs.com/richieyang/p/5730785.html

    https://www.cnblogs.com/richieyang/p/5492432.html

    https://www.cnblogs.com/Andre/p/9579764.html

    https://www.cnblogs.com/edisonchou/p/dnc_microservice_masstransit_foundation_part1.html

    https://www.cnblogs.com/edisonchou/p/dnc_microservice_masstransit_foundation_part2.html
    https://www.cnblogs.com/qkbao/p/6952654.html

    8、实际应用

    https://www.cnblogs.com/itsoku123/p/10811003.html

    https://www.cnblogs.com/itsoku123/p/10813423.html

    9、其他

    https://github.com/sheng-jie/RabbitMQ

    https://www.sojson.com/blog/48.html
    https://www.cnblogs.com/wangiqngpei557/p/6158094.html

  • 相关阅读:
    数据清洗
    JAVA多线程三种实现方式
    QT-4.8.6 编译配置过程
    qt 编译问题总结
    [转载]tslib1.4与Qt4.8.6的交叉编译与移植
    STC12C5A60S2 @ 22.0184Mhz 精确延时
    STC12C5A60S2 双串口通信
    C# Bitmap 复制
    TextMate2 最新版下载及源码编译过程
    mac系统 PHP Nginx环境变量修改
  • 原文地址:https://www.cnblogs.com/huangzelin/p/10742420.html
Copyright © 2011-2022 走看看