zoukankan      html  css  js  c++  java
  • RabbitMQ入门:Hello RabbitMQ 代码实例

    本篇博客围绕下面几个方面展开:

    1. 代码前的理论热身
    2. 代码实例:Hello RabbitMQ
    3. 运行代码并调试问题

    Now, Let's begin !

    一、代码前的理论热身

    我们来看张图:

    Publisher(生产者)生成消息,然后publish(发布)消息到exchange(路由器,也有资料翻译成交换机),然后根据路由规则将消息传递到Queue(队列),最终交由Consumer(消费者)进行消费处理。

    这里的生产者和消费者都是我们的应用,因此我们的代码中要实现这两个部分。

    中间的节点就是RabbitMQ 提供的内容,需要再生产者和消费者里面调用其接口来定义和使用这些节点。

    二、代码实例:Hello RabbitMQ

    package com.sam.hello_rabbitmq;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Provider {
    
        //定义队列名
        static String QUEUE_NAME = "helloRabbit";
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            Channel channel = null;
            try {
                //1.创建连接和通道
                connection = factory.newConnection();
                channel = connection.createChannel();
                
                //2.为通道声明队列
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                
                //3.发布消息
                String msg = " hello rabbitmq, welcome to sam's blog.";
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
                System.out.println("provider send a msg: " + msg);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } finally {
                //4.关闭连接
                if (channel != null) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                }
    
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
    
        }
    
    }

    在第2步中,channel.queueDeclare 用来创建队列,有5个参数:String queue, 队列名; boolean durable, 该队列是否需要持久化; boolean exclusive,该队列是否为该通道独占的(其他通道是否可以消费该队列); boolean autoDelete,该队列不再使用的时候,是否让RabbitMQ服务器自动删除掉; Map<String, Object> arguments 其他参数。第3步中,channel.basicPublish 发布消息(用在生产者),有4个参数:String exchange, 路由器(有的资料翻译成交换机)的名字,即将消息发到哪个路由器; String routingKey, 路由键,即发布消息时,该消息的路由键是什么; BasicProperties props, 指定消息的基本属性; byte[] body 消息体,也就是消息的内容,是字节数组。 可能你会疑惑,为什么没有exchange呢?因为如果声明了队列,可以不声明路由器。

    2.接着来实现消费者,消费者实现和生产者过程差不多,但是在这里并没有关闭连接和通道,是因为要消费者一直等待随时可能发来的消息。代码如下:

    package com.sam.hello_rabbitmq;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    public class HelloConsumer {
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            Channel channel = null;
            try {
                // 1.创建连接和通道
                connection = factory.newConnection();
                channel = connection.createChannel();
    
                // 2.为通道声明队列
                channel.queueDeclare(Provider.QUEUE_NAME, false, false, false, null);
                System.out.println(" **** keep alive ,waiting for messages, and then deal them");
                // 3.通过回调生成消费者
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                            com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
                        
                        //获取消息内容然后处理
                        String msg = new String(body, "UTF-8");
                        System.out.println("*********** HelloConsumer" + " get message :[" + msg +"]");
                    }
                };
                
                //4.消费消息
                channel.basicConsume(Provider.QUEUE_NAME, true, consumer);
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }

    在第4步中,channel.basicConsume 用来接收消息,用在消费者,有3个参数:String queue, 队列名字,即要从哪个队列中接收消息; boolean autoAck, 是否自动确认,默认true; Consumer callback 消费者,即谁接收消息。

    三、运行代码并调试问题

    代码写好了,接下来进行测试,

    1. 先来执行下Provider.java,,后台打印了内容,并且队列中有了一条ready的消息。
    2. 执行HelloConsumer.java,预想的结果是在启动后,控制台直接打印出log并且RabbitMQ管理页面没有ready的消息:
  • 相关阅读:
    Javascript 进阶
    transform顺序浅谈
    js对象克隆
    js动画最佳实现——requestAnimationFrame
    svg标签
    typeof和instanceof
    js变量浅谈
    X-UA-compatible浅谈
    封装$
    面向对象
  • 原文地址:https://www.cnblogs.com/cc-java/p/9204576.html
Copyright © 2011-2022 走看看