zoukankan      html  css  js  c++  java
  • rabbitMQ 消息队列

    学习地址

    视频:https://www.bilibili.com/video/av18997807/?p=1

    相关概述

    1、消息队列解决了什么问题?

    异步处理,应用解耦,流量削锋,日志处理

    2、后台管理页面:http://localhost:15672/#/

    如果页面不能访问,则需要在AppData目录下删除两个文件夹-见视频

    账号&密码:guest&guest

     

    3、virtual hosts 相当于mysql的数据库,授权哪些用户可以进入

    =》一般以/开头

     

    4、amqp就是与RabbitMQ通信的协议

     

    简单队列

    1、简单队列的不足?

    1)耦合性高,生产者一一对应消费者(如果我想要多个消费者消费队列中的消息,这时候就不行了)

    2)队列名变更,这时候得同时变更

    工作队列

    1、为什么会出现工作队列?

    1)因为简单队列是一一对应的

    2)我们实际开发时,生产者发送消息是毫不费力的,而消费者一般是要跟业务相结合的,消费者接收到消息之后就需要处理,可能需要花费事件,这时候就会积压了很多信息。

    现象:(设置了两个消费者的响应时间一快一慢)

    消费者1和消费者2处理的消息数量是一样的

    消费者1:偶数

    消费者2:奇数

    这种方式叫做轮询分发(round-robin)

    =》因为rabbitMQ不了解消费者的能力

     

    公平分发

    公平分发的意思就是能者多劳

    使用公平分发,必须关闭自动应答,ack改成手动

    只需要配置channel.basicQos(prefethCount)即可

    消息应答与持久化

    1、消息应答

    boolean autoAck=true;(自动确认模式),一旦rabbitmq将消息分发给消费者,那么就会从内存中删除;(意思是不管消费者是否消费完了该条信息)

    在这种情况下,如果杀死正在执行的消费者,就会丢失正在处理的消息;boolean autoAck=false;(手动模式),如果有一个消费者挂掉,就会交付给其他消费者,rabbitmq支持消息应答,消费者发送一个消息应答,告诉rabbitmq这个消息我已经处理

    成,你可以删了,然后rabbitmq就会删除内存中的消息;

    消息应答默认是打开的,false

    ack:Message acknowledgement

    如果rabbitmq挂了,我们的消息仍然会丢失。所以需要进行持久化

     

    2、持久化处理

    boolean durable=false;

    channel.queueDeclare(QUEUE_NAME,durable,false,false,null);

     

    发布订阅模式

    以上均为一个消息只能被一个消费者消费,所以想要解决这个问题,就得使用发布订阅模式

    发布订阅模型

    -----消息队列-----C1

    P=>X----| 

    -----消息队列-----C2

    解读:

    1)1个生产者,多个消费者

    2)每一个消费者都有自己的队列

    3)生产者没有把消息发送到队列,而是发送到了交换机/转换器(Exchange)

    4)每个队列到需要绑定到同一个交换机上

     

    交换机:

    一方面是接受生产者的消息,一方面是向队列推送消息

    匿名转发 ""

    =》channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());

    fanout(不处理路由键)

    路由模式 diret

    交换机和队列都有一个key进行匹配

    见图

    商品的所有操作路由到A队列,其他所有操作路由到B队列

    如果要实现这个功能,那么就需要将商品所有的key列出来,这样子太麻烦了,所以需要引入topic模式

    =>这里所指的全部列出来,是指消费者去消费的时候那个绑定方法

     

    Topic模式

    * 匹配一个

    # 匹配一个或多个

    这个*,我怎么匹配了多个

    消息确认机制之事务机制

    1、

    recv中不用写自动应答等代码

    在rabbitmq中,我们可以通过持久化数据去解决rabbitmq服务器异常的数据丢失问题;

    问题:生产者将消息发送出去之后,消息到底有没有到达rabbitmq服务器,默认的情况是不知道的;

    两种方式:

    AMQP 实现了事务机制

    Confirm模式

    事务机制

    txSelect 用户将当前channel设置成transation模式

    txCommit

    txRollback

    我发现只要调用了txSelect()方法,其他commit或rollback都没写,也可以保证事务一致性,这条消息照样没发出去

    这种模式每次都得开启事务,提交事务

    所以这种模式是很耗时的,采用这种方式,降低了rabbitmq的消息吞吐量

     

    2、

    confirm单条

    confirm批量

    channel.confirmSelect();//开启confirm模式

     

    confirm异步模式

    Channel对象提供的ConfirmListener()回调方法只包含deliveryTag(当前Channel发出的消息序号),我们需要自己为每一个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck方法,unconfirm集

    删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构

     

    备注:

    1、rabbitmq-server-3.6.9.exe 激活插件时失败

    rabbitmq-server-3.7.4.exe 安装该版本时提示ERLANG7.0版本过低

    2、如果队列已经定义好了,那么rabbitmq不允许我们重新定义一个已经存在的队列;

    3、这个rabbitmq是自启动的啊

    4、在公平分发的时候,如果把receive1和receive2中的basicQos()这个代码注释掉,会造成消费顺序紊乱

    5、rabbitmq里面只有队列有存储能力

    消费者、生产者均需要声明队列

    6、注意send类里面的发布消息语句

    channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());

    channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());

    一个是发布订阅模式,一个是队列模式

    注意参数别填错了

    7、覆盖某个方法时,形参名称可以改变

    英文单词:

    clustering 集群

    retrieving 检索、恢复

    delivery 传送、传递

    inequivalent 不等价的

    fanout 分发,扇出

     

    相关异常:

    Variable 'channel' is accessed from within inner class, needs to be declared final

    HTTP 400 请求出错 由于语法格式有误,服务器无法理解此请求。不作修改,客户程序就无法重复此请求。

     

     

  • 相关阅读:
    蓝桥杯--算法训练 区间k大数查询
    vijos1782:借教室
    vijos1779国王游戏
    C++大数模板
    HDU1042(N!:设4为基数)
    HDU1026(延时迷宫:BFS+优先队列)
    POJ3984(迷宫问题)
    HDU3018:Ant Trip(欧拉回路)
    HDU5438:Ponds(拓扑排序)
    2008北航:字符串匹配
  • 原文地址:https://www.cnblogs.com/syjp/p/10407002.html
Copyright © 2011-2022 走看看