zoukankan      html  css  js  c++  java
  • rabbitmq 五种消息模型

    具体可参考:https://note.youdao.com/ynoteshare1/index.html?id=db637b43f0ab16cf6db9b9b92d562ca8&type=notebook#/7A55B7E7787A49D0B2E2265D437F3C19;这里写的很具体了;

    一、基础环境:

    1)创建springboot项目, 并导入如下依赖;

            <!--rabbitmq依赖-->
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.4.3</version>
            </dependency>
            <!--springboot mq支持-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>

    2)写一个连接工具类;

    /**
     * 建立连接的工具类,用来简单测试消息发送接收功能
     * 实际上与springboot使用不需要该类
     */
    public class ConnectionUtil {
        /**
         * 建立连接
         * @return
         * @throws IOException
         * @throws TimeoutException
         */
        public static Connection getConnection() throws IOException, TimeoutException {
            ConnectionFactory factory=new ConnectionFactory();
            //mq服务器IP
            factory.setHost("192.168.190.141");
            //铜须端口号
            factory.setPort(5672);
            //虚拟主机
            factory.setVirtualHost("xieqi");
            factory.setUsername("xieqi");
            factory.setPassword("123456");
            return factory.newConnection();
        }
    }

    二、消息模型;

    1)、基本消息模型(basic queues)

    producer---   |队列|   ---consumer

    功能:一个生产者P发送消息到队列Q,一个消费者C接收。实现了基本的消息的生产和消费。一对一。

    生产者:

    public class Producer {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.建立连接
            Connection connection = ConnectionUtil.getConnection();
            //2.建立通道
            Channel channel = connection.createChannel();
            //3.声明队列
            channel.queueDeclare(
                    "simple_queue",//队列名称
                    false,//设置是否持久化
                    false,//设置是否排他(仅申明他的连接可见)
                    false,//是否自动删除
                    null);//参数设置
    
            for (int i = 0; i <10 ; i++) {
                String message=" hello rabbit"+i;
                //通过channel发送消息
                channel.basicPublish(
                        "",//exchange 交换机 ""表示使用默认
                        "simple_queue",// routing_key 路由key
                        null,//设置项
                        message.getBytes());//消息
                System.out.println("消息发送成功:"+message);
            }
            //关闭通道和连接
            channel.close();
            connection.close();
        }
    }

    自动确认消费者:

    /**
     * 描述:
     *   消费消息,自动确认(ACK)
     * @author bigpeng
     * @create 2019-07-15 13:40
     */
    public class ConsumerAutoACK {
        private final static String QUEUE_NAME = "simple_queue";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 创建通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [x] received : " + msg + "!");
                }
            };
            // 监听队列,第二个参数:是否自动进行消息确认。
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }

    手动确认消费者:

    public class Consumer {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.获取连接
            Connection connection = ConnectionUtil.getConnection();
            //2.创建通道
            Channel channel = connection.createChannel();
            //3.声明队列
            channel.queueDeclare("simple_queue",
                    false,false,false,null);
            //4 定义队列的消费者
            DefaultConsumer consumer=new DefaultConsumer(channel){
                //处理消息,当监听到队列中有消息时,会触发该方法
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    String message = new String(body);
                    System.out.println("获取到队列simple_queue的消息:"
                                   +message);
                    Random random=new Random();
                    if(random.nextInt(10)%2==1) {
                        //手动ACK
                        //成功ACK
                       //同一个会话, consumerTag 是固定的 可以做此会话的名字, deliveryTag 每次接收消息+1,可以做此消息处理通道的名字。  
                       channel.basicAck(envelope.getDeliveryTag(), false);
                        System.out.println("消费成功");
                    }else {
                        //失败的ACK
                        channel.basicNack(envelope.getDeliveryTag(),
                                false,
                                true);//是否重回队列
                        System.out.println("消费失败");
                    }
                }
            };
            //将consumer关联到通道
    
            //自动ACK
    //        channel.basicConsume(
    //                "simple_queue",//队列名
    //                true,//是否自动消息确认(ACK)
    //                consumer);//Consumer对象
    
               //手动ACK
                    channel.basicConsume(
                    "simple_queue",//队列名
                    false,//是否自动消息确认(ACK)
                    consumer);//Consumer对象
    
        }
    }

    2)、工作队列(work queues)

    功能:一个生产者,多个消费者。写法与基本消息模型类似,只不过原来是一个消费者,现在是多个消费者。多个消费者处理队列中的数据。

    特点:

    1)可以有多个消费者;

    2)一条消息只能被多个消费者中的一个消费

    3)、发布/订阅模式 Publish/Subscribe

    功能:一个生产者发送的消息会被多个消费者获取。一个生产者、一个交换机、多个队列、多个消费者

    与工作队列区别:

    1)工作队列只有一个队列,而发布订阅有多个队列

    2)工作队列一个消息只能被多个消费者中的一个消费,而发布订阅一个消息会被多个订阅的消费者消费。

    3)发布订阅比工作队列多出一个交换机概念,用来绑定消息发送到哪些消费者。 其实之前的两种模式也需要交换机,其使用默认交换,我们通过空字符串(“”)来识别。

    4)、路由模式(Routing)

    功能:生产者发送消息到交换机并且要指定路由key,消费者将队列绑定到交换机时需要指定路由key。只有当两个key相匹配时,消息才会发送到对应的消费者队列。即在广播的基础上有了路由的功能。 type 指定为direct。

    5)、主题订阅模式(topic)

    功能:生产者P发送消息到交换机X,type=topic,交换机根据绑定队列的routing key的值进行通配符匹配; 符号#:匹配一个或者多个词 lazy.# 可以匹配 lazy.irs或者lazy.irs.cor; 符号*:只能匹配一个词 lazy.* 可以匹配 lazy.irs或者lazy.cor

  • 相关阅读:
    LeetCode 623. Add One Row to Tree
    LeetCode 894. All Possible Full Binary Trees
    LeetCode 988. Smallest String Starting From Leaf
    LeetCode 979. Distribute Coins in Binary Tree
    LeetCode 814. Binary Tree Pruning
    LeetCode 951. Flip Equivalent Binary Trees
    LeetCode 426. Convert Binary Search Tree to Sorted Doubly Linked List
    LeetCode 889. Construct Binary Tree from Preorder and Postorder Traversal
    LeetCode 687. Longest Univalue Path
    LeetCode 428. Serialize and Deserialize N-ary Tree
  • 原文地址:https://www.cnblogs.com/xie-qi/p/13349495.html
Copyright © 2011-2022 走看看