模型
graph LR
生产者 -->id[queue]
id[queue]--> 消费者
获取连接
public static Connection getConnection() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/vhost_ct");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory.newConnection();
}
生产者Producer
public class Send {
private static final String QUEUE_NAME = "test_simple_queue";
public static void main(String[] a) throws IOException, TimeoutException {
//获取一个链接
Connection connection = ConnectionUtil.getConnection();
//从链接中获取一个通道
Channel channel = connection.createChannel();
//创建队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "hello_simple";
//发布
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println("send:" + msg);
channel.close();
connection.close();
}
}
消费者 Consumer
public class Recv {
private static final String QUEUE_NAME = "test_simple_queue";
public static void main(String[] a) throws IOException, TimeoutException {
//获取一个链接
Connection connection = ConnectionUtil.getConnection();
//从链接中获取一个通道
Channel channel = connection.createChannel();
//定义队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
//获取到达的消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println(msg);
}
};
//监听队列
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}
}
不足之处
耦合性较高,生产者和消费者一一对应,修改队列名,生产者和消费者需同时修改