1). ActiveMQ
ActiveMQ是Apache所提供的一个开源的消息系统,完全采用Java来实现,因此,它能很好地支持J2EE提出的JMS(Java Message Service,即Java消息服务)规范。JMS是一组Java应用程序接口,它提供消息的创建、发送、读取等一系列服务。JMS提供了一组公共应用程序接口和响应的语法,类似于Java数据库的统一访问接口JDBC,它是一种与厂商无关的API,使得Java程序能够与不同厂商的消息组件很好地进行通信。ActiveMQ提供了许多高级功能,包括消息负载平衡和数据的高可用性。多个连接的“主”代理可以通过在后台节点之间移动消息来动态响应消费者需求。代理还可以在主从配置中配对在一起,这样,如果主服务器出现故障,则从服务器将接管确保客户端可以获取其重要数据并消除代价高昂的停机时间。
2).ActiveMQ的好处
- 应用解耦
- 流量消峰
- 异步处理
- 消息通讯
3).ActiveMQ的安装
3.1 ActiveMQ的官网网站http://activemq.apache.org/
3.2点击下载
3.3下载LINUX版本,并上传到虚拟机上,
3.4解压完成跳转到bin目录
3.5 ./activemq start 命令
启动,保持虚拟机的防火墙处于关闭状态
3.6 在本机上访问http://虚拟机ip : 默认端口号 8161
4).使用JAVA操作api
4.1引入Maven依赖
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all --> <!--ActiveMQ依赖包--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.9</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.xbean/xbean-spring --> <!--ActiveMQ和SPring整合包--> <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>4.14</version> </dependency>
4.2消息的生产者
package com.yjc.activemq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * 消息的生产者 * */ public class JMSProducer { //ActiveMQ的url private static final String ACTIVEMQ_URL="tcp://192.168.118.3:61616"; //Queue的名称 private static final String QUEUE_NAME="queue001"; public static void main(String[] args) throws JMSException { //1.创建ActiveMQ的连接工程,给定ActiveMQ的url,使用默认账号和密码 ActiveMQConnectionFactory activeFactory=new ActiveMQConnectionFactory(ACTIVEMQ_URL); //2.创建active的连接对象,抛出或捕获异常,并启动connection Connection connection = activeFactory.createConnection(); connection.start(); //3.创建会话对象,两个参数前者为事务,后者为签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4.创建目的地,目的地的类型为队列(Queue),参数为队列的名称 //Queue和Topic都继承自Destination接口 Queue queue = session.createQueue(QUEUE_NAME); //5.创建消息的生产者 MessageProducer producer = session.createProducer(queue); for (int i=1;i<=50;i++){ //6.通过session创建消息, TextMessage textMessage = session.createTextMessage("this is msg "+i); //7.用producer将消息发送至MQ的队列里 producer.send(textMessage); } //8.释放资源,先开后关 producer.close(); session.close(); connection.close(); System.out.println("将消息发送至MQ队列中成功!!!"); } }
4.3消息的消费者
package com.yjc.activemq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import java.io.IOException; public class JMSConsumer { //ActiveMQ的url private static final String ACTIVEMQ_URL="tcp://192.168.118.3:61616"; //Queue的名称 private static final String QUEUE_NAME="queue001"; public static void main(String[] args) throws JMSException, IOException { System.out.println("----------------------11111111111----------------------------"); //1.创建ActiveMQ的连接工程,给定ActiveMQ的url,使用默认账号和密码 ActiveMQConnectionFactory activeFactory=new ActiveMQConnectionFactory(ACTIVEMQ_URL); //2.创建active的连接对象,抛出或捕获异常,并启动connection Connection connection = activeFactory.createConnection(); connection.start(); //3.创建会话对象,两个参数前者为事务,后者为签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4.创建目的地,目的地的类型为队列(Queue),参数为队列的名称 //Queue和Topic都继承自Destination接口 Queue queue = session.createQueue(QUEUE_NAME); //----------------------上面的步骤和Producer的一样------------------------------------------------ //5.创建消费者 MessageConsumer consumer = session.createConsumer(queue); //6.消费消息 /* receive(long timeout)方法的重载为等待时间,不传参的话就会一直等待下去 消费的消息类型需要和生产出的消息类型保持一致 while(true){ TextMessage textMessage = (TextMessage) consumer.receive(3000); if (textMessage!=null) { System.out.println("-----消费信息"+textMessage.getText()); }else{ //代表所有的信息都以及被消费完了 break; } }*/ //6.2异步非阻塞方式(监听消息) consumer.setMessageListener(new MessageListener(){ @Override public void onMessage(Message message) { if (message!=null&&message instanceof TextMessage) { TextMessage message1 =(TextMessage)message; try { String text = message1.getText(); System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } } }); System.in.read(); //7.释放资源先开后关 consumer.close(); session.close(); connection.close(); System.out.println("消费完毕!!!"); } }
5).思考?
当有多个消息的消费者正在进行消息的监听,此时消息的生产者生产了n条信息,这些消息会怎样被消费者消费??
第一种情况:被最先启动的消费者全部消费
第二种情况:所有的消费者都可以消费所有的消息
第三种情况:按照顺序来进行分配消息
接下来我们测试一下,
第一步:按照先后顺序启动三个消费者并同时进行监听消息
此时ActiveMQ的管理中心如下图所示,只有三个消费者在等待消息(监听),而没有一条消息
第二步:让生产者生产出50条消息
当生产者将消息放入到消息队列当中之后,之前创建的三个消费者监听到了有消息的存在,消费者要对消息进行消费,运行结果如下
由上面的测试可知,运行的结果是之前猜测的第三种情况,消费者按照消费者创建的顺序,也就是先来的先消费,后来的要排队,等待前一个消费者消费完一条消息之后,他才能进行消费,
一个消费者消费一条消息的顺序来进行循环消费,当第二个消费者将这最后一条消息消费完之后,消息队列中已经没有消息了,所以第三个消费者会比前面两个消费者少消费了一条消息。
但是,当此时再有新的消息进入到队列中的时候,就会轮到第三个消费者先进行消费。
如果此时又有一个刚来的消费者,也就是第四个消费者,第四个消费者就会跟着他前一个后面进行消费消息,并加入他们的循环之中
以上就是本次初学ActiveMQ的理解,如有不足支持请大家指出,谢谢