队列结构图
P:消息的生产者
C:消息的消费者
红色:队列
生产者将消息发送到队列,消费者从队列中获取消息。
测试
1、连接MQ
public static Connection getConnection() throws Exception { //定义连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置服务地址 factory.setHost("192.168.116.137"); //端口 factory.setPort(5672); //设置账号信息,用户名、密码、vhost factory.setVirtualHost("/taotao"); factory.setUsername("taotao"); factory.setPassword("taotao"); // 通过工程获取连接 Connection connection = factory.newConnection(); return connection; }
2、生产者
private final static String QUEUE_NAME = "test_queue"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); // 从连接中创建通道 Channel channel = connection.createChannel(); // 声明(创建)队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 消息内容 String message = "Hello World 2!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); //关闭通道和连接 channel.close(); connection.close(); }
3、消费者
private final static String QUEUE_NAME = "test_queue"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列 channel.basicConsume(QUEUE_NAME, true, consumer); // 获取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } }
4、查看管理平台
从上图可以看到,消息已经创建。
5、总结
这种模式相对直接,生产者和消费者直接必须声明相同的队列,模型较为单一