1.什么是高并发系统的消息队列?
队列:对头+队列容器+对尾
消息队列(MessageQueue MQ):有一个队列容器,生产者(入队列),消费者(出队列)
2.使用消息队列的场景(使用消息队列的原因):
2.1 分布式场景
2.1.1 异步处理
多线程可以节省时间,但是会降低cpu吞吐量
用消息队列也可以减少时间的消耗,当创建失败的时候,可以用消息队列的持久化机制处理
2.1.2 应用解耦
利用消息队列将客户都按和库存系统解耦(消除强依赖)
2.1.3 流量削峰
限制流量(接受客户端请求服务将客户的请求保存到消息队列中,业务服务用自己的最大的吞吐量去消息队列中获取客户请求,如果不能正常取到的,通过消息队列返回给客户端)
2.1.4 异步事务
2.2 日志场景
2.2.1 优化日志传输,提升系统性能
2.3 即时通讯
2.3.1 聊天室
3. 实现消息队列的发手段:
3.1 RabbitMQ
3.2 ActiveMQ
3.3 RocketMQ
3.4 Kafka
3.5 Redis
3.6 ZeroMQ
3.7 阿里MNS
4.应用Redis的消息队列
4.1 redis应用环境:
4.1.1 redis
4.1.2 service.stock(下载serviceStock.redis.dll)
创建客户端:RedisClient client = new RedisClient("localhost:6379");
4.1.3 redis生产者(可以存放2的32次方-1的数据大约是40亿)
//Lpush()从左边插入数据
//Rpush()从右边插入数据
byte[] bytes = System.Text.Encoding.UTF8.GetBytes("tony");
client.Lpush(“tonytest”, bytes );
4.1.4 redis消费者
//队列是左边插入数据,右边取数据;右边插入数据,左边取数据
Byte[] bytes = redisClient.RPop("tonytest");
string value = System.Text.Encoding.UTF8.GetString(bytes);
4.1.5 消息队列的两种模式
4.1.5.1 推模式
这是指消费者在获取消息队列中的数据的时候,当数据没有的时候,会按照制定的时长阻塞线程,在阻塞的过程中如果消息队列中有数据的话会自动推送到消费者。
被动的接收数据
//BRPop()
//BRLop()
//Byte[] bytes = redisClient.BRPopValue("tonytest", 60);//阻塞60s
while(true)
{
Byte[] bytes = redisClient.BRPopValue("tonytest", 10);
if(bytes != null)
{
string value = System.Text.Encoding.UTF8.GetString(bytes);
Console.WriteLine(value);
}
else
{
Console.WriteLine("队列中没有数据");
}
}
4.1.5.2 拉模式
主动的去消息队列中获取数据
//RPOP()
//LPop()
//Byte[] bytes = redisClient.RPop("tonytest");
while(true)
{
Thread.sleep(1000);
Byte[] bytes = redisClient.RPop("tonytest");
if(bytes != null)
{
string value = System.Text.Encoding.UTF8.GetString(bytes);
Console.WriteLine(value);
}
else
{
Console.WriteLine("队列中没有数据");
}
}
总结:消息队列的生产者和消费者是一对一的
4.1.6 redis关闭
redisClient.Dispose()
4.1.6 准备异步处理,应用解
情景:异步实现创建订单,添加积分,发送短信
4.1.6.1在创建订单的服务中
//1.创建订单
//2.创建连接
RedisClient client = new RedisClient("localhost:6379");
//3.生产消息
//3.1积分消息
byte[] bytes = System.Text.Encoding.UTF8.GetBytes("订单号");
client.Lpush(“order_points”, bytes );
//3.2短信消息
byte[] bytes = System.Text.Encoding.UTF8.GetBytes("订单号");
client.Lpush(“order_sms”, bytes );
4.1.6.2在积分服务中:
//1.创建连接
RedisClient client = new RedisClient("localhost:6379");
while(true)
{
Byte[] bytes = redisClient.RPop("order_points");
if(bytes != null)
{
string value = System.Text.Encoding.UTF8.GetString(bytes);
Console.WriteLine($"获取到的积分消息:{value}");
//增加积分
OrderPoint p = new OrderPoint();
p.AddPoint(value);
Console.WriteLine(value);
}
else
{
Console.WriteLine("队列中没有数据");
}
}
4.1.6.3 在短信服务中消费短信消息
//1.创建连接
RedisClient client = new RedisClient("localhost:6379");
while(true)
{
Byte[] bytes = redisClient.RPop("order_sms");
if(bytes != null)
{
string value = System.Text.Encoding.UTF8.GetString(bytes);
Console.WriteLine($"获取到的积分消息:{value}");
//消费短信消息,发送短信
OrderSms p = new OrderSms();
p.SendSms(value);
Console.WriteLine(value);
}
else
{
Console.WriteLine("队列中没有数据");
}
}