记录rabbitmq的简单使用。其中记录了消息的确认与拒绝。
1.产生消息:
public void produce() { String queueName = "test"; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection con = null; Channel channel = null; try { con = factory.newConnection(); channel = con.createChannel(); channel.queueDeclare(queueName, false, false, false, null); String msg = "hello rabbit2!"; channel.basicPublish("", queueName, null, msg.getBytes()); System.out.println("send msg : " + msg); } catch (IOException e) { e.printStackTrace(); } finally { close(channel) } }
2.消费消息:
public void consume() { String queueName = "test"; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection con = null; Channel channel = null; try { con = factory.newConnection(); channel = con.createChannel(); channel.queueDeclare(queueName, false, false, false, null); QueueingConsumer cons = new QueueingConsumer(channel); //RabbitMQ客户端接受消息最大数量;如果没有设置会一次消费所有消息 channel.basicQos(0,1,false); // 向channel中写入属性需要在一下这句程序之前才能起效 // 当要设置一次消费的最大数量和消息回置时,需要将第二个参数设置为false--代表是否自动ack channel.basicConsume(queueName, false, cons); while (true) { Delivery deliver = cons.nextDelivery(); String msg = new String(deliver.getBody()); // if (msg.equals("hello")) { // //拒绝消息(如果最后一个设置为true,此消息还会存在) // channel.basicNack(deliver.getEnvelope().getDeliveryTag() , false, true); // } // //确认消息,已经收到 // else { // channel.basicAck(deliver.getEnvelope().getDeliveryTag() // , false); // } // 当autoAck为false时,就需要手动ack channel.basicAck(deliver.getEnvelope().getDeliveryTag(), false); System.out.println("recive: " + msg); } } catch (IOException e) { e.printStackTrace(); } catch (ShutdownSignalException e) { e.printStackTrace(); } catch (ConsumerCancelledException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } finally { close(channel); } }
3.产生于消费公共方法,关闭链接:
private void close(Channel channel) { Connection connection = null; try { connection = channel.getConnection(); channel.close(); } catch (IOException e) { e.printStackTrace(); } try { connection.close(); } catch (IOException e) { e.printStackTrace(); } }