zoukankan      html  css  js  c++  java
  • rabbitmq 五种消息模型

    具体可参考:https://note.youdao.com/ynoteshare1/index.html?id=db637b43f0ab16cf6db9b9b92d562ca8&type=notebook#/7A55B7E7787A49D0B2E2265D437F3C19;这里写的很具体了;

    一、基础环境:

    1)创建springboot项目, 并导入如下依赖;

            <!--rabbitmq依赖-->
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.4.3</version>
            </dependency>
            <!--springboot mq支持-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>

    2)写一个连接工具类;

    /**
     * 建立连接的工具类,用来简单测试消息发送接收功能
     * 实际上与springboot使用不需要该类
     */
    public class ConnectionUtil {
        /**
         * 建立连接
         * @return
         * @throws IOException
         * @throws TimeoutException
         */
        public static Connection getConnection() throws IOException, TimeoutException {
            ConnectionFactory factory=new ConnectionFactory();
            //mq服务器IP
            factory.setHost("192.168.190.141");
            //铜须端口号
            factory.setPort(5672);
            //虚拟主机
            factory.setVirtualHost("xieqi");
            factory.setUsername("xieqi");
            factory.setPassword("123456");
            return factory.newConnection();
        }
    }

    二、消息模型;

    1)、基本消息模型(basic queues)

    producer---   |队列|   ---consumer

    功能:一个生产者P发送消息到队列Q,一个消费者C接收。实现了基本的消息的生产和消费。一对一。

    生产者:

    public class Producer {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.建立连接
            Connection connection = ConnectionUtil.getConnection();
            //2.建立通道
            Channel channel = connection.createChannel();
            //3.声明队列
            channel.queueDeclare(
                    "simple_queue",//队列名称
                    false,//设置是否持久化
                    false,//设置是否排他(仅申明他的连接可见)
                    false,//是否自动删除
                    null);//参数设置
    
            for (int i = 0; i <10 ; i++) {
                String message=" hello rabbit"+i;
                //通过channel发送消息
                channel.basicPublish(
                        "",//exchange 交换机 ""表示使用默认
                        "simple_queue",// routing_key 路由key
                        null,//设置项
                        message.getBytes());//消息
                System.out.println("消息发送成功:"+message);
            }
            //关闭通道和连接
            channel.close();
            connection.close();
        }
    }

    自动确认消费者:

    /**
     * 描述:
     *   消费消息,自动确认(ACK)
     * @author bigpeng
     * @create 2019-07-15 13:40
     */
    public class ConsumerAutoACK {
        private final static String QUEUE_NAME = "simple_queue";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 创建通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [x] received : " + msg + "!");
                }
            };
            // 监听队列,第二个参数:是否自动进行消息确认。
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }

    手动确认消费者:

    public class Consumer {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.获取连接
            Connection connection = ConnectionUtil.getConnection();
            //2.创建通道
            Channel channel = connection.createChannel();
            //3.声明队列
            channel.queueDeclare("simple_queue",
                    false,false,false,null);
            //4 定义队列的消费者
            DefaultConsumer consumer=new DefaultConsumer(channel){
                //处理消息,当监听到队列中有消息时,会触发该方法
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    String message = new String(body);
                    System.out.println("获取到队列simple_queue的消息:"
                                   +message);
                    Random random=new Random();
                    if(random.nextInt(10)%2==1) {
                        //手动ACK
                        //成功ACK
                       //同一个会话, consumerTag 是固定的 可以做此会话的名字, deliveryTag 每次接收消息+1,可以做此消息处理通道的名字。  
                       channel.basicAck(envelope.getDeliveryTag(), false);
                        System.out.println("消费成功");
                    }else {
                        //失败的ACK
                        channel.basicNack(envelope.getDeliveryTag(),
                                false,
                                true);//是否重回队列
                        System.out.println("消费失败");
                    }
                }
            };
            //将consumer关联到通道
    
            //自动ACK
    //        channel.basicConsume(
    //                "simple_queue",//队列名
    //                true,//是否自动消息确认(ACK)
    //                consumer);//Consumer对象
    
               //手动ACK
                    channel.basicConsume(
                    "simple_queue",//队列名
                    false,//是否自动消息确认(ACK)
                    consumer);//Consumer对象
    
        }
    }

    2)、工作队列(work queues)

    功能:一个生产者,多个消费者。写法与基本消息模型类似,只不过原来是一个消费者,现在是多个消费者。多个消费者处理队列中的数据。

    特点:

    1)可以有多个消费者;

    2)一条消息只能被多个消费者中的一个消费

    3)、发布/订阅模式 Publish/Subscribe

    功能:一个生产者发送的消息会被多个消费者获取。一个生产者、一个交换机、多个队列、多个消费者

    与工作队列区别:

    1)工作队列只有一个队列,而发布订阅有多个队列

    2)工作队列一个消息只能被多个消费者中的一个消费,而发布订阅一个消息会被多个订阅的消费者消费。

    3)发布订阅比工作队列多出一个交换机概念,用来绑定消息发送到哪些消费者。 其实之前的两种模式也需要交换机,其使用默认交换,我们通过空字符串(“”)来识别。

    4)、路由模式(Routing)

    功能:生产者发送消息到交换机并且要指定路由key,消费者将队列绑定到交换机时需要指定路由key。只有当两个key相匹配时,消息才会发送到对应的消费者队列。即在广播的基础上有了路由的功能。 type 指定为direct。

    5)、主题订阅模式(topic)

    功能:生产者P发送消息到交换机X,type=topic,交换机根据绑定队列的routing key的值进行通配符匹配; 符号#:匹配一个或者多个词 lazy.# 可以匹配 lazy.irs或者lazy.irs.cor; 符号*:只能匹配一个词 lazy.* 可以匹配 lazy.irs或者lazy.cor

  • 相关阅读:
    swoole 入门
    Centos7安装Percona5.7
    clone github报Permission denied (publickey) 解决方案
    yii2-swiftmailer入门
    Yii 2.0 数据库操作总结
    面向对象简单示例
    面向对象与面向过程
    Tkinter之部件3种放置方式pack、grid、place
    Tkinter之variable用法
    Tkinter之Menu
  • 原文地址:https://www.cnblogs.com/xie-qi/p/13349495.html
Copyright © 2011-2022 走看看