前期工作略去不表。
具体代码如下:
MessageReceiver
package jms.activemq.myexample;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MessageReceiver implements Runnable {
private String url;
private String user;
private String password;
private final String QUEUE;
public MessageReceiver(String queue, String url, String user, String password) {
this.url = url;
this.user = user;
this.password = password;
this.QUEUE = queue;
}
@Override
public void run() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
user, password, url);
Session session = null;
Destination receiveQueue;
try {
Connection connection = connectionFactory.createConnection();
session = connection
.createSession(true, Session.SESSION_TRANSACTED);
receiveQueue = session.createQueue(QUEUE);
MessageConsumer consumer = session.createConsumer(receiveQueue);
connection.start();
while (true) {
Message message = consumer.receive();
if (message instanceof TextMessage) {
TextMessage receiveMessage = (TextMessage) message;
System.out.println("我是Receiver,收到消息如下: \r\n"
+ receiveMessage.getText());
} else {
session.commit();
break;
}
}
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
}
MessageSender
package jms.activemq.myexample;
import java.util.Date;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 消息发送器
* @author xiaochuanyu
*
*/
public class MessageSender implements Runnable {
private String url;
private String user;
private String password;
private final String QUEUE;
public MessageSender(String queue, String url, String user, String password) {
this.url = url;
this.user = user;
this.password = password;
this.QUEUE = queue;
}
@Override
public void run() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
user, password, url);
Session session = null;
Destination sendQueue;
Connection connection = null;
int messageCount = 0;
try {
connection = connectionFactory.createConnection();
connection.start();
while (true) {
session = connection.createSession(true,
Session.SESSION_TRANSACTED);
sendQueue = session.createQueue(QUEUE);
MessageProducer sender = session.createProducer(sendQueue);
TextMessage outMessage = session.createTextMessage();
outMessage.setText(new Date() + "现在发送是第" + messageCount + "条消息");
sender.send(outMessage);
session.commit();
sender.close();
if ((++messageCount) == 10) {
// 发够十条消息退出
break;
}
Thread.sleep(1000);
}
connection.close();
} catch (JMSException e) {
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
}
MyActiveMQDemo
package jms.activemq.myexample;
public class MyActiveMQDemo {
public static void main(String[] args) {
String url = "tcp://localhost:61616";
String user = null;
String password = null;
String query = "MyQueue";
new Thread(new MessageSender(query,url,user,password), "Name-Sender").start();
new Thread(new MessageReceiver(query,url,user,password), "Name-Receiver").start();
}
}