zoukankan      html  css  js  c++  java
  • rabbitMQ笔记

    六种工作模式

    官网介绍:https://www.rabbitmq.com/getstarted.html

    简单模式:一个生产者,一个消费者
    
    work模式:一个生产者,多个消费者,每个消费者获取到的消息唯一。
    
    订阅模式:一个生产者发送的消息会被多个消费者获取。
    
    路由模式:发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定路由key
    
    topic模式:将路由键和某模式进行匹配,此时队列需要绑定在一个模式上,“#”匹配一个词或多个词,“*”只匹配一个词。

    简单模式/work模式

                Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(false);
                
                channel.queueDeclare("yangsimple", false, false, false, null);
                channel.basicPublish("", "yangsimple", null, "simple message".getBytes());
                System.out.println("basic publish");
                channel.close();

    订阅模式/路由模式

    //生产者
    
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    
    //此处不需要单独queueDeclare操作,直接向Exchange发送消息
    
    channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE);
    
    String message = "message";
    
    channel.basicPublish(EXCHANGE_NAME,"key2", null, message.getBytes());
    System.out.println("[x] Sent '"+message+"'");
    
    channel.close();
    connection.close();
    
    
    
    //////###########
    
    //消费者
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    
    channel.queueDeclare(QUEUE_NAME, false,false,false,null);
    
    //此处需要定义queue以确定向订阅哪一个queue
    
    channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"key");
    channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"key2");
    
    channel.basicQos(1);
    
    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(QUEUE_NAME, false, consumer);
    
    while(true){
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
    System.out.println("[x] Received1 "+message);
    Thread.sleep(10);
    
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }

     AMQP:

    注:

    这里的Connection是个长连接,消息的传递都是在Channel中执行的。Channel是个虚拟链接,使用Channel大大的减少创建TCP(Connection)的资源消耗。

    消息消费两种模式:

    一种是 Push 模式,只要生产者发到服务器,就马上推 送给消费者。另一种是 Pull 模式,消息存放在服务端,只有消费者主动获取才能拿到消息。 

    Exchange解释:

    交换机是一个绑定列表,用来查找匹配的绑定关系。
    
    在 RabbitMQ 里面永远不会出现消息直接发送到队列的情况,默认交换机是”“。因为在 AMQP 里面 引入了交换机(Exchange)的概念,用来实现消息的灵活路由。
    
    队列使用绑定键(Binding Key)跟交换机建立绑定关系。 生产者发送的消息需要携带路由键(Routing Key),交换机收到消息时会根据它保存的绑定列表,决定将消息路由到哪些与它绑定的队列上。 
    注意:交换机与队列、队列与消费者都是多对多的关系。

    Queue扩展参数:

    含义

    x-message-ttl

    队列中消息的存活时间,单位毫秒
    

    x-expires

    队列在多久没有消费者访问以后会被删除
    

    x-max-length

    队列的最大消息数

    x-max-length-bytes

    队列的最大容量,单位 Byte

    x-dead-letter-exchange

    队列的死信交换机

    x-dead-letter-routing-key

    死信交换机的路由键

    x-max-priority

    队列中消息的最大优先级,消息的优先级不能超过它
    

    虚拟主机 VHOST

    VHOST 除了可以提高硬件资源的利用率之外,还可以实现资源的隔离和权限的控制。
    它的作用类似于编程语言中的 namespace 和 package,不同的 VHOST 中可以有 同名的 Exchange 和 Queue,它们是完全透明的。

     死信队列

    条件:

    1. 消息被消费者拒绝并且未设置重回队列:(NACK || Reject ) && requeue == false
    2. 消息过期
    3. 队列达到最大长度,超过了 Max length(消息数)或者 Max length bytes (字节数),最先入队的消息会被发送到 DLX。

    声明死信队列代码:

    //首先声明 死信队列Exchange及Queue
    
    @Bean("deatLetterExchange")
     public TopicExchange deadLetterExchange() {
    return new TopicExchange("DEAD_LETTER_EXCHANGE", true, false, new HashMap<>());
    } @Bean(
    "deatLetterQueue") public Queue deadLetterQueue() { return new Queue("DEAD_LETTER_QUEUE", true, false, false, new HashMap<>());
    }
    //此处Exchange和queue进行绑定 @Bean public Binding bindingDead(@Qualifier("deatLetterQueue") Queue queue,@Qualifier("deatLetterExchange") TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("#"); // 无条件路由
    }
    ///声明正常队列时,将死信队列信息添加到arguments中 @Bean("oriUseExchange") public DirectExchange exchange() { return new DirectExchange("_USE_EXCHANGE", true, false, new HashMap<>());
    } @Bean(
    "oriUseQueue") public Queue queue() { Map<String, Object> map = new HashMap<String, Object>(); map.put("x-message-ttl", 10000); // 10 秒钟后成为死信 map.put("x-dead-letter-exchange", "DEAD_LETTER_EXCHANGE"); // 队列中的消息变成死信后,进入死信交换机 return new Queue("_USE_QUEUE", true, false, false, map);
    } @Bean
    public Binding binding(@Qualifier("oriUseQueue") Queue queue,@Qualifier("oriUseExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("ori.use");
    }

     消费端限流:

    在消费者处理消息的能力有限,例如消费者数量太少,或者单条消息的处理时间过 长的情况下,如果我们希望在一定数量的消息消费完之前,不再推送消息过来,就要用 到消费端的流量限制措施。

    可以基于 Consumer 或者 channel 设置 prefetch count 的值,含义为 Consumer端的最大的 unacked messages 数目。当超过这个数值的消息未被确认,RabbitMQ 会 停止投递新的消息给该消费者。

    // 如果超过 2 条消息没有发送 ACK,当前消费者不再接受队列消息
    channel.basicQos(2); 
    channel.basicConsume(QUEUE_NAME, false, consumer);
    
    ///SimpleMessageListenerContainer
    container.setPrefetchCount(2);

     


    多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。

    Channel: 信道是创建在“真实”TCP上的虚拟连接,AMQP命令都是通过信道发送出去的,每个信道都会有一个唯一的ID,不论是发布消息,订阅队列或者介绍消息都是通过信道完成的。

    为什么不通过TCP直接发送命令?

    对于操作系统来说创建和销毁TCP会话是非常昂贵的开销,假设高峰期每秒有成千上万条连接,每个连接都要创建一条TCP会话,这就造成了TCP连接的巨大浪费,而且操作系统每秒能创建的TCP也是有限的,因此很快就会遇到系统瓶颈。

    如果我们每个请求都使用一条TCP连接,既满足了性能的需要,又能确保每个连接的私密性,这就是引入信道概念的原因。

    RabbitMQ主要组件:

    ConnectionFactory(连接管理器)、Channel(信道)、Exchange(交换器)、Queue(队列)、RoutingKey(路由键)、BindingKey(绑定键)。

    RoutingKey(路由键):用于把生成者的数据分配到交换器上;

    BindingKey(绑定键):用于把交换器的消息绑定到队列上

    消息持久化的缺点: 性能大幅度降低:

    消息持久化的优点显而易见,但缺点也很明显,那就是性能,因为要写入硬盘要比写入内存性能较低很多,从而降低了服务器的吞吐量,尽管使用SSD硬盘可以使事情得到缓解,但他仍然吸干了Rabbit的性能,当消息成千上万条要写入磁盘的时候,性能是很低的。
    
    所以使用者要根据自己的情况,选择适合自己的方式。

     虚拟主机: vhost

    每个Rabbit都能创建很多vhost,我们称之为虚拟主机,每个虚拟主机其实都是mini版的RabbitMQ,拥有自己的队列,交换器和绑定,拥有自己的权限机制

    1. RabbitMQ默认的vhost是“/”开箱即用;
    
    2. 多个vhost是隔离的,多个vhost无法通讯,并且不用担心命名冲突(队列和交换器和绑定),实现了多层分离;
    
    3. 创建用户的时候必须指定vhost;

    fanout交换器——发布/订阅模式

    fanout有别于direct交换器,fanout是一种发布/订阅模式的交换器,当你发送一条消息的时候,交换器会把消息广播到所有附加到这个交换器的队列上。

    对于fanout交换器来说routingKey(路由键)是无效的,这个参数是被忽略的。

    final String ExchangeName = "fanoutec"; // 交换器名称
    Connection conn = connectionFactoryUtil.GetRabbitConnection();
    Channel channel = conn.createChannel();
    channel.exchangeDeclare(ExchangeName, "fanout"); // 声明fanout交换器

    具体参照: https://www.cnblogs.com/vipstone/p/9295625.html

    topic交换器——匹配订阅模式

    topic路由器的关键在于定义路由键,定义routingKey名称不能超过255字节,使用“.”作为分隔符,消费消息的时候routingKey可以使用下面字符匹配消息:

    • "*"匹配一个分段(用“.”分割)的内容;
    • "#"匹配0和多个字符;

    生产者可靠性发布的实现方式:通过AMQP提供的事务机制实现, 使用发送者确认模式实现


    消息可靠性发送

    消息的发送流程主要如下

     针对第一点生产者发送消息到 Broker,在 RabbitMQ 里面提供了两种机制服务端确认机制

    第一种是 Transaction(事务)模式,第二种 Confirm(确认)模式。

    事务使用

    事务的实现主要是对信道(Channel)的设置,主要的方法有三个:

    1. channel.txSelect()声明启动事务模式;

    2. channel.txComment()提交事务;

    3. channel.txRollback()回滚事务;

    Demo:

        channel.txSelect(); // 声明事务
        // 发送消息
        channel.basicPublish("", _queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
        channel.txCommit(); // 提交事务

    事务的缺点是:只有收到了服务端的 Commit-OK 的指令,才能提交成功效率太差.

    Confirm发送方确认模式

    Confirm发送方确认模式使用和事务类似,也是通过设置Channel进行发送方确认的。

    Confirm的三种实现方式:

    方式一:channel.waitForConfirms()普通发送方确认模式(发送一条确认一条);

    方式二:channel.waitForConfirmsOrDie()批量确认模式(难点:无法确认每次发送多少条合适);

    方式三:channel.addConfirmListener()异步监听发送方确认模式;

    Demo:

    // 开启发送方确认模式
    channel.confirmSelect();
    String message = String.format("时间 => %s", new Date().getTime());
    channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
    if (channel.waitForConfirms()) {
        System.out.println("消息发送成功" );
    }
    
    
    ///////////////
    // 开启发送方确认模式
    channel.confirmSelect();
    for (int i = 0; i < 10; i++) {
        String message = String.format("时间 => %s", new Date().getTime());
        channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
    }
    channel.waitForConfirmsOrDie(); //直到所有信息都发布,只要有一个未确认就会IOException
    System.out.println("全部执行完成");
    
    //////////
    // 开启发送方确认模式
    channel.confirmSelect();
    for (int i = 0; i < 10; i++) {
        String message = String.format("时间 => %s", new Date().getTime());
        channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
    }
    //异步监听确认和未确认的消息
    channel.addConfirmListener(new ConfirmListener() {
        @Override
        public void handleNack(long deliveryTag, boolean multiple) throws IOException {
            System.out.println("未确认消息,标识:" + deliveryTag);
        }
        @Override
        public void handleAck(long deliveryTag, boolean multiple) throws IOException {
            System.out.println(String.format("已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));
        }
    });

    生产者可靠性发布小结:

    Confirm批量确定和Confirm异步模式性能相差不大,Confirm模式要比事务快10倍左右。 


    RabbitMQ持久化方案,包含Exchange,Queue,Message三种设置

    @Bean("Queue") public Queue GpQueue() {
    // queueName, durable, exclusive, autoDelete, Properties
    return new Queue("Test_queuename", true, false, false, new HashMap<>()); }
    
    @Bean("Exchange")
    public DirectExchange exchange() {
    // exchangeName, durable, exclusive, autoDelete, Properties
    return new DirectExchange("Name_TEST_EXCHANGE", true, false, new HashMap<>()); }
    
    //Message 
    MessageProperties messageProperties = new MessageProperties(); messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); 
    Message message = new Message("持久化消息".getBytes(), messageProperties); rabbitTemplate.send("GP_TEST_EXCHANGE", "test", message);

    消费者消费数据高可用性

    1. 调用生产者API进行回调

    2.发送响应消息给生产者


    MQ集群设置

    因为 Erlang 天生具备分布式的特性, 所以 RabbitMQ 天然支持集群,不需要通过引入 ZK 或者数据库来实现数据同步

    MQ集群分两种:

    第一种普通集群,各个机器间只共享元数据,

    第二种:镜像队列

    镜像队列模式下,消息内容会在镜像节点间同步,可用性更高。不过也有一定的副 作用,系统性能会降低,节点过多的情况下同步的代价比较大。


  • 相关阅读:
    sql性能调优的注意项
    mybatis获取刚插入数据的ID
    mysql
    JQuery
    JS
    css
    web前端
    python爬虫
    socket编程
    python基础
  • 原文地址:https://www.cnblogs.com/snow-man/p/11653926.html
Copyright © 2011-2022 走看看