zoukankan      html  css  js  c++  java
  • RabbitMQ实战

     

     

    一、Hello World

    1.amqp-client客户端依赖

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.0.0</version>
    </dependency>

    2.Rabbitmq类与方法

    • ConnectionFactory:设置port端口(5672)、host主机、用户名密码、virtual host虚拟主机信息
    • Channel: 通道
    • queueDeclare:声明队列,参数依次为:queueName队列名、durable持久化、exclusive排他队列、autoDelete自动删除、arguments(Map)其他参数配置
    Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                 Map<String, Object> arguments) throws IOException;
    • basicPublish:发布消息,参数依次为:exchange交换机名、queueName队列名、props其他参数、body消息(Byte[])
    void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
    • DefaultConsumer:消息消费者,通过handleDelivery方法处理消息,使用时需重写该方法
    • handleDelivery:接收处理消息,参数依次为:consumerTag客户标记、envelope包含amqp参数、props其他参数、body消息(Byte[])
    @Override
    public void handleDelivery(String consumerTag,
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body)
        throws IOException
    {
            // no work to do
    }
    • basicConsumer:监听队列,参数依次为:queue队列名、autoAck自动应答、callback回调
    String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

    3.Sender生产者

    package com.lyf.springboot02.mq;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @author lyf
     * @date 2019/7/28 18:12
     */
    public class Sender {
        public static final String QUEUE_NAME = "simple";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            /**
             * 1. 获取连接connection
             * 2. 声明消息队列queue name
             * 3. 发送消息message
             * 4. 关闭连接connection
             */
            // 定义连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setPort(5672);
            factory.setHost("192.168.37.200");
            factory.setUsername("lyf");
            factory.setPassword("123456");
            factory.setVirtualHost("/lyf");
                    
            // 创建连接
            Connection connection = factory.newConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            /*  声明一个 Queue
                第一个参数叫 queue 名称
                第二个参数 durable:是否持久化
                第三个参数: Exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅
                对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:
                其一,排他队列是基于连接可见的,同一连接的不同通道是可以同时访问同一个
                连接创建的排他队列的。
                其二,“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建
                立同名的排他队列的,这个与普通队列不同。
                其三,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会
                被自动删除的。
                这种队列适用于只限于一个客户端发送读取消息的应用场景。
                第四个参数:Auto-delete:自动删除,如果该队列没有任何订阅的消费者的话,该
                队列会被自动删除。这种队列适用于临时队列。
            */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 发送消息
            String msg = "hello mq";
            // 发送消息到 mq 队列中
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes("utf-8"));
            // 关闭通道
            channel.close();
            // 关闭连接
            connection.close();
        }
    }

    4.Receiver消费者

    package com.lyf.springboot02.mq;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Receiver {
        public static final String QUEUE_NAME = "simple";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setPort(5672);
            factory.setHost("192.168.37.200");
            factory.setUsername("lyf");
            factory.setPassword("123456");
            factory.setVirtualHost("/lyf");
            // 创建连接对象
            Connection connection = factory.newConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 指定队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 获取消息
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // 获取消息并在控制台打印
                    String msg = new String(body, "utf-8");
                    System.out.println("get msg:" + msg);
                }
            };
            // 监听队列
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }

    二、交换机类型 Exchange Type

    • Fanout 广播模式
    • Direct 路由模式
    • Topic 主题模式
    • Header
    • RPC

    1.消息轮询分发(Round Robin)

     


     

    默认方式,队列中的消息依次、平均分配给每个消费者。
    注意:消费者中autoAck设为true,收到消息自动回执。
    缺点:不同机子性能不一样,处理速度不一样。会造成慢的太累,快的太闲。
    通过for循环生成多条消息,启动两个客户端,结果大致如下:

     

    生产者:1 2 3 4 5 6 7 8 9 10
    消费者1:1 3 5 7 9
    消费者2:2 4 6 8 10

    2.消息公平分发(Fair Dispatch)

     


     

    机子性能好就多发,性能差就少发。
    注意:

     

    • 消费者通道basicQos的prefetchCount设为1,每次只接收一条消息
    • autoAck设为false,手动回执
    • 回执消息basicAck,参数依次为:DeliveryTag消息通道名字、multiple批量

    不足:每个消息只能被一个消费者消费
    Receiver代码修改如下,通过线程睡眠模拟处理消息速度

    channel.basicQos(1);
    
    // 获取消息
    Consumer consumer=new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
            String msg=new String(body,"utf-8");
            System.out.println("1 号消费者消费消息:msg-->"+msg);
            try {
                Thread.sleep(250);
                channel.basicAck(envelope.getDeliveryTag(), false);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    };
    
    channel.basicConsume(QUEUE_NAME, false, consumer);

    3.发布与订阅模式(Publish/Subscribe)

     


     

    Fanout:生产者发送消息到交换机,交换机投递到多个队列,每个队列由各自的消费者消费
    注意:

     

    • 生产者不再声明队列,声明交换机exchangeDeclare,参数依次为:exchange交换机名、type交换机类型
    Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type) throws IOException;
    • 发布消息时,指定交换机名、路由key为空
    • 消费者绑定队列到交换机queueBind,参数依次为:queue队列名、exchange交换机名、routingKey路由键
    Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;

    缺点:不能区分消费者

    Sender修改部分:

    // 定义交换机名
    final String EXCHANGE_NAME="hello-fanout";
    // 声明交换机
    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
    // 发布消息
    String msg = "Hello";
    channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes("utf-8"));

    Receiver修改如下:

    // 定义交换机名
    final String EXCHANGE_NAME="hello-fanout";
    // 声明交换机
    channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.FANOUT);
    // 创建默认排他队列
    String queueName = channel.queueDeclare().getQueue();
    // 绑定队列到路由
    channel.queueBind(queueName,EXCHANGE_NAME,"");

    4.路由模式(Routing)

     


     

    Direct:通过指定路由key发送到不同队列,区分不同消费者
    缺点:路由规则不灵活
    Sender修改部分:

     

    // 发布消息
    String msg = "Hello-info";
    // 设置路由key
    String routingKey = "info";
    // 绑定队列到路由
    channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes("utf-8"));

    Receiver修改如下:

    // 设置路由key
    String routingKey = "info";
    // 绑定队列到路由
    channel.queueBind(queueName,EXCHANGE_NAME,routingKey);

    5.主题模式(Topics)

     


     

    路由规则:

     

    • . 通过点分割一个单词
    • * 模糊匹配一个单词
    • # 模糊匹配多个单词,可以没有

    以上图中的配置为例:

    routingKey=”quick.orange.rabbit”的消息会同时路由到Q1与Q2,
    routingKey=”lazy.orange.fox”的消息会路由到Q1,Q2,
    routingKey=”lazy.brown.fox”的消息会路由到Q2,
    routingKey=”lazy.pink.rabbit”的消息会路由到Q2;
    routingKey=”quick.brown.fox”、
    routingKey=”orange”、
    routingKey=”quick.orange.male.rabbit”的消息将会被丢弃,因为它们没有匹配任何bindingKey。

    // 声明交换机
    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

    三、消息事务机制

    如何保证生产者的消息成功投递到队列,两种方式:transition事务模式confirm确认模式,两种模式不能同时开启。

    1.transition事务模式

    类似JDBC操作,发送消息前开启事务,成功后提交事务,失败则回滚事务;
    缺点:效率低

    • channel.txSelect() 开启事务
    • channel.txCommit() 提交事务
    • channel.txRollback() 回滚事务

    2.confirm确认模式

    实现生产者confirm 机制有三种方式:

    1. 普通confirm模式:每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。实际上是一种串行confirm了。
    2. 批量confirm模式:每发送一批消息后,调用waitForConfirmsOrDie()方法,等待服务器端confirm。
    3. 异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个方法。
    • channel.confirmSelect() 开启确认模式
    • channel.waitForConfirms() 等待消息被确认
    • addConfirmListener() 添加异步监听器,重写handleAck消息成功和handleNack消息失败方法

    1) 普通confirm模式

    // 开启确认模式
    channel.confirmSelect();
    ...
    // 等待确认
    if(channel.waitForConfirms()){
        System.out.println("发送成功。。。");
    }

    2) 批量confirm模式

    // 开启确认模式
    channel.confirmSelect();
    ...
    // 等待确认
    channel.waitForConfirmsOrDie();
    System.out.println("发送成功。。。");

    3) 异步confirm模式

    // 定义集合存放seqNO
    final SortedSet<Long> confirmSet = Collections.synchronizedNavigableSet(new TreeSet<Long>());
    ...
    // 开启确认模式
    channel.confirmSelect();
    
    // 绑定监听器
    channel.addConfirmListener(new ConfirmListener() {
        @Override
        public void handleAck(long deliveryTag, boolean multiple) throws IOException {
            if(multiple){
                System.out.println("mutil-succe: "+deliveryTag);
                confirmSet.headSet(deliveryTag+1L).clear();
            }else{
                System.out.println("single-succe: "+deliveryTag);
                confirmSet.remove(deliveryTag);
            }
        }
    
        @Override
        public void handleNack(long deliveryTag, boolean multiple) throws IOException {
            if(multiple){
                System.out.println("mutil-fail: "+deliveryTag);
                confirmSet.headSet(deliveryTag+1L).clear();
            }else{
                System.out.println("single-fail: "+deliveryTag);
                confirmSet.remove(deliveryTag);
            }
        }
    });
    ...
    // 获取编号
    long seqNo = channel.getNextPublishSeqNo();
    channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes("utf-8"));
    System.out.println("Send: "+msg);
    //添加编号
    confirmSet.add(seqNo);
  • 相关阅读:
    AFNetworking使用总结
    使用Attiny 85开发板制作BadUSB
    C# 按指定数量从前面或者后面删除字符串
    C# 获取打印机列表
    【解决】该任务映像已损坏或已篡改。(异常来自HRESULT:0x80041321)
    PowerShell 解锁使用浏览器下载的文件
    C#使用HttpHelper万能框架,重启路由器
    【解决】应用程序无法正常启动(0xc000007b)。请单击“确定”关闭应用程序。
    Windows 7 IE11 F12 不能正常使用
    HTML5 图片上传预览
  • 原文地址:https://www.cnblogs.com/linyufeng/p/11297769.html
Copyright © 2011-2022 走看看