zoukankan      html  css  js  c++  java
  • RabbitMQ学习之旅(一)

    RabbitMQ学习总结(一)

    RabbitMQ简介

    RabbitMQ是一个消息代理,其接收并转发消息。类似于现实生活中的邮局:你把信件投入邮箱的过程,相当于往队列中添加信息,因为所有邮箱中的信件最终都会汇集到邮局中;当邮递员把你的新建发送给收件人的时候,相当于消息的转发。

    RabbitMQ中的常见术语

    • 生产者(Provider):生产者负责生产消息,并将其发送到消息队列中
    • 队列(Queue):消息代理(Proxy)角色,从生产者那里接收消息,并将其转发到消费者进行消费。队列主要受限于主机的内存和磁盘的大小,其本质是一个消息缓冲区
    • 消费者(Consumer):消费者从队列中接收消息,并对消息进行处理
    • 交换机(Exchange):交换机位于生产者(Provider)和队列(Queue)之间,相当于路由(Routing),服务器会根据路由键(RoutingKey)将消息经由交换机路由到对应的队列(Queue)上去
    • 信道(Channel):信道是生产者与消费者和队列进行通信的渠道,其是建立在TCP连接上的虚拟连接;RabbitMQ在一条TCP连接上建立成千上百条信道来达到多线程处理,这个TCP被多个线程共享,每个线程对应一个信道,每个信道对应唯一的ID,保障信道私有性

    RabbitMQ简单使用

    STEP1:准备`Maven`依赖

    <dependency>
          <groupId>com.rabbitmq</groupId>
          <artifactId>amqp-client</artifactId>
          <version>RELEASE</version>
    </dependency>

    STEP2:生产者

    public class Provider {
        private final static String QUEUE_NAME = "hello";

        public static void main(String[] args) throws IOException, TimeoutException {
            // 创建连接
            ConnectionFactory factory = new ConnectionFactory();
            // 设置 RabbitMQ 的主机名
            factory.setHost("ip");
            factory.setPort(5672);
            factory.setUsername("username");
            factory.setPassword("password");
            // 创建一个连接
            Connection connection = factory.newConnection();
            // 创建一个信道
            Channel channel = connection.createChannel();    
            // 指定一个队列
            // queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
            // 参数1 queue :队列名
            // 参数2 durable :是否持久化
            // 参数3 exclusive :仅创建者可以使用的私有队列,断开后自动删除
            // 参数4 autoDelete : 当所有消费客户端连接断开后,是否自动删除队列
            // 参数5 arguments
            channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);
            // 发送消息
            String message = "Hello World!";
            // basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
            // 参数1 exchange :交换器
            // 参数2 routingKey : 路由键
            // 参数3 props : 消息的其他参数
            // 参数4 body : 消息体
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
            // 关闭频道和连接  
            channel.close();
            connection.close();
        }
    }

    注意:声明队列是幂等的,代表着只有在队列不存在的时候才会被创建,多次声明并不会重复创建。

    STEP3:消费者

    public class Recv {
        private final static String QUEUE_NAME = "hello";

        public static void main(String[] args) throws IOException, TimeoutException {
            // 创建连接
            ConnectionFactory factory = new ConnectionFactory();
            // 设置 RabbitMQ 的主机名
            factory.setHost("ip");
            factory.setPort(5672);
            factory.setUsername("username");
            factory.setPassword("password");
            // 创建一个连接
            Connection connection = factory.newConnection();
            // 创建一个通道
            Channel channel = connection.createChannel();
            // 指定一个队列
            // queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
            // 参数1 queue :队列名
            // 参数2 durable :是否持久化
            // 参数3 exclusive :仅创建者可以使用的私有队列,断开后自动删除
            // 参数4 autoDelete : 当所有消费客户端连接断开后,是否自动删除队列
            // 参数5 arguments
            channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            // 创建队列消费者,此处是一个回调函数,当消费者接收到来自消息队列的消息时,会调用该接口中被重写的方法
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
     throws IOException 
    {
                    String message = new String(body, "UTF-8");
                    System.out.println(" [x] Received '" + message + "'");
                }
            };
            // basicConsume(String queue, boolean autoAck, Consumer callback)
            // 参数1 queue :队列名
            // 参数2 autoAck : 是否自动ACK;为true的时候表示自动回复,为false的时候表示手动回复;自动回复的含义是,当消费
            // 者接收到消息的时候,就自动认定该消息已经被消费;手动回复的含义是,不进行自动回复,只有接收到方法发送到的确认信
            // 息,才确认该消息已经被消费
            // 参数3 callback : 消费者对象的一个接口,用来配置回调
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }
  • 相关阅读:
    48. Rotate Image
    47. Permutations II
    46. Permutations
    45. Jump Game II
    44. Wildcard Matching
    43. Multiply Strings
    42. Trapping Rain Water
    41. First Missing Positive
    40. Combination Sum II
    39. Combination Sum
  • 原文地址:https://www.cnblogs.com/lurker-yaojiang/p/9912396.html
Copyright © 2011-2022 走看看