生产者:
public class Producer {
//默认连接用户名
public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//默认连接密码
public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//默认连接地址
public static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) {
ConnectionFactory connectionFactory = null; //连接工厂
Connection connection = null; //连接
Session session = null; //会话
Destination destination = null; //消息目的地
MessageProducer messageProducer; //消息生产者
//实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL);
try {
//通过连接工作获取连接
connection = connectionFactory.createConnection();
//启动连接
connection.start();
//创建会话,第一个参数表示是否使用事务,第二个参数表示消息的确认模式
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建一个名为DemoActiveMQ消息队列
destination = session.createTopic("DemoActiveMQ");
//创建消息生产者
messageProducer = session.createProducer(destination);
//发送消息
for (int i = 0; i < 3; i++) {
String msg = "发送第" + i + "条消息";
TextMessage textMessage = session.createTextMessage(msg);
messageProducer.send(textMessage);
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
消费者同步接收消息
public class Consumer {
//默认连接用户名
public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//默认连接密码
public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//默认连接地址
public static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) {
ConnectionFactory connectionFactory = null; //连接工厂
Connection connection = null; //连接
Session session = null; //会话
Destination destination = null; //消息目的地
MessageConsumer messageConsumer; //消息消费者
//实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL);
try {
//通过连接工作获取连接
connection = connectionFactory.createConnection();
//启动连接
connection.start();
//创建会话,第一个参数表示是否使用事务,第二个参数表示消息的确认模式
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建一个名为DemoActiveMQ消息队列
destination = session.createTopic("DemoActiveMQ");
//创建消息消费者
messageConsumer = session.createConsumer(destination);
//同步接受消息
Message message = null;
while ((message = messageConsumer.receive()) != null){
TextMessage textMessage = (TextMessage) message;
System.out.println(textMessage.getText());
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
if(connection != null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
消费者异步接收消息
public class Consumer {
//默认连接用户名
public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//默认连接密码
public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//默认连接地址
public static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) {
ConnectionFactory connectionFactory = null; //连接工厂
Connection connection = null; //连接
Session session = null; //会话
Destination destination = null; //消息目的地
MessageConsumer messageConsumer; //消息消费者
//实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL);
try {
//通过连接工作获取连接
connection = connectionFactory.createConnection();
//启动连接
connection.start();
//创建会话,第一个参数表示是否使用事务,第二个参数表示消息的确认模式
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建一个名为DemoActiveMQ消息队列
destination = session.createTopic("DemoActiveMQ");
//创建消息消费者
messageConsumer = session.createConsumer(destination);
//异步接收消息示例
messageConsumer.setMessageListener(new MessageListener(){
@Override
public void onMessage(Message message){
TextMessage textMessage = (TextMessage) message;
System.out.println(textMessage.getText());
});
}
}