zoukankan      html  css  js  c++  java
  • 学习rabbitmq (二) 使用rabbitmq <完结>

    以为rabbitmq会折腾很久,但没有想到就这么多点内容,主要是服务端的懒得去折腾,比如docker的转移啊,发布啊,部署啥的

    今天写了一些代码,用的c#弄的,新建两个项目,一个sender,一个rec,需要说的都在代码里了

    就说一下在vs里安装rabbitmq的client,如果看不懂,也懒得说了

    以下是发送端的代码,就一个窗体

    using System;
    using System.Collections.Generic;
    using System.ComponentModel;
    using System.Data;
    using System.Drawing;
    using System.Linq;
    using System.Text;
    using System.Windows.Forms;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    
    namespace rabbitmq_example_sender
    {
        public partial class Form1 : Form
        {
            public Form1()
            {
                InitializeComponent();
            }
    
            private void button1_Click(object sender, EventArgs e)
            {
                //简单队列模式
                var factory = new ConnectionFactory();
                factory.HostName = "192.168.165.146";//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。
                factory.UserName = "guest";                 //RabbitMQ 连接用户名,默认为 guest
                factory.Password = "guest";                 //RabbitMQ 连接密码,默认为 guest
    
                //创建连接对象
                using (var connection = factory.CreateConnection())
                {
                    //创建一个新的通道、会话和模型
                    using (var channel = connection.CreateModel())
                    {
                        /*
                        * 创建一个名为 myQueue1 的消息队列,如果名称相同不会重复创建,参数解释:
                        * 参1:myQueue1, 消息队列名称;
                         * 参2:false, 是否持久化,持久化的队列会存盘,服务器重启后任然存在;
                        * 参3:false, 是否为排他队列,排他队列表示仅对首次声明它的连接可见,并在连接断开时自动删除。这种队列适用于一个客户端同时发送和读取消息的应用场景。
                         * 参4:false, 是否自动删除,自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
                         * 参5:设置队列的其他一些参数,如 x-rnessage-ttl、x-expires、x-rnax-length、x-rnax-length-bytes、x-dead-letter-exchange、x-deadletter-routing-key、x-rnax-priority 等。
                         */
                        channel.QueueDeclare("myQueue1", false, false, false, null);
                        var properties = channel.CreateBasicProperties();
                        properties.DeliveryMode = 1;
      
                        for (int i = 0; i < 10000; i++)
                        {
                            string message = $"Hello RabbitMQ {i}";
                            byte[] body = Encoding.UTF8.GetBytes(message);
                            channel.BasicPublish("", "myQueue1", properties, body);
                            Console.WriteLine($"Send: {message}, {DateTime.Now.ToString("HH:mm:ss fff")}");
                            System.Threading.Thread.Sleep(1000);    //间隔1秒钟发送一次
                        }
    
                    }
    
                }
            }
    
            private void button2_Click(object sender, EventArgs e)
            {
                
                
            }
    
            private void button2_Click_1(object sender, EventArgs e)
            {
                //工作队列模式
                var factory = new ConnectionFactory();
                factory.HostName = "192.168.165.146";//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。
                factory.UserName = "guest";                 //RabbitMQ 连接用户名,默认为 guest
                factory.Password = "guest";                 //RabbitMQ 连接密码,默认为 guest
    
                //创建连接对象
                using (var connection = factory.CreateConnection())
                {
                    //创建一个新的通道、会话和模型
                    using (var channel = connection.CreateModel())
                    {
                        /*
                        * 创建一个名为 myQueue1 的消息队列,如果名称相同不会重复创建,参数解释:
                        * 参1:myQueue1, 消息队列名称;
                         * 参2:false, 是否持久化,持久化的队列会存盘,服务器重启后任然存在;
                        * 参3:false, 是否为排他队列,排他队列表示仅对首次声明它的连接可见,并在连接断开时自动删除。这种队列适用于一个客户端同时发送和读取消息的应用场景。
                         * 参4:false, 是否自动删除,自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
                         * 参5:设置队列的其他一些参数,如 x-rnessage-ttl、x-expires、x-rnax-length、x-rnax-length-bytes、x-dead-letter-exchange、x-deadletter-routing-key、x-rnax-priority 等。
                         */
                          
                        bool durable = false;
                        //上面这个参数,我不知道为什么要放在生产端,而且还要求消费端用一样的参数
                        //另外一点跟DeliveryMode或Persistent都是搞持久化的,网上的讲得感觉很模乎,
                        //我理解的是这样的:消息->队列->交换器 ,这三个内容都需要确认是否持久化,也就是说DeliveryMode或Persistent
                        //是决定这个消息是否持久化,QueueDeclare的durable是决定队列是否持久化,而申明exchange 交换器的时候还有一个exchangeDeclare的durable,
                        //是决定这个交换器的持久化
                        //https://blog.csdn.net/u013256816/article/details/60875666/ 只有这个看下是否能理解了
    
                        channel.QueueDeclare("myQueue1", durable, false, false, null);
                        var properties = channel.CreateBasicProperties();
                        properties.DeliveryMode =1;
                        //properties.Persistent = false; //跟上面的其实是一样的,1和false    2和true
                        for (int i = 0; i < 100; i++)
                        {
                            string message = $"Hello RabbitMQ {i}";
                            byte[] body = Encoding.UTF8.GetBytes(message);
                            channel.BasicPublish("", "myQueue1", properties, body);
                            Console.WriteLine($"Send: {message}, {DateTime.Now.ToString("HH:mm:ss fff")}");
                            System.Threading.Thread.Sleep(100);    //间隔1秒钟发送一次
                        }
    
                    }
    
                }
            }
    
             
            private void button3_Click(object sender, EventArgs e)
            {
                String EXCHANGE_NAME = "fanout_exchange";
                //交换器(fanout)
                var factory = new ConnectionFactory();
                factory.HostName = "192.168.165.146";//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。
                factory.UserName = "guest";                 //RabbitMQ 连接用户名,默认为 guest
                factory.Password = "guest";                 //RabbitMQ 连接密码,默认为 guest
    
                //创建连接对象
                using (var connection = factory.CreateConnection())
                {
                    //创建一个新的通道、会话和模型
                    using (var channel = connection.CreateModel())
                    {
                        bool durable = false;
                        //3.声明交换器     
                        channel.ExchangeDeclare(EXCHANGE_NAME, ExchangeType.Fanout, durable);
     
                        var properties = channel.CreateBasicProperties();
                        properties.DeliveryMode = 1;
                        //properties.Persistent = false; //跟上面的其实是一样的,1和false    2和true
                        for (int i = 0; i < 50; i++)
                        {
                            string message = $"Hello RabbitMQ {i}";
                            byte[] body = Encoding.UTF8.GetBytes(message);
                            channel.BasicPublish(EXCHANGE_NAME, "", properties, body);
                            Console.WriteLine($"Send: {message}, {DateTime.Now.ToString("HH:mm:ss fff")}");
                            System.Threading.Thread.Sleep(100);    //间隔1秒钟发送一次
                        }
    
                    }
    
                }
            }
    
            private void button4_Click(object sender, EventArgs e)
            {
                String EXCHANGE_NAME = "direct_exchange";
                //路由(direct)
                var factory = new ConnectionFactory();
                factory.HostName = "192.168.165.146";//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。
                factory.UserName = "guest";                 //RabbitMQ 连接用户名,默认为 guest
                factory.Password = "guest";                 //RabbitMQ 连接密码,默认为 guest
    
                //创建连接对象
                using (var connection = factory.CreateConnection())
                {
                    //创建一个新的通道、会话和模型
                    using (var channel = connection.CreateModel())
                    {
                        bool durable = false;
                        //3.声明交换器     Direct 才会处理 路由key
                        channel.ExchangeDeclare(EXCHANGE_NAME, ExchangeType.Direct, durable);
    
                        var properties = channel.CreateBasicProperties();
                        properties.DeliveryMode = 1;
                        //properties.Persistent = false; //跟上面的其实是一样的,1和false    2和true
                        for (int i = 0; i < 50; i++)
                        {
                            string message = $"Hello RabbitMQ {i}";
                            byte[] body = Encoding.UTF8.GetBytes(message);
                            channel.BasicPublish(EXCHANGE_NAME, "update", properties, body);
                            Console.WriteLine($"Send: {message}, {DateTime.Now.ToString("HH:mm:ss fff")}");
                            System.Threading.Thread.Sleep(100);    //间隔1秒钟发送一次
                        }
    
                    }
    
                }
            }
    
            private void button5_Click(object sender, EventArgs e)
            {
                //这个不想写了,就是路由的key支持通配符
            }
        }
    }

    以下是接收端的代码

    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace rabbitmq_example_rec
    {
        class Program
        {
            static void rec_simple()
            {
                var factory = new ConnectionFactory();
                factory.HostName = "192.168.165.146";
                factory.UserName = "guest";
                factory.Password = "guest";
                //不能放入 using 语句中,否则当 using 语句结束后会 Close 连接,EventingBasicConsumer.Received 事件将不会被触发
    
                var connection = factory.CreateConnection();
                var channel = connection.CreateModel();
                channel.QueueDeclare("myQueue1", false, false, false, null);
                var consumer = new EventingBasicConsumer(channel);  //消费者(指定消息通道)
                channel.BasicConsume("myQueue1", true, consumer);   //消费消息(在当前通道中监听 myQueue1 队列,并进行消费)
                //该事件在接收到消息时触发
                consumer.Received += (sender, e) =>
                {
                    byte[] body = e.Body.ToArray();   //消息字节数组
                    string message = Encoding.UTF8.GetString(body); //消息内容
    
                    Console.WriteLine($"Received: {message}, {DateTime.Now.ToString("HH: mm:ss fff")}");
                };
    
                Console.ReadLine();
                connection.Close();
                channel.Close();
            }
            static void rec_work1()
            {
                var factory = new ConnectionFactory();
                factory.HostName = "192.168.165.146";
                factory.UserName = "guest";
                factory.Password = "guest";
                //不能放入 using 语句中,否则当 using 语句结束后会 Close 连接,EventingBasicConsumer.Received 事件将不会被触发
    
                var connection = factory.CreateConnection();
                var channel = connection.CreateModel();
                channel.QueueDeclare("myQueue1", false, false, false, null);
                channel.BasicQos(0, 1, false); //该方法可以看出休息小半秒和一秒的区别
      
                var consumer = new EventingBasicConsumer(channel);  //消费者(指定消息通道)
                
                //该事件在接收到消息时触发
                consumer.Received += (sender, e) =>
                {
                    byte[] body = e.Body.ToArray();   //消息字节数组
                    string message = Encoding.UTF8.GetString(body); //消息内容
    
                    Console.WriteLine($"rec_work1 Rec: {message}");
                    channel.BasicAck(e.DeliveryTag, false);   //手工确认
                    Thread.Sleep(200);
                };
                channel.BasicConsume("myQueue1", false, consumer);   //消费消息(在当前通道中监听 myQueue1 队列,并进行消费)
                Console.ReadLine();
                connection.Close();
                channel.Close();
            }
            static void rec_work2()
            {
                var factory = new ConnectionFactory();
                factory.HostName = "192.168.165.146";
                factory.UserName = "guest";
                factory.Password = "guest";
                //不能放入 using 语句中,否则当 using 语句结束后会 Close 连接,EventingBasicConsumer.Received 事件将不会被触发
    
                var connection = factory.CreateConnection();
                var channel = connection.CreateModel();
                channel.QueueDeclare("myQueue1", false, false, false, null);
                channel.BasicQos(0, 1, false);  //该方法可以看出休息小半秒和一秒的区别
     
                
                var consumer = new EventingBasicConsumer(channel);  //消费者(指定消息通道)
                                                                    //该事件在接收到消息时触发
                consumer.Received += (sender, e) =>
                {
                    byte[] body = e.Body.ToArray();   //消息字节数组
                    string message = Encoding.UTF8.GetString(body); //消息内容
    
                    Console.WriteLine($"                                       rec_work2 Rec: {message}");
                    channel.BasicAck(e.DeliveryTag, false);        //手工确认
                    Thread.Sleep(1000);
                };
                channel.BasicConsume("myQueue1", false, consumer);   //消费消息(在当前通道中监听 myQueue1 队列,并进行消费)
                Console.ReadLine();
                connection.Close();
                channel.Close();
    
    
     
            }
    
    
    
            static void rec_fanout1()
            {
                String QUEUE_NAME = "fanout_queue_1";
    
                String EXCHANGE_NAME = "fanout_exchange";
    
                var factory = new ConnectionFactory();
                factory.HostName = "192.168.165.146";
                factory.UserName = "guest";
                factory.Password = "guest";
                //不能放入 using 语句中,否则当 using 语句结束后会 Close 连接,EventingBasicConsumer.Received 事件将不会被触发
    
                var connection = factory.CreateConnection();
                var channel = connection.CreateModel();
                //3.声明交换器
                channel.QueueDeclare(QUEUE_NAME, false, false, false, null);
                //4.绑定队列到交换器
                channel.QueueBind (QUEUE_NAME, EXCHANGE_NAME, "");
    
                channel.BasicQos(0, 1, false); //该方法可以看出休息小半秒和一秒的区别
    
                var consumer = new EventingBasicConsumer(channel);  //消费者(指定消息通道)
                channel.BasicConsume(QUEUE_NAME, false, consumer);   //消费消息(在当前通道中监听 myQueue1 队列,并进行消费)
    
                //该事件在接收到消息时触发
                consumer.Received += (sender, e) =>
                {
                    byte[] body = e.Body.ToArray();   //消息字节数组
                    string message = Encoding.UTF8.GetString(body); //消息内容
    
                    Console.WriteLine($"rec_work1 Rec: {message}");
                    channel.BasicAck(e.DeliveryTag, false);   //手工确认
                    Thread.Sleep(200);
                };
    
                Console.ReadLine();
                connection.Close();
                channel.Close();
            }
            static void rec_fanout2()
            {
                Thread.Sleep(15000);
                String QUEUE_NAME = "fanout_queue_2";
    
                String EXCHANGE_NAME = "fanout_exchange";
    
                var factory = new ConnectionFactory();
                factory.HostName = "192.168.165.146";
                factory.UserName = "guest";
                factory.Password = "guest";
                //不能放入 using 语句中,否则当 using 语句结束后会 Close 连接,EventingBasicConsumer.Received 事件将不会被触发
    
                var connection = factory.CreateConnection();
                var channel = connection.CreateModel();
                //3.声明交换器
                channel.QueueDeclare(QUEUE_NAME, false, false, false, null);
                //4.绑定队列到交换器
                channel.QueueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    
                channel.BasicQos(0, 1, false); //该方法可以看出休息小半秒和一秒的区别
    
                var consumer = new EventingBasicConsumer(channel);  //消费者(指定消息通道)
                channel.BasicConsume(QUEUE_NAME, false, consumer);   //消费消息(在当前通道中监听 myQueue1 队列,并进行消费)
    
                //该事件在接收到消息时触发
                consumer.Received += (sender, e) =>
                {
                    byte[] body = e.Body.ToArray();   //消息字节数组
                    string message = Encoding.UTF8.GetString(body); //消息内容
    
                    Console.WriteLine($"                                       rec_work2 Rec: {message}");
                    channel.BasicAck(e.DeliveryTag, false);   //手工确认
                    Thread.Sleep(400);
                };
    
                Console.ReadLine();
                connection.Close();
                channel.Close();             
    
    
            }
    
    
            static void rec_direct1()
            {
                String QUEUE_NAME = "direct_queue_1";
    
                String EXCHANGE_NAME = "direct_exchange";
    
                var factory = new ConnectionFactory();
                factory.HostName = "192.168.165.146";
                factory.UserName = "guest";
                factory.Password = "guest";
                //不能放入 using 语句中,否则当 using 语句结束后会 Close 连接,EventingBasicConsumer.Received 事件将不会被触发
    
                var connection = factory.CreateConnection();
                var channel = connection.CreateModel();
                //3.声明交换器
                channel.QueueDeclare(QUEUE_NAME, false, false, false, null);
                //4.绑定队列到交换器
                channel.QueueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
                channel.QueueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
                channel.QueueBind(QUEUE_NAME, EXCHANGE_NAME, "add");
    
    
    
                channel.BasicQos(0, 1, false); //该方法可以看出休息小半秒和一秒的区别
    
                var consumer = new EventingBasicConsumer(channel);  //消费者(指定消息通道)
                channel.BasicConsume(QUEUE_NAME, false, consumer);   //消费消息(在当前通道中监听 myQueue1 队列,并进行消费)
    
                //该事件在接收到消息时触发
                consumer.Received += (sender, e) =>
                {
                    byte[] body = e.Body.ToArray();   //消息字节数组
                    string message = Encoding.UTF8.GetString(body); //消息内容
    
                    Console.WriteLine($"rec_work1 Rec: {message}");
                    channel.BasicAck(e.DeliveryTag, false);   //手工确认
                    Thread.Sleep(200);
                };
    
                Console.ReadLine();
                connection.Close();
                channel.Close();
            }
            static void rec_direct2()
            {
                String QUEUE_NAME = "direct_queue_2";
    
                String EXCHANGE_NAME = "direct_exchange";
    
                var factory = new ConnectionFactory();
                factory.HostName = "192.168.165.146";
                factory.UserName = "guest";
                factory.Password = "guest";
                //不能放入 using 语句中,否则当 using 语句结束后会 Close 连接,EventingBasicConsumer.Received 事件将不会被触发
    
                var connection = factory.CreateConnection();
                var channel = connection.CreateModel();
                //3.声明交换器
                channel.QueueDeclare(QUEUE_NAME, false, false, false, null);
                //4.绑定队列到交换器
                channel.QueueBind(QUEUE_NAME, EXCHANGE_NAME, "select");
     
    
    
                channel.BasicQos(0, 1, false); //该方法可以看出休息小半秒和一秒的区别
    
                var consumer = new EventingBasicConsumer(channel);  //消费者(指定消息通道)
                channel.BasicConsume(QUEUE_NAME, false, consumer);   //消费消息(在当前通道中监听 myQueue1 队列,并进行消费)
    
                //该事件在接收到消息时触发
                consumer.Received += (sender, e) =>
                {
                    byte[] body = e.Body.ToArray();   //消息字节数组
                    string message = Encoding.UTF8.GetString(body); //消息内容
    
                    Console.WriteLine($"                                       rec_work2 Rec: {message}");
                    channel.BasicAck(e.DeliveryTag, false);   //手工确认
                    Thread.Sleep(400);
                };
    
                Console.ReadLine();
                connection.Close();
                channel.Close();
    
                 
    
    
            }
            static void Main(string[] args)
            {
                //简单模式
                ////rec_simple();
    
                ////工作模式 启动两个消费者
                //Thread t1 = new Thread(new ThreadStart(rec_work1));
                //t1.Start();
                //Thread t2 = new Thread(new ThreadStart(rec_work2));
                //t2.Start();
    
    
                //////交换器(fanout)
                ////Thread t1 = new Thread(new ThreadStart(rec_fanout1));
                ////t1.Start();
                ////Thread t2 = new Thread(new ThreadStart(rec_fanout2));
                ////t2.Start();
    
    
                //交换器(direct)  //个人感觉,做ERP相关的管理系统,在实际应中,这种使用的可能性比较大
                Thread t1 = new Thread(new ThreadStart(rec_direct1));
                t1.Start();
                Thread t2 = new Thread(new ThreadStart(rec_direct2));
                t2.Start();
    
    
            }
        }
    }
  • 相关阅读:
    jquery 筛选元素(1)
    jquery操作元素的位置
    jquery 操作css 选择器
    jquery 操作css 尺寸
    jquery 标签中的属性操作
    jquery基本选择器
    jquery表单属性筛选元素
    jquery属性值选择器
    jquery 层级选择器
    jquery的基本选择器
  • 原文地址:https://www.cnblogs.com/szyicol/p/13044752.html
Copyright © 2011-2022 走看看