zoukankan      html  css  js  c++  java
  • RabbitMQ学习笔记

    一、rabbitmqctl

    启动rabbitmq rabbitmqctl start_app 

    关闭rabbitmq  rabbitmqctl stop_app 

    格式化rabbitmq   rabbitmqctl reset (格式化之前需要先关闭rabbitmq)

    强制格式化rabbitmq   rabbitmqctl force_reset  

    二、Exchange

    1,Direct (直连)

     通过routingkey发送到指定的queue

    using EasyNetQ;
    using EasyNetQ.Topology;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace DirectConsumer
    {
        /// <summary>
        /// Console consumer for the direct-exchange sample: declares the exchange "direct" and the
        /// queue "001", binds them with routing key "000", and prints every received body as UTF-8 text.
        /// </summary>
        class Program
        {
            /// <summary>Runs the consumer until a key is pressed.</summary>
            /// <param name="args">Command-line arguments (unused).</param>
            static void Main(string[] args)
            {
                // Demo only: broker address and credentials are hard-coded.
                // The using block closes the bus when the program exits or if setup throws.
                using (var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter").Advanced)
                {
                    var ex = bus.ExchangeDeclare("direct", ExchangeType.Direct);
                    var que = bus.QueueDeclare("001"); // "001" is the queue name
                    bus.Bind(ex, que, "000"); // "000" is the routing key

                    bus.Consume(que, (body, properties, info) => Task.Factory.StartNew(() =>
                    {
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine("Got message: '{0}'", message);
                    }));

                    // Keep the subscription alive until the user presses a key.
                    Console.ReadKey();
                }
            }
        }
    }
    DirectConsumer
    using EasyNetQ;
    using EasyNetQ.Topology;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace DirectProduce
    {
        /// <summary>
        /// Console producer for the direct-exchange sample: publishes the string "sad" to the
        /// exchange "direct" with routing key "000".
        /// </summary>
        class Program
        {
            /// <summary>Publishes one message and exits.</summary>
            /// <param name="args">Command-line arguments (unused).</param>
            static void Main(string[] args)
            {
                // Demo only: broker address and credentials are hard-coded.
                // The using block disposes the bus even when declaring or publishing throws.
                using (var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter").Advanced)
                {
                    var ex = bus.ExchangeDeclare("direct", ExchangeType.Direct);

                    var message = new Message<string>("sad");
                    // "000" is the routing key
                    bus.Publish<string>(ex, "000", false, message);
                }
            }
        }
    }
    DirectProduce

    2,Fanout(广播)

     

    使用这种类型的Exchange,会忽略routing key的存在,直接将message广播到所有的Queue中。

    适用场景:

                    第一:大型玩家在玩在线游戏的时候,可以用它来广播重大消息。这让我想到电影微微一笑很倾城中,有款游戏需要在世界上公布玩家重大消息,也许这个就是用的MQ实现的。这让我不禁佩服肖奈,人家在大学的时候就知道RabbitMQ的这种特性了。

                    第二:体育新闻实时更新到手机客户端。

                    第三:群聊功能,广播消息给当前群聊中的所有人。

    using EasyNetQ;
    using EasyNetQ.Topology;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace FanoutConsumer
    {
        /// <summary>
        /// First console consumer for the fanout sample: declares the exchange "fanout" and the
        /// queue "directQueue", binds them with an empty routing key, and prints every received
        /// body as UTF-8 text.
        /// </summary>
        class Program
        {
            /// <summary>Runs the consumer until a key is pressed.</summary>
            /// <param name="args">Command-line arguments (unused).</param>
            static void Main(string[] args)
            {
                // Demo only: broker address and credentials are hard-coded.
                // The using block closes the bus when the program exits or if setup throws.
                using (var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter").Advanced)
                {
                    var ex = bus.ExchangeDeclare("fanout", ExchangeType.Fanout);
                    var que = bus.QueueDeclare("directQueue"); // "directQueue" is the queue name
                    bus.Bind(ex, que, string.Empty); // a fanout binding needs no routing key

                    bus.Consume(que, (body, properties, info) => Task.Factory.StartNew(() =>
                    {
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine("Got message: '{0}'", message);
                    }));

                    // Keep the subscription alive until the user presses a key.
                    Console.ReadKey();
                }
            }
        }
    }
    FanoutConsumer
    using EasyNetQ;
    using EasyNetQ.Topology;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace FanoutConsumer2
    {
        /// <summary>
        /// Second console consumer for the fanout sample: declares the exchange "fanout" and the
        /// queue "directQueue2", binds them with an empty routing key, and prints every received
        /// body as UTF-8 text.
        /// </summary>
        class Program
        {
            /// <summary>Runs the consumer until a key is pressed.</summary>
            /// <param name="args">Command-line arguments (unused).</param>
            static void Main(string[] args)
            {
                // Demo only: broker address and credentials are hard-coded.
                // The using block closes the bus when the program exits or if setup throws.
                using (var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter").Advanced)
                {
                    var ex = bus.ExchangeDeclare("fanout", ExchangeType.Fanout);
                    var que = bus.QueueDeclare("directQueue2"); // "directQueue2" is the queue name
                    bus.Bind(ex, que, string.Empty); // a fanout binding needs no routing key

                    bus.Consume(que, (body, properties, info) => Task.Factory.StartNew(() =>
                    {
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine("Got message: '{0}'", message);
                    }));

                    // Keep the subscription alive until the user presses a key.
                    Console.ReadKey();
                }
            }
        }
    }
    FanoutConsumer2
    using EasyNetQ;
    using EasyNetQ.Topology;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace FanoutProduce
    {
        /// <summary>
        /// Console producer for the fanout sample: publishes the string "sad" to the exchange
        /// "fanout" with an empty routing key.
        /// </summary>
        class Program
        {
            /// <summary>Publishes one message and exits.</summary>
            /// <param name="args">Command-line arguments (unused).</param>
            static void Main(string[] args)
            {
                // Demo only: broker address and credentials are hard-coded.
                // The using block disposes the bus even when declaring or publishing throws.
                using (var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter").Advanced)
                {
                    var ex = bus.ExchangeDeclare("fanout", ExchangeType.Fanout);
                    var message = new Message<string>("sad");

                    // A fanout exchange needs no routing key.
                    bus.Publish<string>(ex, string.Empty, false, message);
                }
            }
        }
    }
    FanoutProduce

    3,Topic(主题)

    Topic Exchange是根据routing key和Exchange的类型将message发送到一个或者多个Queue中

    使用场景:

                   新闻的分类更新

                   同一任务多个工作者协调完成

                   同一问题需要特定人员知晓

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    using EasyNetQ;
    
    namespace TopicConsumer
    {
        /// <summary>
        /// Console consumer for the topic sample: creates two string subscriptions, one filtered to
        /// topics ending in ".com" and one to topics ending in ".cn", and prints each received message.
        /// </summary>
        class Program
        {
            /// <summary>Runs the consumer until a key is pressed.</summary>
            /// <param name="args">Command-line arguments (unused).</param>
            static void Main(string[] args)
            {
                // Demo only: broker address and credentials are hard-coded.
                // The using block closes the bus when the program exits or if setup throws.
                using (var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter"))
                {
                    // The first argument is the subscription id, which only names the queue; it is
                    // NOT a routing pattern. Without WithTopic the binding key defaults to "#", so
                    // both subscriptions would receive every message.
                    //
                    // In a topic pattern "*" matches exactly one dot-separated word while "#" matches
                    // zero or more, so "#.com" is needed to match multi-word topics such as
                    // "www.oyunkeji.com".
                    //
                    // NOTE(review): queues created by an earlier run of this sample presumably keep
                    // their old "#" binding until they are deleted on the broker - confirm.
                    bus.Subscribe<string>("*.com", r => Console.WriteLine(r), c => c.WithTopic("#.com"));
                    bus.Subscribe<string>("*.cn", r => Console.WriteLine(r), c => c.WithTopic("#.cn"));

                    // Keep the subscriptions alive until the user presses a key.
                    Console.ReadKey();
                }
            }
        }
    }
    TopicConsumer
    using EasyNetQ;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace TopicProduce
    {
        /// <summary>
        /// Console producer for the topic sample: publishes the string "你好" with the topic
        /// "www.oyunkeji.com".
        /// </summary>
        class Program
        {
            /// <summary>Publishes one message and exits.</summary>
            /// <param name="args">Command-line arguments (unused).</param>
            static void Main(string[] args)
            {
                // Demo only: broker address and credentials are hard-coded.
                // The using block disposes the bus even when publishing throws.
                using (var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter"))
                {
                    // The second argument is the topic (routing key) of the message.
                    bus.Publish<string>("你好", "www.oyunkeji.com");
                }
            }
        }
    }
    TopicProduce

    4,Headers(头信息)

     

    它是根据Message的一些头部信息来分发过滤Message,忽略routing key的属性,如果Header信息和message消息的头信息相匹配,那么这条消息就匹配上了

    x-match的头部必须设置:

    当x-match的值设置为all时,header信息必须全部满足才会匹配上

    当x-match的值设置为any时,header信息满足其中任意一个就会匹配上

    using EasyNetQ;
    using EasyNetQ.Topology;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace HeadersConsumer
    {
        /// <summary>
        /// Console consumer for the headers-exchange sample: declares the exchange "headers" and the
        /// queue "headersQueue", binds them with header-matching arguments, and prints every received
        /// body as UTF-8 text.
        /// </summary>
        class Program
        {
            /// <summary>Runs the consumer until a key is pressed.</summary>
            /// <param name="args">Command-line arguments (unused).</param>
            static void Main(string[] args)
            {
                // Demo only: broker address and credentials are hard-coded.
                // The using block closes the bus when the program exits or if setup throws.
                using (var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter").Advanced)
                {
                    var ex = bus.ExchangeDeclare("headers", ExchangeType.Header);
                    var que = bus.QueueDeclare("headersQueue"); // "headersQueue" is the queue name

                    // A headers binding needs no routing key; matching is driven by these arguments.
                    // "x-match" = "all" requires every listed header to match.
                    bus.Bind(ex, que, string.Empty, new Dictionary<string, object>() {
                        { "x-match","all"},
                        { "username","hunter"},
                        { "password","hunter"}
                    });

                    bus.Consume(que, (body, properties, info) => Task.Factory.StartNew(() =>
                    {
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine("Got message: '{0}'", message);
                    }));

                    // Keep the subscription alive until the user presses a key.
                    Console.ReadKey();
                }
            }
        }
    }
    HeadersConsumer
    using EasyNetQ;
    using EasyNetQ.Topology;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace HeadersProduce
    {
        /// <summary>
        /// Console producer for the headers-exchange sample: publishes the UTF-8 bytes of "你好" to
        /// the exchange "headers" with the message headers "username" and "password" set.
        /// </summary>
        class Program
        {
            /// <summary>Publishes one message and exits.</summary>
            /// <param name="args">Command-line arguments (unused).</param>
            static void Main(string[] args)
            {
                // Demo only: broker address and credentials are hard-coded.
                // The using block disposes the bus even when declaring or publishing throws.
                using (var bus = RabbitHutch.CreateBus("host=192.168.1.193:5672;username=hunter;password=hunter").Advanced)
                {
                    var ex = bus.ExchangeDeclare("headers", ExchangeType.Header);

                    // These message headers are what the binding arguments are matched against.
                    var properties = new MessageProperties();
                    properties.Headers.Add("username", "hunter");
                    properties.Headers.Add("password", "hunter");

                    // A headers exchange needs no routing key.
                    bus.Publish(ex, string.Empty, false, properties, Encoding.UTF8.GetBytes("你好"));
                }
            }
        }
    }
    HeadersProduce

    案例下载:https://pan.baidu.com/s/1gVBO3qLl9Dw5tIhIpETvkw

    三、Arguments

    1,Message TTL(x-message-ttl)

    发布到队列的消息在丢弃之前可以存活多长时间(毫秒)。

    ①针对队列中的所有消息

    using EasyNetQ;
    using EasyNetQ.Topology;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace ConsoleApp1
    {
        /// <summary>
        /// Consumer for the per-queue message TTL sample: declares the exchange "directText" and the
        /// queue "001" with a 5000 ms message TTL, binds them with routing key "000", and prints
        /// every received body as UTF-8 text.
        /// </summary>
        class Program
        {
            /// <summary>Runs the consumer until a key is pressed.</summary>
            /// <param name="args">Command-line arguments (unused).</param>
            static void Main(string[] args)
            {
                // Demo only: broker address and credentials are hard-coded.
                // The using block closes the bus when the program exits or if setup throws.
                using (var bus = RabbitHutch.CreateBus("host=localhost:5672;username=hunter;password=hunter").Advanced)
                {
                    var ex = bus.ExchangeDeclare("directText", ExchangeType.Direct);

                    // "001" is the queue name. Messages in this queue that are not consumed within
                    // 5 seconds (5000 ms) are removed by the broker.
                    var que = bus.QueueDeclare("001", perQueueMessageTtl: 5000);
                    bus.Bind(ex, que, "000"); // "000" is the routing key

                    bus.Consume(que, (body, properties, info) => Task.Factory.StartNew(() =>
                    {
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine("Got message: '{0}'", message);
                    }));

                    // Keep the subscription alive until the user presses a key.
                    Console.ReadKey();
                }
            }
        }
    }
    consumer
    using EasyNetQ;
    using EasyNetQ.Topology;
    using System;
    using System.Collections;
    using System.Collections.Generic;
    using System.Linq;
    using System.Linq.Expressions;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace ConsoleApp2
    {
        /// <summary>
        /// Producer for the per-queue message TTL sample: publishes the string "你好" to the
        /// exchange "directText" with routing key "000".
        /// </summary>
        class Program
        {
            /// <summary>Publishes one message and exits.</summary>
            /// <param name="args">Command-line arguments (unused).</param>
            static void Main(string[] args)
            {
                // Demo only: broker address and credentials are hard-coded.
                // The using block disposes the bus even when declaring or publishing throws.
                using (var bus = RabbitHutch.CreateBus("host=localhost:5672;username=hunter;password=hunter").Advanced)
                {
                    var ex = bus.ExchangeDeclare("directText", ExchangeType.Direct);
                    var message = new Message<string>("你好");

                    // "000" is the routing key. No per-message properties are set here.
                    bus.Publish<string>(ex, "000", false, message);
                }
            }
        }
    }
    produce

    ②指定某个消息

    using EasyNetQ;
    using EasyNetQ.Topology;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace ConsoleApp1
    {
        /// <summary>
        /// Consumer for the per-message TTL sample: declares the exchange "directText" and the queue
        /// "001" (with no queue-level TTL), binds them with routing key "000", and prints every
        /// received body as UTF-8 text.
        /// </summary>
        class Program
        {
            /// <summary>Runs the consumer until a key is pressed.</summary>
            /// <param name="args">Command-line arguments (unused).</param>
            static void Main(string[] args)
            {
                // Demo only: broker address and credentials are hard-coded.
                // The using block closes the bus when the program exits or if setup throws.
                using (var bus = RabbitHutch.CreateBus("host=localhost:5672;username=hunter;password=hunter").Advanced)
                {
                    var ex = bus.ExchangeDeclare("directText", ExchangeType.Direct);

                    // "001" is the queue name. No TTL is configured on the queue in this variant.
                    // NOTE(review): if "001" already exists on the broker with different arguments
                    // (e.g. an x-message-ttl), this declaration presumably fails - confirm and
                    // delete the old queue first.
                    var que = bus.QueueDeclare("001");
                    bus.Bind(ex, que, "000"); // "000" is the routing key

                    bus.Consume(que, (body, properties, info) => Task.Factory.StartNew(() =>
                    {
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine("Got message: '{0}'", message);
                    }));

                    // Keep the subscription alive until the user presses a key.
                    Console.ReadKey();
                }
            }
        }
    }
    consumer
    using EasyNetQ;
    using EasyNetQ.Topology;
    using System;
    using System.Collections;
    using System.Collections.Generic;
    using System.Linq;
    using System.Linq.Expressions;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace ConsoleApp2
    {
        /// <summary>
        /// Producer for the per-message TTL sample: publishes the UTF-8 bytes of "你好" to the
        /// exchange "directText" with routing key "000" and an Expiration of "5000".
        /// </summary>
        class Program
        {
            /// <summary>Publishes one message and exits.</summary>
            /// <param name="args">Command-line arguments (unused).</param>
            static void Main(string[] args)
            {
                // Demo only: broker address and credentials are hard-coded.
                // The using block disposes the bus even when declaring or publishing throws.
                using (var bus = RabbitHutch.CreateBus("host=localhost:5672;username=hunter;password=hunter").Advanced)
                {
                    var ex = bus.ExchangeDeclare("directText", ExchangeType.Direct);

                    var properties = new MessageProperties();
                    properties.Expiration = "5000"; // per-message TTL, in milliseconds

                    // "000" is the routing key
                    bus.Publish(ex, "000", false, properties, Encoding.UTF8.GetBytes("你好"));
                }
            }
        }
    }
    produce

    2,Auto expire(x-expires)

     queue在指定的时间未被访问,就会被删除(毫秒)。

    3,Max length(x-max-length)

    限定队列的最大长度,

    4,Max length bytes(x-max-length-bytes)

    限定队列的最大占用空间大小

    5,Overflow behaviour(x-overflow)

     设置队列溢出行为。这决定了在到达队列的最大长度时消息会发生什么情况。有效值是 drop-head(删除头)或者 reject-publish(拒绝发布) 。

    6,Dead letter exchange/Dead letter routing key(x-dead-letter-exchange/x-dead-letter-routing-key)

    以下三种情况下,queue中的message会成为死信(dead letter):

    ①message过期(超过TTL);②message被basic.reject或basic.nack拒绝,且requeue为false;③队列达到最大长度。

    默认情况下,这三种情况一般会drop这些message。

    Dead letter exchange:有时候我们不希望message被drop掉,而是走到另一个队列中,又或者是保存起来

    Dead letter routing key:指定的routing key

    8,Maximum priority

    (x-max-priority)

    定义队列支持的最大消息优先级

    9,Lazy mode(x-queue-mode)

    将队列设置为延迟模式,在磁盘上保留尽可能多的消息以减少RAM使用量; 如果未设置,队列将保留内存中的缓存以尽可能快地传递消息。

    10,Master locator (x-queue-master-locator)

    将队列设置为主位置模式,确定队列主节点在节点集群上声明时所处的规则。

    四、高可靠消息队列

    1,消费端的确认

    ①自动确认

    message出队列的时候就自动确认

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    
    namespace Consumer
    {
        /// <summary>
        /// Auto-ack consumer sample: declares the exchange "exchange1" and the queue "queue1", binds
        /// them with routing key "queue1", and prints every delivered body as UTF-8 text.
        /// </summary>
        class Program
        {
            /// <summary>Runs the consumer until a key is pressed.</summary>
            /// <param name="args">Command-line arguments (unused).</param>
            static void Main(string[] args)
            {
                // Demo only: broker address and credentials are hard-coded.
                ConnectionFactory factory = new ConnectionFactory()
                {
                    HostName = "localhost",
                    Port = 5672,
                    UserName = "hunter",
                    Password = "hunter"
                };

                // Open the connection and a channel; both are closed when the using block ends,
                // including when setup throws.
                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    // Declare the exchange
                    channel.ExchangeDeclare("exchange1", ExchangeType.Direct, true, false, null);

                    // Declare the queue
                    channel.QueueDeclare("queue1", true, false, false, null);

                    // Bind the queue to the exchange
                    channel.QueueBind("queue1", "exchange1", "queue1", null);

                    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (send, e) =>
                    {
                        Console.WriteLine(Encoding.UTF8.GetString(e.Body));
                    };

                    // autoAck = true: messages are acknowledged automatically on delivery.
                    channel.BasicConsume("queue1", true, consumer);

                    // Keep consuming until the user presses a key.
                    Console.ReadKey();
                }
            }

        }
    }
    Consumer
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    using RabbitMQ.Client;
    
    namespace Produce
    {
        /// <summary>
        /// Producer sample: publishes the numbers 0..9 as UTF-8 text to the exchange "exchange1"
        /// with routing key "queue1".
        /// </summary>
        class Program
        {
            /// <summary>Publishes ten messages, then waits for a key press.</summary>
            /// <param name="args">Command-line arguments (unused).</param>
            static void Main(string[] args)
            {
                // Demo only: broker address and credentials are hard-coded.
                ConnectionFactory factory = new ConnectionFactory()
                {
                    HostName = "localhost",
                    Port = 5672,
                    UserName = "hunter",
                    Password = "hunter"
                };

                // Open the connection and a channel. The original only disposed the channel and
                // left the connection open; the using block releases both, even if publishing throws.
                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    for (int i = 0; i < 10; i++)
                    {
                        channel.BasicPublish("exchange1", "queue1", null, Encoding.UTF8.GetBytes(i.ToString()));
                    }
                }

                Console.WriteLine("发布完毕");

                Console.ReadKey();
            }
        }
    }
    Produce

    ②手动确认

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    
    namespace Consumer
    {
        /// <summary>
        /// Manual-acknowledgement sample: declares the exchange "exchange1" and the queue "queue1",
        /// pulls a single message with BasicGet (autoAck off), prints it, and rejects it without
        /// requeueing.
        /// </summary>
        class Program
        {
            /// <summary>Fetches and rejects at most one message, then waits for a key press.</summary>
            /// <param name="args">Command-line arguments (unused).</param>
            static void Main(string[] args)
            {
                // Demo only: broker address and credentials are hard-coded.
                ConnectionFactory factory = new ConnectionFactory()
                {
                    HostName = "localhost",
                    Port = 5672,
                    UserName = "hunter",
                    Password = "hunter"
                };

                // Open the connection and a channel; both are closed when the using block ends.
                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    // Declare the exchange
                    channel.ExchangeDeclare("exchange1", ExchangeType.Direct, true, false, null);

                    // Declare the queue
                    channel.QueueDeclare("queue1", true, false, false, null);

                    // Bind the queue to the exchange
                    channel.QueueBind("queue1", "exchange1", "queue1", null);

                    // autoAck = false: the message must be acked or rejected explicitly.
                    var result = channel.BasicGet("queue1", false);

                    // BasicGet returns null when the queue is empty; guard against it instead of
                    // failing with a NullReferenceException.
                    if (result == null)
                    {
                        Console.WriteLine("queue1 is empty; nothing to reject.");
                    }
                    else
                    {
                        Console.WriteLine(Encoding.UTF8.GetString(result.Body));

                        // Reject the message.
                        // requeue: true puts it back on the queue, false discards it.
                        channel.BasicReject(result.DeliveryTag, false);
                    }

                    // Alternative: channel.BasicRecover(requeue) asks for redelivery of unacked messages.
                    // With true the messages go back to the queue and may reach another consumer;
                    // with false they are redelivered to the current consumer only.

                    Console.ReadKey();
                }
            }

        }
    }
    Consumer

    2,发布端的确认

    其中事务的性能消耗最大,confirm其次

    ①confirm机制

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    using RabbitMQ.Client;
    
    namespace Produce
    {
        /// <summary>
        /// Publisher-confirm sample: enables confirms on the channel, publishes the numbers 0..9999
        /// as UTF-8 text to "exchange1" with routing key "queue1", and prints the result of
        /// WaitForConfirms.
        /// </summary>
        class Program
        {
            /// <summary>Publishes the batch, reports the confirm result, then waits for a key press.</summary>
            /// <param name="args">Command-line arguments (unused).</param>
            static void Main(string[] args)
            {
                // Demo only: broker address and credentials are hard-coded.
                ConnectionFactory factory = new ConnectionFactory()
                {
                    HostName = "localhost",
                    Port = 5672,
                    UserName = "hunter",
                    Password = "hunter"
                };

                // The using block releases the channel and connection even if publishing throws.
                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    // Put the channel into confirm mode before publishing.
                    channel.ConfirmSelect();
                    for (int i = 0; i < 10000; i++)
                    {
                        channel.BasicPublish("exchange1", "queue1", null, Encoding.UTF8.GetBytes(i.ToString()));
                    }

                    // Blocks until the outstanding publishes are confirmed; the returned flag is printed.
                    var isallPublish = channel.WaitForConfirms();
                    Console.WriteLine(isallPublish);
                }

                Console.WriteLine("发布完毕");

                Console.ReadKey();
            }
        }
    }
    Produce

    ② 事务机制

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    using RabbitMQ.Client;
    
    namespace Produce
    {
        /// <summary>
        /// Transactional publish sample: publishes the numbers 0..9999 as UTF-8 text to "exchange1"
        /// with routing key "queue1" inside a channel transaction, rolling back on failure.
        /// </summary>
        class Program
        {
            /// <summary>Publishes the batch in one transaction, then waits for a key press.</summary>
            /// <param name="args">Command-line arguments (unused).</param>
            static void Main(string[] args)
            {
                // Demo only: broker address and credentials are hard-coded.
                ConnectionFactory factory = new ConnectionFactory()
                {
                    HostName = "localhost",
                    Port = 5672,
                    UserName = "hunter",
                    Password = "hunter"
                };

                // The using block releases the channel and connection even if rollback throws.
                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    try
                    {
                        channel.TxSelect();
                        for (int i = 0; i < 10000; i++)
                        {
                            channel.BasicPublish("exchange1", "queue1", null, Encoding.UTF8.GetBytes(i.ToString()));
                        }
                        channel.TxCommit();
                    }
                    catch (Exception ex)
                    {
                        // Report the failure before rolling back so it is not silently swallowed
                        // (and not lost if the rollback itself throws).
                        Console.Error.WriteLine("Publish failed, rolling back: {0}", ex);
                        channel.TxRollback();
                    }
                }

                Console.WriteLine("发布完毕");

                Console.ReadKey();
            }
        }
    }
    Produce

    五、Consumer消费问题

    Consumer消费时,不管你是否确认,消息都会一股脑全部打入到你的consumer中去,导致consumer端内存暴涨(EasyNetQ的Subscribe不会出现这种情况)

    解决方法:

     ①EventingBasicConsumer+QoS

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    using System.Threading;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using EasyNetQ;
    
    namespace Consumer
    {
        /// <summary>
        /// Prefetch (QoS) sample: declares "exchange1"/"queue1", limits the channel to one
        /// unacknowledged message at a time, and consumes with manual acknowledgement, printing
        /// every delivered body as UTF-8 text.
        /// </summary>
        class Program
        {
            /// <summary>Runs the consumer until a key is pressed.</summary>
            /// <param name="args">Command-line arguments (unused).</param>
            static void Main(string[] args)
            {
                // Demo only: broker address and credentials are hard-coded.
                ConnectionFactory factory = new ConnectionFactory()
                {
                    HostName = "localhost",
                    Port = 5672,
                    UserName = "hunter",
                    Password = "hunter"
                };

                // Open the connection and a channel; both are closed when the using block ends.
                // NOTE(review): closing may presumably be delayed while a handler is still inside
                // the long Thread.Sleep below - confirm against the client version in use.
                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    // Declare the exchange
                    channel.ExchangeDeclare("exchange1", ExchangeType.Direct, true, false, null);

                    // Declare the queue
                    channel.QueueDeclare("queue1", true, false, false, null);

                    // Bind the queue to the exchange
                    channel.QueueBind("queue1", "exchange1", "queue1", null);

                    // prefetchSize = 0, prefetchCount = 1: at most one message in flight.
                    channel.BasicQos(0, 1, false);

                    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (send, e) =>
                    {
                        Console.WriteLine(Encoding.UTF8.GetString(e.Body));

                        channel.BasicAck(e.DeliveryTag, false); // confirm delivery
                        // Stalls the handler for 1,000,000 ms.
                        // NOTE(review): the ack is sent before this delay; if the delay stands in
                        // for real work, acking afterwards would be safer - confirm intent.
                        Thread.Sleep(1000000);
                    };

                    // autoAck = false: messages are acknowledged manually in the handler above.
                    channel.BasicConsume("queue1", false, consumer);

                    // Keep consuming until the user presses a key (the original called ReadKey twice).
                    Console.ReadKey();
                }
            }

        }
    }
    Consumer
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    using RabbitMQ.Client;
    
    namespace Produce
    {
        /// <summary>
        /// Producer for the prefetch (QoS) sample: publishes the numbers 0..9999 as UTF-8 text to
        /// "exchange1" with routing key "queue1" inside a channel transaction, rolling back on failure.
        /// </summary>
        class Program
        {
            /// <summary>Publishes the batch in one transaction, then waits for a key press.</summary>
            /// <param name="args">Command-line arguments (unused).</param>
            static void Main(string[] args)
            {
                // Demo only: broker address and credentials are hard-coded.
                ConnectionFactory factory = new ConnectionFactory()
                {
                    HostName = "localhost",
                    Port = 5672,
                    UserName = "hunter",
                    Password = "hunter"
                };

                // The using block releases the channel and connection even if rollback throws.
                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    try
                    {
                        channel.TxSelect();
                        for (int i = 0; i < 10000; i++)
                        {
                            channel.BasicPublish("exchange1", "queue1", null, Encoding.UTF8.GetBytes(i.ToString()));
                        }
                        channel.TxCommit();
                    }
                    catch (Exception ex)
                    {
                        // Report the failure before rolling back so it is not silently swallowed
                        // (and not lost if the rollback itself throws).
                        Console.Error.WriteLine("Publish failed, rolling back: {0}", ex);
                        channel.TxRollback();
                    }
                }

                Console.WriteLine("发布完毕");

                Console.ReadKey();
            }
        }
    }
    Produce
  • 相关阅读:
    盒子垂直水平居中
    Sahi (2) —— https/SSL配置(102 Tutorial)
    Sahi (1) —— 快速入门(101 Tutorial)
    组织分析(1)——介绍
    Java Servlet (1) —— Filter过滤请求与响应
    CAS (8) —— Mac下配置CAS到JBoss EAP 6.4(6.x)的Standalone模式(服务端)
    JBoss Wildfly (1) —— 7.2.0.Final编译
    CAS (7) —— Mac下配置CAS 4.x的JPATicketRegistry(服务端)
    CAS (6) —— Nginx代理模式下浏览器访问CAS服务器网络顺序图详解
    CAS (5) —— Nginx代理模式下浏览器访问CAS服务器配置详解
  • 原文地址:https://www.cnblogs.com/zd1994/p/8659643.html
Copyright © 2011-2022 走看看