1.连接抽象封装
package com.pccw.rabbitmq; import com.rabbitmq.client.Channel; import java.io.IOException; import com.rabbitmq.client.*; public abstract class EndPoint { protected Channel channel; protected Connection connection; protected String endPointName; public EndPoint(String endpointName) throws IOException { this.endPointName = endpointName; // Create a connection factory ConnectionFactory factory = new ConnectionFactory(); // hostname of your rabbitmq server factory.setHost("192.168.220.132"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); connection = factory.newConnection(); // creating a channel channel = connection.createChannel(); // declaring a queue for this channel. If queue does not exist, // it will be created on the server. channel.queueDeclare(endpointName, false, false, false, null); } public void close() throws IOException { this.channel.close(); this.connection.close(); } }
2.生产者
package com.pccw.rabbitmq;

import java.io.IOException;
import java.io.Serializable;
import org.apache.commons.lang.SerializationUtils;

/**
 * Endpoint that publishes serialized objects to its queue.
 */
public class Producer extends EndPoint {

    /**
     * Creates a producer bound to the given queue.
     *
     * @param endPointName name of the queue to publish to
     * @throws IOException if the underlying endpoint cannot be set up
     */
    public Producer(String endPointName) throws IOException {
        super(endPointName);
    }

    /**
     * Serializes {@code object} and publishes it with the queue name as the
     * routing key, an empty exchange name and no message properties.
     *
     * @param object the message to send
     * @throws IOException if publishing fails
     */
    public void sendMessage(Serializable object) throws IOException {
        byte[] payload = SerializationUtils.serialize(object);
        channel.basicPublish("", endPointName, null, payload);
    }
}
3.消费者
package com.pccw.rabbitmq; import java.io.IOException; import java.util.*; import org.apache.commons.lang.SerializationUtils; import com.rabbitmq.client.*; public class QueueConsumer extends EndPoint implements Runnable, Consumer { public QueueConsumer(String endPointName) throws IOException { super(endPointName); } public void run() { try { // start consuming messages. Auto acknowledge messages. channel.basicConsume(endPointName, true, this); } catch (IOException e) { e.printStackTrace(); } } /** * Called when consumer is registered. */ public void handleConsumeOk(String consumerTag) { System.out.println("Consumer " + consumerTag + " registered"); } /** * Called when new message is available. */ public void handleDelivery(String arg0, Envelope arg1, com.rabbitmq.client.AMQP.BasicProperties arg2, byte[] body) throws IOException { Map map = (HashMap) SerializationUtils.deserialize(body); System.out.println(map.get("message number")); } public void handleCancel(String consumerTag) { } public void handleCancelOk(String consumerTag) { } public void handleRecoverOk(String consumerTag) { } public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) { } }
4.生产者测试类
package com.pccw.rabbitmq;

import java.util.HashMap;

/**
 * Demo entry point for the producer side: publishes one map message to the
 * queue named {@code "queue"} and then closes the producer.
 */
public class MainProducer {

    /**
     * Opens a {@link Producer}, sends the demo message(s) and closes the
     * producer, also when sending fails.
     *
     * @throws Exception if connecting, sending or closing fails
     */
    public MainProducer() throws Exception {
        Producer producer = new Producer("queue");
        try {
            for (int i = 0; i < 1; i++) {
                // HashMap (not Map) because sendMessage requires a Serializable.
                HashMap<String, String> message = new HashMap<String, String>();
                message.put("message number", "中国" + i);
                producer.sendMessage(message);
                System.out.println("Message Number " + i + " sent.");
            }
        } finally {
            // Release the channel and connection even if a send fails.
            producer.close();
        }
    }

    /**
     * Runs the producer demo.
     *
     * @param args ignored
     * @throws Exception if the demo fails
     */
    public static void main(String[] args) throws Exception {
        new MainProducer();
    }
}
5.消费者测试类
package com.pccw.rabbitmq; public class MainConsumer { public MainConsumer() throws Exception{ QueueConsumer consumer = new QueueConsumer("queue"); Thread consumerThread = new Thread(consumer); consumerThread.start(); } public static void main(String[] args) throws Exception{ new MainConsumer(); } }