Producer side
1. Creating the channel
Whichever type of exchange is used, the code that creates the channel is the same:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
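2. Creating the exchange
2.1 direct
The direct case uses the default exchange (a built-in direct exchange whose name is the empty string). It does not need to be declared, but you do need to declare the queue that messages will be sent to: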
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
2.2 fanout
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
2.3 topic
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
3. Sending messages
3.1 direct
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
3.2 fanout
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
3.3 topic
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
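The publish calls above differ only in their first two arguments. The following is a minimal in-memory sketch of why, assuming a toy broker that models just the default exchange and fanout exchanges (topic pattern matching is left out). The class RoutingSketch and all of its methods are hypothetical names made up for illustration; they are not part of the RabbitMQ client.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of routing; hypothetical, not a RabbitMQ API.
final class RoutingSketch {
    // queue name -> messages stored in that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();
    // fanout exchange name -> names of the queues bound to it
    private final Map<String, List<String>> fanoutBindings = new LinkedHashMap<>();

    void declareQueue(String queue) {
        queues.putIfAbsent(queue, new ArrayList<>());
    }

    void bindToFanout(String exchange, String queue) {
        declareQueue(queue);
        fanoutBindings.computeIfAbsent(exchange, k -> new ArrayList<>()).add(queue);
    }

    // Same argument order as basicPublish(exchange, routingKey, ...).
    void publish(String exchange, String routingKey, String message) {
        if (exchange.isEmpty()) {
            // Default exchange: the routing key is interpreted as a queue name.
            List<String> target = queues.get(routingKey);
            if (target != null) {
                target.add(message);
            }
            // In this model a message with no matching queue is simply dropped.
            return;
        }
        // Fanout: the routing key is ignored, every bound queue gets a copy.
        for (String queue : fanoutBindings.getOrDefault(exchange, new ArrayList<>())) {
            queues.get(queue).add(message);
        }
    }

    List<String> messagesIn(String queue) {
        return queues.getOrDefault(queue, new ArrayList<>());
    }

    public static void main(String[] args) {
        RoutingSketch broker = new RoutingSketch();
        broker.declareQueue("hello");
        broker.publish("", "hello", "to the default exchange");

        broker.bindToFanout("logs", "q1");
        broker.bindToFanout("logs", "q2");
        broker.publish("logs", "", "to the fanout exchange");

        System.out.println("hello: " + broker.messagesIn("hello"));
        System.out.println("q1: " + broker.messagesIn("q1"));
        System.out.println("q2: " + broker.messagesIn("q2"));
    }
}
```

This mirrors the snippets above: with the default exchange the queue name goes in the routing-key position, while with fanout the routing key can be left empty because it plays no part in routing.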
Consumer side
1. Creating the channel
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
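2. Preparation before consuming
2.1 direct: declare the queue and consume from it directly
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
2.2 fanout: first declare the exchange with type fanout, then bind a queue to it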
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
2.3 topic
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
for (String bindingKey : argv) {
    channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}
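Each binding key passed to queueBind for a topic exchange is a pattern of dot-separated words, where * stands for exactly one word and # for zero or more words. The following is a minimal sketch of that matching rule, written as a standalone toy; the class TopicKeyMatcher is a hypothetical name, and the broker's real implementation is not shown in these notes and may differ in edge cases such as empty keys.

```java
// Toy model of topic binding-key matching; hypothetical, not a RabbitMQ API.
final class TopicKeyMatcher {

    // Returns true if the binding key (a pattern) matches the routing key.
    static boolean matches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.isEmpty() ? new String[0] : bindingKey.split("\\.", -1);
        String[] words = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return matchFrom(pattern, 0, words, 0);
    }

    // Matches pattern[p..] against words[w..] word by word.
    private static boolean matchFrom(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            // Pattern exhausted: it matches only if the words are exhausted too.
            return w == words.length;
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more words, so try every possible length.
            for (int next = w; next <= words.length; next++) {
                if (matchFrom(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            // A literal word or '*' needs one word, but none are left.
            return false;
        }
        boolean wordMatches = pattern[p].equals("*") || pattern[p].equals(words[w]);
        return wordMatches && matchFrom(pattern, p + 1, words, w + 1);
    }

    public static void main(String[] args) {
        String routingKey = "kern.critical";
        String[] bindingKeys = {"kern.*", "*.critical", "#", "kern", "kern.#"};
        for (String bindingKey : bindingKeys) {
            System.out.println(bindingKey + " -> " + matches(bindingKey, routingKey));
        }
    }
}
```

A queue bound with several binding keys, as in the loop above, receives a message when any one of its keys matches the routing key used by the producer.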
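3. The code that actually consumes messages is the same in every case; pass the queue prepared in step 2 to basicConsume (QUEUE_NAME for direct, queueName for fanout and topic)
// QueueingConsumer is deprecated in amqp-client 4.x and removed in 5.x
QueueingConsumer consumer = new QueueingConsumer(channel);
// autoAck must be false here, because every message is acknowledged manually below
channel.basicConsume(QUEUE_NAME, false, consumer);
while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    Envelope elp = delivery.getEnvelope();
    String message = new String(delivery.getBody());
    System.out.println(" [x] Received '" + message + "'");
    // Acknowledge only this delivery (multiple = false)
    channel.basicAck(elp.getDeliveryTag(), false);
    //channel.basicNack();
}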
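The loop above acknowledges each delivery by its delivery tag, and basicAck is only meaningful when basicConsume is called with autoAck set to false. The following is a minimal toy model of that difference, assuming a single queue with a single consumer; the class AckSketch and its methods are hypothetical names for illustration and simplify what a real broker does.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of one queue with one consumer; hypothetical, not a RabbitMQ API.
final class AckSketch {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<Long, String> unacked = new LinkedHashMap<>();
    private final boolean autoAck;
    private long nextTag = 1;

    AckSketch(boolean autoAck) {
        this.autoAck = autoAck;
    }

    void enqueue(String message) {
        ready.addLast(message);
    }

    // Hands the next message to the consumer and returns its delivery tag,
    // or -1 if the queue is empty.
    long deliver() {
        String message = ready.pollFirst();
        if (message == null) {
            return -1;
        }
        long tag = nextTag++;
        if (!autoAck) {
            // Manual mode: keep the message until it is acknowledged.
            unacked.put(tag, message);
        }
        return tag;
    }

    // Acknowledging a tag that is not outstanding is treated as an error.
    void ack(long tag) {
        if (unacked.remove(tag) == null) {
            throw new IllegalStateException("unknown delivery tag " + tag);
        }
    }

    // The consumer disappears: unacknowledged messages return to the queue.
    void consumerLost() {
        List<String> pending = new ArrayList<>(unacked.values());
        for (int i = pending.size() - 1; i >= 0; i--) {
            ready.addFirst(pending.get(i));
        }
        unacked.clear();
    }

    int readyCount() {
        return ready.size();
    }

    public static void main(String[] args) {
        AckSketch queue = new AckSketch(false);
        queue.enqueue("first");
        queue.enqueue("second");
        long tag = queue.deliver();
        queue.deliver();          // delivered but never acknowledged
        queue.ack(tag);
        queue.consumerLost();
        System.out.println("messages back in the queue: " + queue.readyCount());
    }
}
```

In this model, manual acknowledgement lets a message survive a consumer failure, while autoAck mode forgets the message as soon as it is delivered, so a later ack for it has nothing to refer to.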