一、 交换器
RabbitMQ交换器(Exchange)分为四种
- direct
- fanout
- topic
- headers
- direct
默认的交换器类型,消息的RoutingKey与队列的bindingKey匹配,消息就投递到相应的队列
- fanout
一种发布/订阅模式的交换器,发布一条消息时,fanout把消息广播附加到fanout交换器的队列上
接收类(订阅):
import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//一旦创建exchange,RabbitMQ不允许对其改变,否则报错 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "");//绑定是交换器与队列之间的关系,可以理解为,队列对此交换器的消息感兴趣 System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }
发布类:
import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class ReceiveLog { private static final String EXCHANGE_NAME = "log"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = "hi"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }
- topic
topic类似于fanout交换器,但更加具体化,用routingKey进行规则匹配,更灵活的匹配出用户想要接收的消息
routingKey形如:com.company.module.demo,具体匹配规则:
"*"与"#"可以匹配任意字符,区别是"*"只能匹配由"."分割的一段字符,而"#"可以匹配所有字符
发布一条"com.abc.test.push"的消息,能匹配的routingKey:
com.abc.test.*
#.test.push
#
不能匹配的:
com.abc.*
*.test.push
*
发布类:
声明队列时,需要注意队列的属性,虽然队列的声明由消费者或生产者完成都可以,但如果由消费者声明,由于生产者生产消息时,可能队列还没有声明,会造成消息丢失,所以推荐由生产者声明队列
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; public class RabbitMqSendTest { private static String queue = "test_queue"; private static String exchange = "TestExchange"; private static String routingKey = "abc.test"; public static void main(String[] args) { ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory(); factory.setHost("172.16.67.60"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); Connection mqConnection = null; try { mqConnection = factory.newConnection(); Channel mqChannel = mqConnection.createChannel(); if (null != mqChannel && mqChannel.isOpen()) { mqChannel.exchangeDeclare(exchange, "topic"); // String queueName = mqChannel.queueDeclare().getQueue(); // mqChannel.queueBind(queueName, exchange, routingKey); //声明队列名称与属性 //durable持久队列,mq重启队列可恢复 exclusive独占队列,仅限于声明它的连接使用操作 //autoDelete 自动删除 arguments 其他属性 mqChannel.queueDeclare(queue, false, false, false, null); mqChannel.queueBind(queue, exchange, routingKey); //******************************************* mqChannel.basicPublish(exchange, routingKey, null, ("hello").getBytes()); } } catch (Exception e) { e.printStackTrace(); }finally { try { mqConnection.close(); } catch (IOException e) { e.printStackTrace(); } } } }
接收类
import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveTopic { private static String queue = "consume_queue"; private static String exchange = "TestExchange"; private static String routingKey = "*.test"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("172.16.67.60"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // channel声明Exchange,名称与类型 channel.exchangeDeclare(exchange, "topic"); // String queuename = channel.queueDeclare().getQueue(); channel.queueDeclare(queue, false, false, false, null); channel.queueBind(queue, exchange, "*.test"); //消费者指定消息队列,并选择特定的RoutingKey System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer client = new DefaultConsumer(channel) { public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException { String msgString = new String(body, "UTF-8"); System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + msgString + "'"); } }; channel.basicConsume(queue, true,client); System.out.println(); } }
二、持久化
RabbitMQ默认情况下重启消息服务器时,会丢失消息,为了尽量保证消息在服务器宕机时不丢失,就需要把消息持久化,但是也只是尽量不丢失,由于涉及磁盘写入,当消息量巨大时,mq性能也会被严重拉低。