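What a message queue is for
Why use ActiveMQ rather than other tools
Download the distribution package and start the broker
Admin console: http://localhost:8161/admin/ (username: admin, password: admin)
Java implementation steps: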
    // 1. Create the connection factory (ConnectionFactory)
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
    // 2. Create the connection (Connection)
    Connection connection = connectionFactory.createConnection();
    // 3. Start the connection
    connection.start();
    // 4. Create the session. The first argument says whether the session is transacted
    //    (false here, so no transaction); the second selects the acknowledgement mode,
    //    here automatic acknowledgement.
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // 5. Create the destination (queue/topic)
    Queue queue = session.createQueue(QUEUE_NAME);
    // 6. Create the producer/consumer
    MessageProducer producer = session.createProducer(queue);
    // 7. Produce/consume messages
    producer.send(message);
Producer code:
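    // javax.jms matches ActiveMQ 5.x; ActiveMQ 6.x uses the jakarta.jms package instead.
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import org.apache.activemq.ActiveMQConnectionFactory;

    public class MQProducer {
        private static final String URL = "tcp://localhost:61616";
        private static final String QUEUE_NAME = "queue-test";
        private static final String TOPIC_NAME = "topic-test";

        public static void main(String[] args) throws JMSException {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
            Connection connection = connectionFactory.createConnection();
            connection.start();
            // Non-transacted session with automatic acknowledgement
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue destination = session.createQueue(QUEUE_NAME);
            // Topic destination = session.createTopic(TOPIC_NAME); // for topic messages
            MessageProducer producer = session.createProducer(destination);
            for (int i = 0; i < 10; i++) {
                TextMessage message = session.createTextMessage("Message" + i);
                producer.send(message);
                System.out.println("Sent message" + i);
            }
            // Release resources in reverse order of creation
            producer.close();
            session.close();
            connection.close();
        }
    }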
Consumer code:
    // javax.jms matches ActiveMQ 5.x; ActiveMQ 6.x uses the jakarta.jms package instead.
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import org.apache.activemq.ActiveMQConnectionFactory;

    public class MQConsumer {
        private static final String URL = "tcp://localhost:61616";
        private static final String QUEUE_NAME = "queue-test";
        private static final String TOPIC_NAME = "topic-test";

        public static void main(String[] args) throws JMSException, InterruptedException {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
            Connection connection = connectionFactory.createConnection();

            // Consumer option 1: queue or topic consumer
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue destination = session.createQueue(QUEUE_NAME);
            // Topic destination = session.createTopic(TOPIC_NAME);
            MessageConsumer consumer = session.createConsumer(destination);

            // Consumer option 2: durable topic subscriber
            // (the client ID must be set before the connection is started)
            // connection.setClientID("client-test");
            // connection.start();
            // Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Topic destination = session.createTopic(TOPIC_NAME);
            // TopicSubscriber consumer = session.createDurableSubscriber(destination, "subscription-test");

            // Receive option 1: asynchronous delivery through a MessageListener
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        System.out.println("Received " + ((TextMessage) message).getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            Thread.sleep(10000); // keep the main thread alive for a while, then shut down

            // Receive option 2: receive() blocks synchronously until a message arrives
            // for (int i = 0; i < 10; i++) {
            //     TextMessage message = (TextMessage) consumer.receive();
            //     System.out.println("Received " + message.getText());
            // }

            consumer.close();
            session.close();
            connection.close();
        }
    }
Producer, Consumer, receive(), MessageListener
topic
queue
JMS
Message reliability: persistence (JDBC)
Topic persistence
Transactions
Acknowledgement
Point-to-point and publish/subscribe
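The difference between the two models can be sketched with a small in-memory simulation. This is not ActiveMQ or JMS code: DeliveryModelDemo and its methods are hypothetical names made up for illustration, and round-robin dispatch is an assumption chosen for simplicity (a real broker's dispatch order depends on its configuration). The point is only that a queue hands each message to one consumer, while a topic hands every subscriber its own copy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// In-memory model only; none of these types are ActiveMQ or JMS classes.
final class DeliveryModelDemo {

    // Queue (point-to-point): each message is delivered to exactly one consumer.
    // Round-robin is an assumption of this sketch.
    static List<List<String>> deliverPointToPoint(List<String> messages, int consumerCount) {
        List<List<String>> inboxes = emptyInboxes(consumerCount);
        for (int i = 0; i < messages.size(); i++) {
            inboxes.get(i % consumerCount).add(messages.get(i));
        }
        return inboxes;
    }

    // Topic (publish/subscribe): every subscriber receives a copy of each message.
    static List<List<String>> deliverPublishSubscribe(List<String> messages, int subscriberCount) {
        List<List<String>> inboxes = emptyInboxes(subscriberCount);
        for (String message : messages) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
        return inboxes;
    }

    // Creates one empty inbox per consumer or subscriber.
    private static List<List<String>> emptyInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("Message0", "Message1", "Message2", "Message3");
        System.out.println("queue: " + deliverPointToPoint(messages, 2));
        System.out.println("topic: " + deliverPublishSubscribe(messages, 2));
    }
}
```

With two consumers, the queue call splits the four messages between them, while the topic call gives both subscribers all four.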
Integrating ActiveMQ with Spring
ZooKeeper + Replicated LevelDB
Asynchronous sends
Delayed delivery
Scheduled delivery
Redelivery (retry) mechanism
Dead-letter queue
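How retries and the dead-letter queue fit together can be sketched in memory. This is a simplified model, not the broker's implementation: RedeliveryDemo, its fields, and the limit of 3 redeliveries are all hypothetical choices for illustration, and the handler simply returns false to signal a failed attempt. A message that still fails after the last redelivery is moved to the dead-letter list instead of blocking the queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

// In-memory model only; none of these types are ActiveMQ or JMS classes.
final class RedeliveryDemo {
    // Hypothetical limit chosen for this sketch.
    static final int MAX_REDELIVERIES = 3;

    final List<String> processed = new ArrayList<>();
    final List<String> deadLetters = new ArrayList<>();

    // Drains the queue. The handler returns true when a message was processed
    // successfully and false when the attempt failed.
    void drain(Deque<String> queue, Predicate<String> handler) {
        while (!queue.isEmpty()) {
            String message = queue.poll();
            boolean done = false;
            // One initial attempt plus up to MAX_REDELIVERIES redeliveries.
            for (int attempt = 0; attempt <= MAX_REDELIVERIES && !done; attempt++) {
                done = handler.test(message);
            }
            if (done) {
                processed.add(message);
            } else {
                deadLetters.add(message); // retries exhausted: park the message
            }
        }
    }

    public static void main(String[] args) {
        RedeliveryDemo demo = new RedeliveryDemo();
        Deque<String> queue = new ArrayDeque<>(Arrays.asList("ok", "poison"));
        demo.drain(queue, message -> !message.equals("poison"));
        System.out.println("processed:    " + demo.processed);
        System.out.println("dead letters: " + demo.deadLetters);
    }
}
```

Parking the failing message keeps one bad message from holding up everything behind it, and leaves it available for later inspection.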
Preventing duplicate processing
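One common way to guard against duplicates is an idempotent consumer that remembers which message IDs it has already handled. The sketch below assumes every message carries a unique ID (in JMS, Message.getJMSMessageID() or a business key could serve); IdempotentConsumerDemo and its members are hypothetical names. The in-memory HashSet is also an assumption for brevity: with several consumer instances the seen-ID record would need to live in shared storage, for example a database table with a unique key.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// In-memory model only; none of these types are ActiveMQ or JMS classes.
final class IdempotentConsumerDemo {
    // IDs of messages that have already been applied.
    private final Set<String> seenIds = new HashSet<>();
    // Bodies that were actually applied, in order.
    final List<String> applied = new ArrayList<>();

    // Returns true if the message was applied, false if it was a duplicate and skipped.
    boolean handle(String messageId, String body) {
        if (!seenIds.add(messageId)) {
            return false; // Set.add returns false when the ID was already present
        }
        applied.add(body);
        return true;
    }

    public static void main(String[] args) {
        IdempotentConsumerDemo consumer = new IdempotentConsumerDemo();
        consumer.handle("id-1", "create order");
        consumer.handle("id-1", "create order"); // redelivered duplicate, skipped
        consumer.handle("id-2", "ship order");
        System.out.println(consumer.applied);
    }
}
```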