1.依赖
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <!-- 消息队列 --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.5.1</version> </dependency>
2.消息提供者
package com.jt.rabbitmq;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.junit.Test;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Message producer example: publishes ten persistent messages to the durable
 * queue {@code orderQueue4} through the default exchange.
 */
public class RabbitmqTest {

    /**
     * Declares the queue and publishes the messages {@code "order4 1"} to
     * {@code "order4 10"}.
     *
     * @throws IOException if connecting, declaring the queue, or publishing fails
     */
    @Test
    public void sendMsg() throws IOException {
        // 1. Open a connection to the broker.
        ConnectionFactory conFactory = new ConnectionFactory();
        conFactory.setVirtualHost("/jt");
        conFactory.setHost("192.168.174.141");
        conFactory.setPort(5672);
        conFactory.setUsername("jtadmin");
        conFactory.setPassword("jtadmin");
        Connection con = conFactory.newConnection();

        try {
            // 2. Obtain a channel.
            Channel channel = con.createChannel();

            // 3. Declare the queue.
            String queue = "orderQueue4";
            boolean durable = true;      // the queue definition is stored on disk
            boolean exclusive = false;   // other programs may use the queue as well
            boolean autoDelete = false;  // keep the queue after its messages are consumed
            Map<String, Object> arguments = null; // no extra queue configuration
            channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments);

            // 4. Publish the messages.
            String msg = "order4";
            String exchange = "";        // the default exchange
            String routingKey = queue;   // the queue name is used as the routing key

            // Delivery mode 2 asks the broker to store the message on disk.
            // The properties are identical for every message, so build them once.
            BasicProperties props = new BasicProperties.Builder()
                    .deliveryMode(2)
                    .build();

            for (int i = 1; i <= 10; i++) {
                byte[] body = (msg + " " + i).getBytes(StandardCharsets.UTF_8);
                channel.basicPublish(exchange, routingKey, props, body);
            }
        } finally {
            // 5. Always release the connection, even when declaring or publishing
            // failed. Skip the close if the connection is already shut down so the
            // original exception is not masked.
            if (con.isOpen()) {
                con.close();
            }
        }
    }
}
3.消息使用者
package com.jt.rabbitmq;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;

import org.junit.Test;

import com.rabbitmq.client.AMQP.Queue.DeclareOk;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;

/**
 * Message consumer example: reads messages from the durable queue
 * {@code orderQueue4} and acknowledges each one manually.
 */
public class RabbitmqConsumer {

    /**
     * Declares the queue, subscribes to it with manual acknowledgement and
     * prints every delivery. The loop has no exit condition; the method only
     * ends when one of the declared exceptions is thrown.
     *
     * @throws IOException if connecting, declaring, consuming or acking fails
     * @throws ShutdownSignalException if the connection or channel is shut down while waiting
     * @throws ConsumerCancelledException if the consumer is cancelled while waiting
     * @throws InterruptedException if the thread is interrupted while waiting for a delivery
     */
    @Test
    public void getMsg() throws IOException, ShutdownSignalException,
            ConsumerCancelledException, InterruptedException {
        // Open a connection to the broker.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/jt");
        factory.setHost("192.168.174.141");
        factory.setPort(5672);
        factory.setUsername("jtadmin");
        factory.setPassword("jtadmin");
        Connection connection = factory.newConnection();

        try {
            // Obtain a channel.
            Channel channel = connection.createChannel();

            // Declare the queue with the same settings the producer uses.
            String queue = "orderQueue4";
            boolean durable = true;      // the queue definition is stored on disk
            boolean exclusive = false;   // other programs may use the queue as well
            boolean autoDelete = false;  // keep the queue after its messages are consumed
            HashMap<String, Object> arguments = null; // no extra queue configuration
            channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments);

            // Deliveries are buffered in the consumer until nextDelivery() is called.
            QueueingConsumer consumer = new QueueingConsumer(channel);

            // autoAck=false selects manual acknowledgement: each delivery is
            // confirmed explicitly with basicAck below.
            boolean autoAck = false;
            channel.basicConsume(queue, autoAck, consumer);

            System.out.println("启动了消费者");

            while (true) {
                // Blocks until the next message arrives.
                Delivery delivery = consumer.nextDelivery();
                long tag = delivery.getEnvelope().getDeliveryTag();
                System.out.println(tag);

                String result = new String(delivery.getBody(), StandardCharsets.UTF_8);
                System.out.println("result :" + result);

                // Acknowledge only after the message has been handled, and only
                // this single delivery (multiple=false).
                channel.basicAck(tag, false);
            }
        } finally {
            // Release the connection when the loop ends with an exception. Skip
            // the close if the connection is already shut down so the original
            // exception is not masked.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
4.总结
在手动确认模式(autoAck=false)下,消费者即使不调用 basicAck 确认消息,也仍然能继续接收后续消息;但这些消息在服务器上会一直显示为未确认(Unacked)状态,消费者断开连接后会重新回到队列中。