zoukankan      html  css  js  c++  java
  • RabbitMQ快速入门

    一、入门实例

    1)引入依赖

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>3.6.5</version>
    </dependency>
    

    2)创建消费者

    public class Consumer {
    
        public static void main(String[] args) throws Exception {
            // 1.创建连接工厂对象
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 设置主机
            connectionFactory.setHost("111.231.83.100");
            // 设置端口
            connectionFactory.setPort(5672);
            // 设置虚拟主机
            connectionFactory.setVirtualHost("/");
    
            // 2.获取一个连接对象
            final Connection connection = connectionFactory.newConnection();
    
            // 3.创建 Channel
            final Channel channel = connection.createChannel();
    
            // 4.申明队列
            String queueName = "quickStart";
    
            // 5.绑定队列
            channel.queueDeclare(queueName, true, false, false, null);
    
            // 6.创建消费者
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
            channel.basicConsume(queueName, true, queueingConsumer);
    
            // 7.循环消费
            System.err.println("消费端启动");
            while (true) {
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.err.println("消费端消费: " + msg);
            }
        }
    
    }
    

    3)创建生产者

    public class Producer {
    
        public static void main(String[] args) throws Exception {
            // 1.创建连接工厂对象
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 设置主机
            connectionFactory.setHost("111.231.83.100");
            // 设置端口
            connectionFactory.setPort(5672);
            // 设置虚拟主机
            connectionFactory.setVirtualHost("/");
    
            // 2.获取一个连接对象
            final Connection connection = connectionFactory.newConnection();
    
            // 3.创建 Channel
            final Channel channel = connection.createChannel();
    
            // 4.循环发送消息
            for (int i = 0; i < 5; i++) {
                String msg = "Hello RabbitMQ" + i;
                channel.basicPublish("", "quickStart", null, msg.getBytes());
            }
            // 5.关闭资源
            channel.close();
            connection.close();
            connectionFactory.clone();
    
        }
    }
    
    1. 先启动消费者,然后启动生产者,查看控制台输出:
    消费端启动
    消费端消费: Hello RabbitMQ0
    消费端消费: Hello RabbitMQ1
    消费端消费: Hello RabbitMQ2
    消费端消费: Hello RabbitMQ3
    消费端消费: Hello RabbitMQ4
    
    • 注1:如果生产者先启动,这时候因为找不到队列,所以消息会被 MQServer 丢弃(mandatory 属性为false 的情况 )
    • 注2:启动过一次消费者后,因为绑定队列时,申明了队列持久化,所以可以找到队列,消息会存储在 MQServer 中,待消费者启动后即可被消费掉。

    二、 常用 API

    2.1 ConnectionFactory

    连接工厂,用于创建连接对象。

    方法名 作用
    setHost(String host) 设置主机 IP
    setPort(int port) 设置主机端口
    setVirtualHost(String virtualHost) 设置虚拟主机
    newConnection() 创建 RabbitMQ 连接对象
    setUsername(String username) 设置用户名
    setPassword(String password) 设置密码
    setAutomaticRecoveryEnabled(boolean automaticRecovery) 设置是否自动恢复
    setClientProperties(Map clientProperties) 设置客户端属性
    setConnectionTimeout(int timeout) 设置连接超时时间
    setRequestedChannelMax(int requestedChannelMax) 设置最大请求信道

    2.2 Connection

    RabbitMQ 连接对象。

    方法名 作用
    createChannel() 创建数据通信信道对象
    getPort() 获取主机端口
    getChannelMax() 获取最大请求信道
    getClientProperties() 获取客户端属性

    2.3 Channel

    数据通信信道对象,用来发送和接收消息。

    方法:queueDeclare

    Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) 
    

    作用:

    声明一个队列
    
    参数 参数类型 含义
    queue String 队列名称
    durable boolean 是否持久化,如果设置为true,服务器重启了队列仍然存在
    exclusive boolean 是否为独享队列(排他性队列),只有自己可见的队列,即不允许其它用户访问
    autoDelete boolean 当没有任何消费者订阅该队列时,自动删除该队列
    arguments Map 其他参数

    方法:basicQos

    void basicQos(int prefetchCount) throws IOException;
    

    作用:

    设置一次获取多少个消息
    
    参数 含义
    prefetchCount RabbitMQ 同一时间最多推送不多于 prefetchCount 个消息

    方法:basicConsume

    String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
    

    作用:

    订阅消息并消费
    
    参数 含义
    queue 订阅队列名称
    autoAck 是否开启自动应答,默认是开启的,如果需要手动应答应该设置为false。详情看后续《消息确认机制》
    callback 接收到消息之后执行的回调方法

    方法:basicPublish

    void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
    

    作用:

    生产发送一个消息
    
    参数 含义
    exchange 指定转发器名称 ExchangeName,如果使用空字符串会交给默认的 Exchange 处理
    routingKey 队列名称
    props 和消息有关的其他配置参数,路由报头等
    body 消息内容

    方法:basicAck

    void basicAck(long deliveryTag, boolean multiple) throws IOException;
    

    作用:

    处理完成消息后,手动向服务端发送一次应答。
    
    参数 含义
    deliveryTag 当前消息唯一Id
    multiple 是否把小于当前deliveryTag的小于都应答了。注:这个要在打开应答机制后使用
  • 相关阅读:
    java.sql.SQLException: Access denied for user 'root'@'10.1.0.2' (using password: YES)
    在eclipse中安装使用lombok插件
    Socket编程实践(5) --TCP粘包问题与解决
    Socket编程实践(8) --Select-I/O复用
    Socket编程实践(6) --TCP服务端注意事项
    Socket编程实践(4) --多进程并发server
    Socket编程实践(3) --Socket API
    Socket编程实践(2) --Socket编程导引
    Socket编程实践(1) --TCP/IP简述
    Socket编程实践(11) --epoll原理与封装
  • 原文地址:https://www.cnblogs.com/markLogZhu/p/13259086.html
Copyright © 2011-2022 走看看