引言
Direct Exchange(直连交换机)完全按照 routing key(路由关键字)投递消息:只有当消息的 routing key 与队列绑定时使用的 binding key 完全匹配时,才会将消息投递到该队列中。
1.模型
2.创建生产者
package com.dwz.rabbitmq.exchange.direct;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

import com.dwz.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * Publishes two messages to the direct exchange {@code test_direct_exchange},
 * one per routing key ({@code test.direct_1} and {@code test.direct_2}).
 */
public class Producer {
    private static final String EXCHANGE_NAME = "test_direct_exchange";

    /**
     * Opens a connection, publishes both messages and releases the connection.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if a broker operation times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        try {
            Channel channel = connection.createChannel();

            // Declare the exchange with the same arguments the consumers use, so the
            // producer does not depend on a consumer having been started first.
            channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, false, null);

            String routingKey_1 = "test.direct_1";
            String routingKey_2 = "test.direct_2";
            String msg_1 = "send rabbit direct message--1";
            String msg_2 = "send rabbit direct message--2";

            // Encode explicitly as UTF-8: the consumers decode the body as UTF-8,
            // whereas a bare getBytes() would use the platform default charset.
            channel.basicPublish(EXCHANGE_NAME, routingKey_1, null, msg_1.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME, routingKey_2, null, msg_2.getBytes(StandardCharsets.UTF_8));

            channel.close();
        } finally {
            // Always release the connection, even when publishing fails.
            connection.close();
        }
    }
}
3.创建消费者1
package com.dwz.rabbitmq.exchange.direct;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

import com.dwz.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

/**
 * Consumes from {@code test_direct_queue_1}, which is bound to the direct
 * exchange {@code test_direct_exchange} with the key {@code test.direct_1},
 * and prints every message body it receives.
 */
public class Consumer {
    private static final String QUEUE_NAME = "test_direct_queue_1";
    private static final String EXCHANGE_NAME = "test_direct_exchange";

    /**
     * Declares the exchange and queue, binds them and registers the consumer.
     * The channel and connection are not closed here.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException declared for connection setup
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        String routingKey = "test.direct_1";

        // Arguments: exchange, type, durable, autoDelete, internal, arguments.
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, false, null);
        // Arguments: queue, durable, exclusive, autoDelete, arguments.
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    BasicProperties properties, byte[] body) throws IOException {
                // Decode with the charset constant instead of a name looked up at runtime.
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("rec direct_1 message:" + msg);
            }
        };

        // Second argument is autoAck = true.
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
4.创建消费者2
package com.dwz.rabbitmq.exchange.direct;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

import com.dwz.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

/**
 * Consumes from {@code test_direct_queue_2}, which is bound to the direct
 * exchange {@code test_direct_exchange} with the key {@code test.direct_2},
 * and prints every message body it receives.
 */
public class Consumer2 {
    private static final String QUEUE_NAME = "test_direct_queue_2";
    private static final String EXCHANGE_NAME = "test_direct_exchange";

    /**
     * Declares the exchange and queue, binds them and registers the consumer.
     * The channel and connection are not closed here.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException declared for connection setup
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        String routingKey = "test.direct_2";

        // Arguments: exchange, type, durable, autoDelete, internal, arguments.
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, false, null);
        // Arguments: queue, durable, exclusive, autoDelete, arguments.
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    BasicProperties properties, byte[] body) throws IOException {
                // Decode with the charset constant instead of a name looked up at runtime.
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("rec direct_2 message:" + msg);
            }
        };

        // Second argument is autoAck = true.
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
5.运行代码
每个消费者只会收到与其绑定的路由键 routingKey 完全匹配的消息:消费者1 收到 routing key 为 test.direct_1 的消息,消费者2 收到 routing key 为 test.direct_2 的消息,success!