消息模式实例
视频教程:https://ke.qq.com/course/304104
编写代码前,最好先添加好用户并设置virtual hosts
一、简单模式
1.导入jar包
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.5.0</version> </dependency>
2.创建连接
import com.idelan.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Sender { private final static String QUEUE = "testhello"; //队列名字 public static void main(String[] args) throws Exception{ //获取连接 Connection connection = ConnectionUtil.getConnection(); //创建通道 Channel channel = connection.createChannel(); //声明队列,如果队列存在则什么都不做,如果队列不存在才创建 //参数一: 队列的名字 //参数二: 是否持久化队列,我们的队列模式是在内存中的,如果rabbit重启会丢失,如果我们设置为true 则会保存到erlng自带的数据库中,重启会重新获取 //参数三: 是否排外,有两个作用,第一个当我们的链接关闭后是否会自动删除队列,作用二,是否私有当前队列,如果私有了,其他通道不可以访问当前队列,如果为true 一般适合一个队列消费者的时候 //参数四: 是否自动删除 //参数五 我们的一些其他的参数 channel.queueDeclare(QUEUE, false, false, false, null); //发送内容 channel.basicPublish("", QUEUE, null, "hello world".getBytes()); //关闭连接 channel.close(); connection.close(); } }
3.消费者
import com.idelan.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Receiver { private final static String QUEUE = "testhello"; //队列名字 public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE, false, false, false, null); QueueingConsumer consumer = new QueueingConsumer(channel); //接收消息,参数二 是自动确认 channel.basicConsume(QUEUE, true, consumer); while (true) { //获取消息 如果没有消息会等待,有的话就获取执行然后销毁,是一次性的 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("message:"+message); } } }
二、工作模式
1.生产者
import com.idelan.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Sender { private final static String QUEUE = "testwork"; //队列名字 public static void main(String[] args) throws Exception{ //获取连接 Connection connection = ConnectionUtil.getConnection(); //创建通道 Channel channel = connection.createChannel(); //声明队列,如果队列存在则什么都不做,如果队列不存在才创建 //参数一: 队列的名字 //参数二: 是否持久化队列,我们的队列模式是在内存中的,如果rabbit重启会丢失,如果我们设置为true 则会保存到erlng自带的数据库中,重启会重新获取 //参数三: 是否排外,有两个作用,第一个当我们的链接关闭后是否会自动删除队列,作用二,是否私有当前队列,如果私有了,其他通道不可以访问当前队列,如果为true 一般适合一个队列消费者的时候 //参数四: 是否自动删除 //参数五 我们的一些其他的参数 channel.queueDeclare(QUEUE, false, false, false, null); for (int i = 0; i < 20; i++){ //发送内容 channel.basicPublish("", QUEUE, null, ("hello world "+i).getBytes()); } //关闭连接 channel.close(); connection.close(); } }
2.消费者1
import com.idelan.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Receiver1 { private final static String QUEUE = "testwork"; //队列名字 public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE, false, false, false, null); //告诉服务器,在我没有确认当前消息完成之前,不要给我发新消息 channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } //当我们收到消息的时候调用 System.out.println("消费者1 收到的消息内容是:" + new String(body)); //确认 参数2,false为确认收到消息,true 为拒绝接收 channel.basicAck(envelope.getDeliveryTag(), false); } }; //注册消费者,参数2 收到确认,代表我们收到消息后需要手动告诉服务器,我们收到消息了 channel.basicConsume(QUEUE, false, consumer); } }
3.消费者2
import com.idelan.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Receiver2 { private final static String QUEUE = "testwork"; //队列名字 public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE, false, false, false, null); //告诉服务器,在我没有确认当前消息完成之前,不要给我发新消息 channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //当我们收到消息的时候调用 System.out.println("消费者2 收到的消息内容是:" + new String(body)); //确认 参数2,false为确认收到消息,true 为拒绝接收 channel.basicAck(envelope.getDeliveryTag(), false); } }; //注册消费者,参数2 收到确认,代表我们收到消息后需要手动告诉服务器,我们收到消息了 channel.basicConsume(QUEUE, false, consumer); } }
三、发布订阅模式
1.生产者
import com.idelan.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Sender { private final static String EXCHANGE_NAME = "testexchange"; //定义交换机名字 public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//定义一个交换机,类型是fanout //发布订阅模式,因为消息是先发布到交换机中,而交换机是没有保存功能的,所以如果没有消费者,消息则会丢失 channel.basicPublish(EXCHANGE_NAME, "", null, "发布订阅模式的消息".getBytes()); channel.close(); connection.close(); } }
2.消费者1
import com.idelan.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Receiver1 { private final static String EXCHANGE_NAME = "testexchange"; //定义交换机名字 public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare("testpubQueue1", false, false, false, null); //绑定队列到交换机 channel.queueBind("testpubQueue1", EXCHANGE_NAME, ""); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1:"+new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume("testpubQueue1", false, consumer); } }
3.消费者2
import com.idelan.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Receiver2 { private final static String EXCHANGE_NAME = "testexchange"; //定义交换机名字 public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare("testpubQueue2", false, false, false, null); //绑定队列到交换机 channel.queueBind("testpubQueue2", EXCHANGE_NAME, ""); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2:"+new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume("testpubQueue2", false, consumer); } }
四、路由模式
1.生产者
import com.idelan.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Sender { private final static String EXCHANGE_NAME = "testexroute"; //定义交换机名字 public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, "direct");//定义一个路由格式的交换机 //发布订阅模式,因为消息是先发布到交换机中,而交换机是没有保存功能的,所以如果没有消费者,消息则会丢失 // routingKey 为key1 channel.basicPublish(EXCHANGE_NAME, "key3", null, "路由模式的消息".getBytes()); channel.close(); connection.close(); } }
2.消费者1
import com.idelan.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Receiver1 { private final static String EXCHANGE_NAME = "testexroute"; //定义交换机名字 public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare("testRouteQueue1", false, false, false, null); //绑定队列到交换机 //参数3标记,绑定到交换机的时候会指定一个标记,只有和它一样的标记的消息才会被当前消费者接收到 channel.queueBind("testRouteQueue1", EXCHANGE_NAME, "key1"); //如果需要绑定多个标记 在执行一次即可 channel.queueBind("testRouteQueue1", EXCHANGE_NAME, "key3"); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1:"+new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume("testRouteQueue1", false, consumer); } }
3.消费者2
import com.idelan.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Receiver2 { private final static String EXCHANGE_NAME = "testexroute"; //定义交换机名字 public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare("testRouteQueue2", false, false, false, null); //绑定队列到交换机 //参数3标记,绑定到交换机的时候会指定一个标记,只有和它一样的标记的消息才会被当前消费者接收到 channel.queueBind("testRouteQueue2", EXCHANGE_NAME, "key1"); //如果需要绑定多个标记 在执行一次即可 channel.queueBind("testRouteQueue2", EXCHANGE_NAME, "key2"); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2:"+new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume("testRouteQueue2", false, consumer); } }
五、主题模式
1.生产者
import com.idelan.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Sender { private final static String EXCHANGE_NAME = "testexchangetopic"; //定义交换机名字 public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, "topic");//定义一个topic 格式的交换机 //发布订阅模式,因为消息是先发布到交换机中,而交换机是没有保存功能的,所以如果没有消费者,消息则会丢失 // routingKey 为key1 // * 只能匹配一个字符 # 可以匹配多个字符 channel.basicPublish(EXCHANGE_NAME, "abc.1.3", null, "topic模式的消息".getBytes()); channel.close(); connection.close(); } }
2.消费者1
import com.idelan.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Receiver1 { private final static String EXCHANGE_NAME = "testexchangetopic"; //定义交换机名字 public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare("testTopicQueue1", false, false, false, null); //绑定队列到交换机 //参数3标记,绑定到交换机的时候会指定一个标记,只有和它一样的标记的消息才会被当前消费者接收到 channel.queueBind("testTopicQueue1", EXCHANGE_NAME, "key.*"); //如果需要绑定多个标记 在执行一次即可 channel.queueBind("testTopicQueue1", EXCHANGE_NAME, "abc.#"); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1:"+new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume("testTopicQueue1", false, consumer); } }
3.消费者2
import com.idelan.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Receiver2 { private final static String EXCHANGE_NAME = "testexchangetopic"; //定义交换机名字 public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare("testTopicQueue2", false, false, false, null); //绑定队列到交换机 //参数3标记,绑定到交换机的时候会指定一个标记,只有和它一样的标记的消息才会被当前消费者接收到 channel.queueBind("testTopicQueue2", EXCHANGE_NAME, "key.#"); //如果需要绑定多个标记 在执行一次即可 channel.queueBind("testTopicQueue2", EXCHANGE_NAME, "abc.*"); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2:"+new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume("testTopicQueue2", false, consumer); } }