zoukankan      html  css  js  c++  java
  • RabbitMQ 消息模式

    消息模式实例

    视频教程:https://ke.qq.com/course/304104

    编写代码前,最好先添加好用户并设置virtual hosts

    一、简单模式

    1.导入jar包

        <dependency>
          <groupId>com.rabbitmq</groupId>
          <artifactId>amqp-client</artifactId>
          <version>4.5.0</version>
        </dependency>

    2.创建连接

    import com.idelan.rabbitmq.utils.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Sender {
        private final static String QUEUE = "testhello"; //队列名字
    
        public static void main(String[] args) throws Exception{
            //获取连接
            Connection connection = ConnectionUtil.getConnection();
    
            //创建通道
            Channel channel = connection.createChannel();
    
            //声明队列,如果队列存在则什么都不做,如果队列不存在才创建
            //参数一: 队列的名字
            //参数二: 是否持久化队列,我们的队列模式是在内存中的,如果rabbit重启会丢失,如果我们设置为true 则会保存到erlng自带的数据库中,重启会重新获取
            //参数三: 是否排外,有两个作用,第一个当我们的链接关闭后是否会自动删除队列,作用二,是否私有当前队列,如果私有了,其他通道不可以访问当前队列,如果为true 一般适合一个队列消费者的时候
            //参数四: 是否自动删除
            //参数五 我们的一些其他的参数
            channel.queueDeclare(QUEUE, false, false, false, null);
    
            //发送内容
            channel.basicPublish("", QUEUE, null, "hello world".getBytes());
    
            //关闭连接
            channel.close();
            connection.close();
        }
    }

    3.消费者

    import com.idelan.rabbitmq.utils.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    
    public class Receiver {
        private final static String QUEUE = "testhello"; //队列名字
    
        public static void main(String[] args) throws Exception{
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE, false, false, false, null);
    
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //接收消息,参数二 是自动确认
            channel.basicConsume(QUEUE, true, consumer);
    
           while (true) {
                //获取消息   如果没有消息会等待,有的话就获取执行然后销毁,是一次性的
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println("message:"+message);
           }
        }
    }

    二、工作模式

    1.生产者

    import com.idelan.rabbitmq.utils.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Sender {
        private final static String QUEUE = "testwork"; //队列名字
    
        public static void main(String[] args) throws Exception{
            //获取连接
            Connection connection = ConnectionUtil.getConnection();
    
            //创建通道
            Channel channel = connection.createChannel();
    
            //声明队列,如果队列存在则什么都不做,如果队列不存在才创建
            //参数一: 队列的名字
            //参数二: 是否持久化队列,我们的队列模式是在内存中的,如果rabbit重启会丢失,如果我们设置为true 则会保存到erlng自带的数据库中,重启会重新获取
            //参数三: 是否排外,有两个作用,第一个当我们的链接关闭后是否会自动删除队列,作用二,是否私有当前队列,如果私有了,其他通道不可以访问当前队列,如果为true 一般适合一个队列消费者的时候
            //参数四: 是否自动删除
            //参数五 我们的一些其他的参数
            channel.queueDeclare(QUEUE, false, false, false, null);
    
            for (int i = 0; i < 20; i++){
                //发送内容
                channel.basicPublish("", QUEUE, null, ("hello world "+i).getBytes());
            }
    
            //关闭连接
            channel.close();
            connection.close();
        }
    }

    2.消费者1

    import com.idelan.rabbitmq.utils.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Receiver1 {
        private final static String QUEUE = "testwork"; //队列名字
    
        public static void main(String[] args) throws Exception{
            Connection connection = ConnectionUtil.getConnection();
            final Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE, false, false, false, null);
    
            //告诉服务器,在我没有确认当前消息完成之前,不要给我发新消息
            channel.basicQos(1);
    
            DefaultConsumer consumer = new DefaultConsumer(channel) {
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //当我们收到消息的时候调用
                    System.out.println("消费者1 收到的消息内容是:" + new String(body));
                    //确认 参数2,false为确认收到消息,true 为拒绝接收
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
    
            //注册消费者,参数2 收到确认,代表我们收到消息后需要手动告诉服务器,我们收到消息了
            channel.basicConsume(QUEUE, false, consumer);
        }
    }

    3.消费者2

    import com.idelan.rabbitmq.utils.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Receiver2 {
        private final static String QUEUE = "testwork"; //队列名字
    
        public static void main(String[] args) throws Exception{
            Connection connection = ConnectionUtil.getConnection();
            final Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE, false, false, false, null);
    
            //告诉服务器,在我没有确认当前消息完成之前,不要给我发新消息
            channel.basicQos(1);
            DefaultConsumer consumer = new DefaultConsumer(channel) {
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //当我们收到消息的时候调用
                    System.out.println("消费者2 收到的消息内容是:" + new String(body));
                    //确认 参数2,false为确认收到消息,true 为拒绝接收
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
    
            //注册消费者,参数2 收到确认,代表我们收到消息后需要手动告诉服务器,我们收到消息了
            channel.basicConsume(QUEUE, false, consumer);
        }
    }

    三、发布订阅模式

    1.生产者

    import com.idelan.rabbitmq.utils.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Sender {
        private final static String EXCHANGE_NAME = "testexchange"; //定义交换机名字
    
        public static void main(String[] args) throws Exception{
    
           Connection connection = ConnectionUtil.getConnection();
           Channel channel = connection.createChannel();
           //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//定义一个交换机,类型是fanout
    
            //发布订阅模式,因为消息是先发布到交换机中,而交换机是没有保存功能的,所以如果没有消费者,消息则会丢失
            channel.basicPublish(EXCHANGE_NAME, "", null, "发布订阅模式的消息".getBytes());
            channel.close();
            connection.close();
        }
    }

    2.消费者1

    import com.idelan.rabbitmq.utils.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Receiver1 {
        private final static String EXCHANGE_NAME = "testexchange"; //定义交换机名字
    
        public static void main(String[] args) throws Exception{
            Connection connection = ConnectionUtil.getConnection();
            final Channel channel = connection.createChannel();
            channel.queueDeclare("testpubQueue1", false, false, false, null);
    
            //绑定队列到交换机
            channel.queueBind("testpubQueue1", EXCHANGE_NAME, "");
            channel.basicQos(1);
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1:"+new String(body));
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            };
            channel.basicConsume("testpubQueue1", false, consumer);
        }
    }

    3.消费者2

    import com.idelan.rabbitmq.utils.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Receiver2 {
        private final static String EXCHANGE_NAME = "testexchange"; //定义交换机名字
    
        public static void main(String[] args) throws Exception{
            Connection connection = ConnectionUtil.getConnection();
            final Channel channel = connection.createChannel();
            channel.queueDeclare("testpubQueue2", false, false, false, null);
    
            //绑定队列到交换机
            channel.queueBind("testpubQueue2", EXCHANGE_NAME, "");
            channel.basicQos(1);
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者2:"+new String(body));
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            };
            channel.basicConsume("testpubQueue2", false, consumer);
        }
    }

    四、路由模式

    1.生产者

    import com.idelan.rabbitmq.utils.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Sender {
        private final static String EXCHANGE_NAME = "testexroute"; //定义交换机名字
    
        public static void main(String[] args) throws Exception{
    
           Connection connection = ConnectionUtil.getConnection();
           Channel channel = connection.createChannel();
           //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");//定义一个路由格式的交换机
    
            //发布订阅模式,因为消息是先发布到交换机中,而交换机是没有保存功能的,所以如果没有消费者,消息则会丢失
            // routingKey 为key1
            channel.basicPublish(EXCHANGE_NAME, "key3", null, "路由模式的消息".getBytes());
            channel.close();
            connection.close();
        }
    }

    2.消费者1

    import com.idelan.rabbitmq.utils.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Receiver1 {
        private final static String EXCHANGE_NAME = "testexroute"; //定义交换机名字
    
        public static void main(String[] args) throws Exception{
            Connection connection = ConnectionUtil.getConnection();
            final Channel channel = connection.createChannel();
            channel.queueDeclare("testRouteQueue1", false, false, false, null);
    
            //绑定队列到交换机
            //参数3标记,绑定到交换机的时候会指定一个标记,只有和它一样的标记的消息才会被当前消费者接收到
            channel.queueBind("testRouteQueue1", EXCHANGE_NAME, "key1");
            //如果需要绑定多个标记 在执行一次即可
            channel.queueBind("testRouteQueue1", EXCHANGE_NAME, "key3");
            channel.basicQos(1);
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1:"+new String(body));
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            };
            channel.basicConsume("testRouteQueue1", false, consumer);
        }
    }

    3.消费者2

    import com.idelan.rabbitmq.utils.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Receiver2 {
        private final static String EXCHANGE_NAME = "testexroute"; //定义交换机名字
    
        public static void main(String[] args) throws Exception{
            Connection connection = ConnectionUtil.getConnection();
            final Channel channel = connection.createChannel();
            channel.queueDeclare("testRouteQueue2", false, false, false, null);
    
             //绑定队列到交换机
            //参数3标记,绑定到交换机的时候会指定一个标记,只有和它一样的标记的消息才会被当前消费者接收到
            channel.queueBind("testRouteQueue2", EXCHANGE_NAME, "key1");
            //如果需要绑定多个标记 在执行一次即可
            channel.queueBind("testRouteQueue2", EXCHANGE_NAME, "key2");
            channel.basicQos(1);
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者2:"+new String(body));
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            };
            channel.basicConsume("testRouteQueue2", false, consumer);
        }
    }

    五、主题模式

    1.生产者

    import com.idelan.rabbitmq.utils.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Sender {
        private final static String EXCHANGE_NAME = "testexchangetopic"; //定义交换机名字
    
        public static void main(String[] args) throws Exception{
    
           Connection connection = ConnectionUtil.getConnection();
           Channel channel = connection.createChannel();
           //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");//定义一个topic 格式的交换机
    
            //发布订阅模式,因为消息是先发布到交换机中,而交换机是没有保存功能的,所以如果没有消费者,消息则会丢失
            // routingKey 为key1
            // * 只能匹配一个字符 # 可以匹配多个字符
            channel.basicPublish(EXCHANGE_NAME, "abc.1.3", null, "topic模式的消息".getBytes());
            channel.close();
            connection.close();
        }
    }

    2.消费者1

    import com.idelan.rabbitmq.utils.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Receiver1 {
        private final static String EXCHANGE_NAME = "testexchangetopic"; //定义交换机名字
    
        public static void main(String[] args) throws Exception{
            Connection connection = ConnectionUtil.getConnection();
            final Channel channel = connection.createChannel();
            channel.queueDeclare("testTopicQueue1", false, false, false, null);
    
            //绑定队列到交换机
            //参数3标记,绑定到交换机的时候会指定一个标记,只有和它一样的标记的消息才会被当前消费者接收到
            channel.queueBind("testTopicQueue1", EXCHANGE_NAME, "key.*");
            //如果需要绑定多个标记 在执行一次即可
            channel.queueBind("testTopicQueue1", EXCHANGE_NAME, "abc.#");
            channel.basicQos(1);
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1:"+new String(body));
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            };
            channel.basicConsume("testTopicQueue1", false, consumer);
        }
    }

    3.消费者2

    import com.idelan.rabbitmq.utils.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Receiver2 {
        private final static String EXCHANGE_NAME = "testexchangetopic"; //定义交换机名字
    
        public static void main(String[] args) throws Exception{
            Connection connection = ConnectionUtil.getConnection();
            final Channel channel = connection.createChannel();
            channel.queueDeclare("testTopicQueue2", false, false, false, null);
    
             //绑定队列到交换机
            //参数3标记,绑定到交换机的时候会指定一个标记,只有和它一样的标记的消息才会被当前消费者接收到
            channel.queueBind("testTopicQueue2", EXCHANGE_NAME, "key.#");
            //如果需要绑定多个标记 在执行一次即可
            channel.queueBind("testTopicQueue2", EXCHANGE_NAME, "abc.*");
            channel.basicQos(1);
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者2:"+new String(body));
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            };
            channel.basicConsume("testTopicQueue2", false, consumer);
        }
    }
  • 相关阅读:
    java 开发面试题小整理(二)
    Java 字符串比较小知识
    适配器、工厂模式、线程池、线程组、互斥锁、Timer类、Runtime类、单例设计模式(二十四)
    多线程、死锁、线程安全、同步方法、代码块、休眠、守护线程、Thread、Runnable(二十三)
    RabbitMQ的几种典型使用场景
    SQL一些问题
    Redis和Memcached的区别
    Mongodb 使用场景和不使用场景
    Adapter as a WCF Binding
    ASP.NET MVC使用Filter解除Session, Cookie等依赖
  • 原文地址:https://www.cnblogs.com/gyli20170901/p/10137341.html
Copyright © 2011-2022 走看看