zoukankan      html  css  js  c++  java
  • 消息队列 (2) RabbtMQ 简单模式、Work Queues工作模式

    第一种模式:简单模式 一对一

    如图:P代表生产者(要发送消息的程序),C代表消费者(会一直等待消息到来),红色部分为消息队列(类似邮箱,可以缓存消息,生产者向其中投递消息,消费者从中取出消息)

    一、编写消费者代码

      1.首先创建一个maven项目,然后导入rabbitMQjar包

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>testRabbit</groupId>
        <artifactId>test</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <dependencies>
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>6.5.0/version>
            </dependency>
        </dependencies>
    </project>

      2.创建消费者Producer

    public class Producer {
        public final static String QUEUE_NAME = "rabbitMQ.test";
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
    
            //设置RabbitMQ相关信息
            factory.setHost("10.211.55.4"); //ip
    factory.setVirtualHost("local"); //虚拟机 factory.setPort(5672); //端口 默认
    factory.setUsernaem("admin");
    factory.setPassword("admin"); //账号 虚拟机是在 10.211.55.4:15672中创建并分配的
    //创建一个新的连接 Connection connection = factory.newConnection(); //如果连接失败 检查linux 端口是否已开启 是5672 //创建一个通道 Channel channel = connection.createChannel(); //声明一个队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    //发送消息到队列中 String message = "hello rabbitMQ"; channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8")); System.out.println("Producer Send : " + message); //关闭通道和连接 channel.close(); connection.close(); } }

    queueDeclare第一个参数表示队列名称,第二个参数为是否持久化(true表示是,队列将在服务器重启时生存),第三个参数为是否独占队列(创建者可以使用的私有队列,断开后自动删除),第四个参数为当所有消费者客户端连接断开时是否自动删除队列,第五个参数为队列的其他参数。

    basicPublish第一个参数为交换机名称 空走默认的,第二个参数为队列映射的路由key,第三个参数为消息的其他属性,第四个参数为发送消息的主体。

      3.创建消费者

    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Customer {
        public final static String QUEUE_NAME = "rabbitMQ.test";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置RabbitMQ相关信息
            factory.setHost("10.211.55.4"); //ip
    factory.setVirtualHost("local"); //虚拟机 factory.setPort(5672); //端口 默认
    factory.setUsernaem("admin");
    factory.setPassword("admin"); //账号 虚拟机是在 10.211.55.4:15672中创建并分配的
    //创建一个新的连接
            Connection connection = factory.newConnection();
            //创建一个通道
            Channel channel = connection.createChannel();
            //声明要关注的队列
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            System.out.println("客户端等待接受消息");
    
            //DefaultConsumer类实现了Consumer接口,通过传入一个频道,告诉服务器我们需要哪个频道的信息,如果频道中有消息,就会执行回调函数handleDelivery
            Consumer comsumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String (body,"UTF-8");
                    System.out.println("客户端接收:"+message);
                }
            };
    
            //自动回答队列应答 -- RabbitMQ中的消息确认机制
             channel.basicConsume(QUEUE_NAME,true,comsumer);
            //无需关闭 否则接收不到了
        }
    }

    该方法用于获取生产者发送的消息,其中envelope主要存放生产者相关的信息(比如交换机、路由key等)body是消息实体。

    运行结果如下 首先启动customer 保持运行状态,  执行producer 发送消息 会发现 customer会自动获取到:

    第二种模式:WorkQueues 工作队列模式 一对多 多个Consumer是竞争关系 只能有一个消费者收到

    多个消费者共同消费同一个队列中的消息,对于任务过重或任务较多的情况可以提高任务处理的速度。

    新建一个生产者NewTask

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class NewTask {
        public final static String TASK_QUEUE_NAME = "task_queue";
    
        public static void main(String [] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
    
            //分发消息
            for(int i = 0;i<10;i++){
                String message = "hello RabbitMQ "+ i;
                channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
                System.out.println("NewTask send :" + message);
            }
    
            channel.close();
            connection.close();
    
        }
    }

    然后创建2个工作者work1和work2代码一样

    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Work1 {
        private static final String TASK_QUEUE_NAME = "task_queue";
    
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            final ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
    
            channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
            System.out.println("Work1 等待接受消息");
    
            //每次从队列获取的数量
            channel.basicQos(1);
    
            final Consumer consumer = new DefaultConsumer(channel){
    
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body,"UTF-8");
                    System.out.println("Worker1 接受到消息:"+message);
                    try{
                        //throw new Exception();
                        doWork(message);
                    }catch (Exception ex){
                        channel.abort();
                    }finally {
                        System.out.println("Work1 完成了");
                        channel.basicAck(envelope.getDeliveryTag(),false);
                    }
                }
            };
    
            boolean autoAck=false;
            //消息消费完成确认
            channel.basicConsume(TASK_QUEUE_NAME,autoAck,consumer);
    
        }
        private static void doWork(String task) {
            try {
                Thread.sleep(1000); // 暂停1秒钟
            } catch (InterruptedException _ignored) {
                Thread.currentThread().interrupt();
            }
        }
    }

    channel.basicQos(1);保证一次只分发一个,autoAck是否自动回复,如果为true的话,每次生产者只要发送信息就会从内存中删除,那么如果消费者程序异常退出,那么就无法获取数据,我们当时不希望出现这样的情况,所以才去手动回复,每当消费者收到并处理信息然后在通知生产者,最后从队列中删除这条信息。如果消费者异常退出,如果还有其他消费者,那么就会把队列中的消息发送给其他消费者,如果没有,等消费者启动时候再次发送。

    两个都不抛出异常时:

    其中一个设置为异常时,会把消息都发送给另一个正常的。等待异常的程序重启后,才会继续给它发送。

  • 相关阅读:
    关于生产库的表空间是否自己主动扩展的看法?
    Hive创建外部表以及分区
    oninput,onpropertychange,onchange的使用方法和差别
    cacti
    java实现第五届蓝桥杯殖民地
    java实现第五届蓝桥杯殖民地
    java实现第五届蓝桥杯殖民地
    java实现第五届蓝桥杯殖民地
    java实现第五届蓝桥杯殖民地
    java实现第五届蓝桥杯LOG大侠
  • 原文地址:https://www.cnblogs.com/baidawei/p/9172433.html
Copyright © 2011-2022 走看看