记录rabbitmq的简单使用。其中记录了消息的确认与拒绝。
1.产生消息:
public void produce() {
String queueName = "test";
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection con = null;
Channel channel = null;
try {
con = factory.newConnection();
channel = con.createChannel();
channel.queueDeclare(queueName, false, false, false, null);
String msg = "hello rabbit2!";
channel.basicPublish("", queueName, null, msg.getBytes());
System.out.println("send msg : " + msg);
} catch (IOException e) {
e.printStackTrace();
} finally {
close(channel)
}
}
2.消费消息:
public void consume() {
String queueName = "test";
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection con = null;
Channel channel = null;
try {
con = factory.newConnection();
channel = con.createChannel();
channel.queueDeclare(queueName, false, false, false, null);
QueueingConsumer cons = new QueueingConsumer(channel);
//RabbitMQ客户端接受消息最大数量;如果没有设置会一次消费所有消息
channel.basicQos(0,1,false);
// 向channel中写入属性需要在一下这句程序之前才能起效
// 当要设置一次消费的最大数量和消息回置时,需要将第二个参数设置为false--代表是否自动ack
channel.basicConsume(queueName, false, cons);
while (true) {
Delivery deliver = cons.nextDelivery();
String msg = new String(deliver.getBody());
// if (msg.equals("hello")) {
// //拒绝消息(如果最后一个设置为true,此消息还会存在)
// channel.basicNack(deliver.getEnvelope().getDeliveryTag() , false, true);
// }
// //确认消息,已经收到
// else {
// channel.basicAck(deliver.getEnvelope().getDeliveryTag()
// , false);
// }
// 当autoAck为false时,就需要手动ack
channel.basicAck(deliver.getEnvelope().getDeliveryTag(), false);
System.out.println("recive: " + msg);
}
} catch (IOException e) {
e.printStackTrace();
} catch (ShutdownSignalException e) {
e.printStackTrace();
} catch (ConsumerCancelledException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
close(channel);
}
}
3.产生于消费公共方法,关闭链接:
private void close(Channel channel) {
Connection connection = null;
try {
connection = channel.getConnection();
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}