zoukankan      html  css  js  c++  java
  • rabbitMQ第二篇:java简单的实现RabbitMQ

     前言:在这里我将用java来简单的实现rabbitMQ。下面我们带着下面问题来一步步的了解和学习rabbitMQ。

    1:如果消费者连接中断,这期间我们应该怎么办

    2:如何做到负载均衡

    3:如何有效的将数据发送到相关的接收者?就是怎么样过滤

    4:如何保证消费者收到完整正确的数据

    5:如何让优先级高的接收者先收到数据

    一:"Hello RabbitMQ"

    下面有一幅图,其中P表示生产者,C表示消费者,红色部分为消息队列

     二:项目开始

    2.1:首先引入rabbitMQ jar包

     <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>3.6.5</version>
     </dependency>

    2.2:创建消费者Producer

    复制代码
    /**
     * 消息生成者
     */
    public class Producer {
        public final static String QUEUE_NAME="rabbitMQ.test";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置RabbitMQ相关信息
            factory.setHost("localhost");
          //factory.setUsername("lp");
          //factory.setPassword("");
         // factory.setPort(2088);
            //创建一个新的连接
            Connection connection = factory.newConnection();
            //创建一个通道
            Channel channel = connection.createChannel();
            //  声明一个队列        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello RabbitMQ";
            //发送消息到队列中
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println("Producer Send +'" + message + "'");
            //关闭通道和连接
            channel.close();
            connection.close();
        }
    }
    复制代码

    注1:queueDeclare第一个参数表示队列名称、第二个参数为是否持久化(true表示是,队列将在服务器重启时生存)、第三个参数为是否是独占队列(创建者可以使用的私有队列,断开后自动删除)、第四个参数为当所有消费者客户端连接断开时是否自动删除队列、第五个参数为队列的其他参数

    注2:basicPublish第一个参数为交换机名称、第二个参数为队列映射的路由key、第三个参数为消息的其他属性、第四个参数为发送信息的主体

    2.3:创建消费者

    复制代码
    public class Customer {
        private final static String QUEUE_NAME = "rabbitMQ.test";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置RabbitMQ地址
            factory.setHost("localhost");
            //创建一个新的连接
            Connection connection = factory.newConnection();
            //创建一个通道
            Channel channel = connection.createChannel();
            //声明要关注的队列
            channel.queueDeclare(QUEUE_NAME, false, false, true, null);
            System.out.println("Customer Waiting Received messages");
            //DefaultConsumer类实现了Consumer接口,通过传入一个频道,
            // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("Customer Received '" + message + "'");
                }
            };
            //自动回复队列应答 -- RabbitMQ中的消息确认机制
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    复制代码

    前面代码我们可以看出和生成者一样的,后面的是获取生产者发送的信息,其中envelope主要存放生产者相关信息(比如交换机、路由key等)body是消息实体。

    2.4:运行结果

    生产者:

    消费者:

     三:实现任务分发

    工作队列

    一个队列的优点就是很容易处理并行化的工作能力,但是如果我们积累了大量的工作,我们就需要更多的工作者来处理,这里就要采用分布机制了。

    我们新创建一个生产者NewTask

    复制代码
    public class NewTask {
        private static final String TASK_QUEUE_NAME="task_queue";
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory=new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection=factory.newConnection();
            Channel channel=connection.createChannel();
       channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
            //分发信息
            for (int i=0;i<10;i++){
                String message="Hello RabbitMQ"+i;
                channel.basicPublish("",TASK_QUEUE_NAME,
                        MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
                System.out.println("NewTask send '"+message+"'");
            }
            channel.close();
            connection.close();
        }
    }
    复制代码

    然后创建2个工作者Work1和Work2代码一样

    复制代码
    public class Work1 {
        private static final String TASK_QUEUE_NAME = "task_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            final ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
    
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
            System.out.println("Worker1  Waiting for messages");
    
            //每次从队列获取的数量
            channel.basicQos(1);
    
            final Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("Worker1  Received '" + message + "'");
                    try {
                        throw  new Exception();
                        //doWork(message);
                    }catch (Exception e){
                        channel.abort();
                    }finally {
                        System.out.println("Worker1 Done");
                        channel.basicAck(envelope.getDeliveryTag(),false);
                    }
                }
            };
            boolean autoAck=false;
            //消息消费完成确认
            channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
        }
        private static void doWork(String task) {
            try {
                Thread.sleep(1000); // 暂停1秒钟
            } catch (InterruptedException _ignored) {
                Thread.currentThread().interrupt();
            }
        }
    }
    复制代码

    注:channel.basicQos(1);保证一次只分发一个 。autoAck是否自动回复,如果为true的话,每次生产者只要发送信息就会从内存中删除,那么如果消费者程序异常退出,那么就无法获取数据,我们当然是不希望出现这样的情况,所以才去手动回复,每当消费者收到并处理信息然后在通知生成者。最后从队列中删除这条信息。如果消费者异常退出,如果还有其他消费者,那么就会把队列中的消息发送给其他消费者,如果没有,等消费者启动时候再次发送。

    关于上面我们遗留问题在下一篇继续讲解

  • 相关阅读:
    【学习笔记】ASP.NET页面之间传值的方式之Application
    【学习笔记】ASP.NET页面之间传值的方式之QueryString
    【学习笔记】C#中的装箱(inboxing)和拆箱(unboxing)
    C# Func和Action用法以及区别和使用Lambda表达式
    构建ABP vNext项目并切换MySql数据库
    .Net FrameWork发布项目时报Microsoft.Net.Compilers is only supported on MSBuild v16.3 and above错误解决方案
    Docker容器与Linux主机环境获取时间不一致
    shell脚本中无法使用cd的问题解决方法
    Docker安装Mysql8.0,并配置忽略大小写
    Docker-Compose排版一些坑
  • 原文地址:https://www.cnblogs.com/tiancai/p/9890173.html
Copyright © 2011-2022 走看看