zoukankan      html  css  js  c++  java
  • Rabbit MQ 客户端 API 开发

    项目开始

    第一步首先需要引入对应的 jar 包

    <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.6.0</version>
    </dependency>
    

    连接 RabbitMQ

    连接 RabbitMQ 有两种方式:1.根据特定参数连接 2.根据 URI 连接
    根据特定的参数连接(用户名、密码、IP 地址、虚拟 broker、端口号)

    // 声明连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 使用连接工厂赋值 userName、password、host、vHost、port 等等
    factory.setUsername("userName");
    factory.setPassword("password");
    factory.setHost("host");
    factory.setVirtualHost("vhost");
    factory.setPort(5672);
    // 创建连接
    Connection conn = factory.newConnection();
    

    根据 URI 连接也可以选择使用 URI 的方式来实现

    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri("amqp://userName:password@host:port/vhost");
    Connection connection = factory.newConnection();
    System.out.println(connection);
    connection.close();
    

    Connection 可以用来创建多个 Channel 实例,但是 Channel 实例不能在线程间共享,应该为每一个线程开辟一个 Channel。线程共享会导致在网络上出现错误的通信帧交错,同时也会影响发送方确认机制的运行,所以多线程间共享 Channel 实例是非线程安全的。

    使用队列和交换器

    如何声明一个队列和交换器

    Channel channel = connection.createChannel();
    channel.exchangeDeclare("exchangeName", "direct", true);
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "exchangeName", "routuingKey");
    

    上面创建了一个持久化的、非自动删除的、绑定类型为 direct 的交换器,同时也创建了一个非持久化的、排他的、自动删除的队列(此队列的名称由 RabbitMQ 自动生成)。

    exchangeDeclare 方法详解

    exchangeDeclare 有多个重载方法都是由下面的这个方法中参数的缺省构成的。

    public DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;
    

    此方法的返回值是 Exchange.DeclareOk 用来标识成功声明了一个交换器。
    参数说明
      1.exchange: 交换器的名称。
      2.type: 交换器的类型。
      3.durable: 设置是否持久化。true 表示持久化 false 表示非持久化。持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。
      4.autoDelete: 设置是否自动删除。true 表示自动删除。自动删除的前提是至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑。注意不能错误地把这个参数理解为: “当与此交换器连接的客户端都断开时,RabbitMQ 会自动删除本交换器”。
      5.internal: 设置是否是内置的。true 则表示内置交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。
      6.arguments: 其他一些结构化参数。比如 alternate-exchange。

    有声明创建交换器的方法,也对应肯定也有删除交换器的方法。其中 exchange 表示交换器的名称,而 ifUnuserd 用来设置在交换器没有被使用的情况下删除,如果设置为 true,则只有再此交换器没有被使用的情况下才会被删除,如果设置为 false,则无论如何这个交换器都要被删除。

    public AMQP.Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException;
    public AMQP.Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException;
    

    不带任何参数的 queueDeclare 方法默认创建一个由 RabbitMQ 命名的名称,这种队列也称匿名队列,拥有排他、自动删除、非持久化的属性。

    queueDeclare 方法详解

    public AMQP.Queue.DeclareOk queueDeclare() throws IOException;
    public AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
    

    不带任何参数的 queueDeclare 方法默认创建一个由 RabbitMQ 命名的名称,这种队列也称匿名队列,拥有排他、自动删除、非持久化的属性。
    参数说明
      1.queue: 队列的名称。
      2.durable: 设置是否持久化。true 表示持久化 false 表示非持久化。
      3.exclusive: 设置是否排他。true 表示为排他,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。
        3.1.排他队列是基于连接(Connection)可见的。同一个连接的不同信道(Channel)是可以同时访问同一连接创建的排他队列;
        3.2."首次"是指如果一个连接己经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同:
        3.3.即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景。
      4.autoDelete: 设置是否自动删除。true 表示设置为自动删除。
        4.1.自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。不能把这个参数错误地理解为:“当连接到此队列的所有客户端断开时,这个队列自动删除”,因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列。
      5.arguments: 设置队列的其他一些参数。如 x-rnessage-ttl 、x-expires 、x -rnax-length 、x-rnax-length-bytes 、x-dead-letter-exchange 、x-dead­letter-routing-key, x-rnax-priority 等。

    与交换器对应,关于队列也有删除的相应方法。其中 queue 表示队列名称,而 ifUnuserd 用来设置在队列没有被使用的情况下删除,如果设置为 true,则只有再此队列没有被使用的情况下才会被删除,如果设置为 false,则无论如何这个队列都要被删除。ifEmpty 设置为 true 表示在队列为空(队列里面没有任何消息堆积)的情况下才能够删除。

    public AMQP.Queue.DeleteOk queueDelete(String queue) throws IOException;
    public AMQP.Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;
    

    还有一个方法用来清空队列中的内容,而不删除队列本身。

    public AMQP.Queue.PurgeOk queuePurge(String queue) throws IOException;
    

    exchangeBind 方法详解

    除了可以将交换器和队列绑定,还可以将交换器与交换器绑定,绑定方法如出一辙。

    public AMQP.Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException;
    public AMQP.Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
    

    参数说明
      1.destination: 需要被绑定的交换器或队列
      2.source: 需要绑定的交换器或队列。
      3.routingKey: 路由键。
      4.其他一些结构化参数。

    发送消息

    Channel.basicPublish 方法是发送消息。

    String message = "Hello Word!";
    channel.basicPublish("exchangeName", "routingKey", null, message.getBytes());
    

    方法概览

    public void basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) throws IOException;
    public void basicPublish(String exchange, String routingKey, boolean mandatory, AMQP.BasicProperties props, byte[] body) throws IOException;
    public void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, AMQP.BasicProperties props, byte[] body) throws IOException;
    

    参数说明
      1.exchange: 交换器的名称,指明消息需要发送到哪个交换器中。如果设置为空串。则消息会被发送到 RabbitMQ 默认的交换器中。
      2.routhingKey: 路由键,交换器根据路由键将消息存储到相应的队列之中。
      3.props: 消息的基本属性集。其包含 14 个属性成员,分别有 contentType、content Encoding、headers(Map<String, Object>)、deliveryMode、priority、correlationld、replyTo、expiration、messageld、timestamp、type、userld、appld、clusterld。
      4.body: 消息体需要发送的消息。
      5.mandatory: true 表示如果交换器根据路由键无法找到符合条件的队列时会将消息返回给生产者 false 时将消息直接丢弃。
      6.immediate: true 表示交换器将消息路由到队列发现并不存在消费者时那么此消息不会被放入队列。

    消费消息

    RabbitMQ 消息模式分两种: 推 Push 模式和拉 Pull 模式。推模式采用 Basic.Consume 进行消费,而拉则是调用 Basic.Get 进行消费。我们只介绍推模式,对拉模式感兴趣可自行研究。

    Connection conn = factory.newConnection();
    Channel channel = conn.createChannel();
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    channel.basicQos(1);
    DeliverCallback deliverCallback = new DeliverCallback() {
    	@Override public void handle(String s, Delivery delivery) throws IOException {
    		String message = new String(delivery.getBody(), "UTF-8");
    		System.out.println("RabbitMqWorkRec1Test Received: " + message);
    		channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    	}
    };
    channel.basicConsume(QUEUE_NAME, false, deliverCallback, new CancelCallback() {
    	@Override public void handle(String s) throws IOException {
    		System.out.println("RabbitMqWorkRec1Test CancelCallback==========> " + s);
    	}
    });
    

    以上代码中显式的 ack 操作对于消费者来说是非常必要的,可以防止必须要的消息丢失。

    方法概览

    public String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
    public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;
    

    参数说明
      1.queue: 队列名称。
      2.autoAck: 设置是否自动确认。true 表示不自动确认,建议设置为 false。
      3.consumerTag: 消费者标签,用来区分多个消费者。
      4.noLocal: 设置为 true 表示不能将一个连接中生产者发送的消息传给这个连接的消费者。
      5.exclusive: 设置是否排他。
      6.arguments: 设置消费者的其他参数。
      7.callback: 设置消费者的回调函数。用来处理 RabbitMQ 推送过来的消息。

    Demo 示例

    最后看来下完整的生产消费者示例。
    生产者

    public class RabbitMqPubTest {
    	private static final String EXCHANGE_NAME = "test_exchange_fanout";
    
    	public static void main(String[] args) {
    		Connection conn = null;
    		Channel channel = null;
    		try {
    			ConnectionFactory factory = new ConnectionFactory();
    			factory.setUsername("userName");
    			factory.setPassword("password");
    			factory.setHost("host");
    			factory.setVirtualHost("vhost");
    			factory.setPort(5672);
    			conn = factory.newConnection();
    			channel = conn.createChannel();
    			channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    			String message = "Hello Word!";
    			channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
    			System.out.println("RabbitMqPubTest send: " + message);
    		} catch (Exception e) {
    			e.printStackTrace();
    		} finally {
    			if (null != channel) {
    				try {
    					channel.close();
    				} catch (Exception e) {
    					e.printStackTrace();
    				}
    			}
    			if (null != conn) {
    				try {
    					conn.close();
    				} catch (Exception e) {
    					e.printStackTrace();
    				}
    			}
    		}
    	}
    }
    

    消费者

    public class RabbitMqSub01Test {
    	private static final String EXCHANGE_NAME = "test_exchange_fanout";
    
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory factory = new ConnectionFactory();
    		factory.setUsername("userName");
    		factory.setPassword("password");
    		factory.setHost("host");
    		factory.setVirtualHost("vhost");
    		factory.setPort(5672);
    		Connection conn = factory.newConnection();
    		Channel channel = conn.createChannel();
    		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    		String queueName = channel.queueDeclare().getQueue();
    		System.out.println("RabbitMqSub01Test queueName: " + queueName);
    		channel.queueBind(queueName, EXCHANGE_NAME, "");
    		DeliverCallback deliverCallback = new DeliverCallback() {
    			@Override public void handle(String s, Delivery delivery) throws IOException {
    				String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
    				System.out.println("RabbitMqSub01Test Received: " + message);
    			}
    		};
    		channel.basicConsume(queueName, true, deliverCallback, new CancelCallback() {
    			@Override public void handle(String s) throws IOException {
    
    			}
    		});
    	}
    }
    
    public class RabbitMqSub02Test {
    	private static final String EXCHANGE_NAME = "test_exchange_fanout";
    
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory factory = new ConnectionFactory();
    		factory.setUsername("userName");
    		factory.setPassword("password");
    		factory.setHost("host");
    		factory.setVirtualHost("vhost");
    		factory.setPort(5672);
    		Connection conn = factory.newConnection();
    		Channel channel = conn.createChannel();
    		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    		String queueName = channel.queueDeclare().getQueue();
    		System.out.println("RabbitMqSub02Test queueName: " + queueName);
    		channel.queueBind(queueName, EXCHANGE_NAME, "");
    		DeliverCallback deliverCallback = new DeliverCallback() {
    			@Override public void handle(String s, Delivery delivery) throws IOException {
    				String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
    				System.out.println("RabbitMqSub02Test Received: " + message);
    			}
    		};
    		channel.basicConsume(queueName, true, deliverCallback, new CancelCallback() {
    			@Override public void handle(String s) throws IOException {
    
    			}
    		});
    	}
    }
    

    以上是 java 常用 rabbitMQ API 操作,感谢观看!不足之处请评论!

  • 相关阅读:
    perl 调短信接口
    MySQL处理千万级数据查询、分页
    tomcat 设置path 访问路径
    Tomcat 设置内存大小
    Tomcat 80端口启动 必须是root
    tomcat server.xml配置解析
    Perl 发送邮件
    单身北漂生活二、三事(上)——北漂18年(8)
    tomcat 应用访问
    Perl 采集监控日志插入数据库
  • 原文地址:https://www.cnblogs.com/liufeichn/p/11961621.html
Copyright © 2011-2022 走看看