zoukankan      html  css  js  c++  java
  • ActiveMQ专题1: 入门实例

    好久没有写博客了,最近真的是可以说是忙成狗了。项目的事和自己的终身大事忙得焦头烂额,好在是一切都是越来越好了......
    趁着项目今天唯一的一点喘息时间,加上项目开始接触到的mq,开始写一篇amq的入门专题
    

    AMQ入门实例

    下载导入源码:

    管理后台

    • 通过bin目录下的activemq.bat启动之后,就可以通过: http://localhost:8161/admin 来访问activemq的管理后台了
    • 默认的用户名和密码都是: admin. 用户名和密码配置在conf目录下的jetty-realm.properties文件中
    • 访问的端口配置在conf目录下的jetty.xml文件中

    入门使用实例

    1. 引入mq依赖

    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.8.0</version>
    </dependency>
    

    2. 启动activemq服务

    3. 简单的服务端实现

    public class SimpleProducer {
    	public static void main(String[] args) {
    		// STEP1: 得到连接工厂
    		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
    		        ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL);
    		
    		Connection connection = null;
    		Session session = null;
    		Destination destination = null;
    		MessageProducer producer = null;
    		MessageProducer topicProducer = null;
    		Destination topicDestination = null;
    		try {
    			// STEP2: 从连接工厂得到连接并且启动连接
    			connection = connectionFactory.createConnection();
    			connection.start();
    			
    			// STEP3: 获取会话
    			/**
    			 * 第一个参数表示是否开启事务:
    			 * 当第一个参数为true的时候,会忽略第二个参数,无论第二个参数为啥,都需要显示调用 session.commit() 消息才会提交到MQ
    			 * 当第一个参数为false的时候,第二个参数不能为:Session.SESSION_TRANSACTED。 且当第二个参数为其他合法值时,都不需要调用 session.commit(),消息都会发送到MQ
    			 * 第二个参数表示当未开启事务的时候,消费者或者客户端在什么时候发送确认消息
    			 */
    			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    			
    			// STEP4: 创建目标队列、主题 
    			/**
    			 * 队列和主题的区别在于:
    			 * 1、 队列是点对点的,队列中的消息只会被消费一次
    			 * 2、 主题类似于广播机制,只要订阅了该主题的消费者都可以对该消息进行消费
    			 * 3、 一般来说如果生产者在消费者启动之前创建了主题,那么消费者启动后接收不到主题。
    			 */
    			destination = session.createQueue("KiDe-Demo");
    			topicDestination = session.createTopic("KiDe-Demo");
    			
    			// STEP5: 创建消息生产者
    			producer = session.createProducer(destination);
    			topicProducer = session.createProducer(topicDestination);
    			
    			/**
    			 * 参数表示生产者发送的消息是否进行持久化
    			 */
    			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);		// 设置不持久化
    			topicProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);		// 设置不持久化 (不管最终设置的是持久化还是不持久化,只要生产者在消费者之前启动,主题消息都会丢失)
    			
    			// STEP6: 发送消息
    			for (int i=0; i<20; i++) {
    				TextMessage message = session.createTextMessage("Producer message:" + i);
    				producer.send(message);
    				topicProducer.send(message);
    			}
    			
    			// STEP7: 如果开启了事务 ,此时需要调用session提交操作
    			// session.commit();
    		} catch (Exception e) {
    			e.printStackTrace();
    		} finally {
    			if (connection != null) {
    				try {
    					connection.close();
    				} catch (JMSException e) {
    				}
    			}
    		}
    	}
    }
    

    简单消费者实现

    package com.rampage.learning.activemq;
    
    import java.util.concurrent.TimeUnit;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * 简单的队列消费者
     * 
     * @author ziyuqi
     *
     */
    public class SimpleConsumer {
    	public static void main(String[] args) {
    		// STEP1: 创建连接工厂
    		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
    		        ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL);
    
    		Connection connection = null;
    		Session session = null;
    		Destination destination = null;
    		Destination topicDestination = null;
    		MessageConsumer consumer = null;
    		MessageConsumer topicConsumer = null;
    		try {
    			// STEP2: 从连接工厂得到连接并且启动连接
    			connection = connectionFactory.createConnection();
    			connection.start();
    
    			// STEP3: 获取会话
    			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
    			// STEP4: 创建目标队列
    			destination = session.createQueue("KiDe-Demo");
    			topicDestination = session.createTopic("KiDe-Demo");
    			
    			// STEP5: 创建消费者
    			consumer = session.createConsumer(destination);
    			topicConsumer = session.createConsumer(topicDestination);
    			
    			// STEP6: 设置消息接收者接收消息 也可以通过死循环接收消息
    			/*while (true) {
    				TextMessage textMessage = (TextMessage) consumer.receive(1000);
    				System.out.println(textMessage.getText());
    			}*/
    			consumer.setMessageListener(new MessageListener() {
    				
    				@Override
    				public void onMessage(Message paramMessage) {
    					TextMessage message = (TextMessage) paramMessage;
    					try {
    						System.out.println("消费者接收到队列消息:" + message.getText());
    					} catch (JMSException e) {
    						e.printStackTrace();
    					}
    				}
    			});
    			topicConsumer.setMessageListener(new MessageListener() {
    				
    				@Override
    				public void onMessage(Message paramMessage) {
    					TextMessage message = (TextMessage) paramMessage;
    					try {
    						System.out.println("消费者接收到主题消息:" + message.getText());
    					} catch (JMSException e) {
    						e.printStackTrace();
    					}
    				}
    			});
    			TimeUnit.SECONDS.sleep(200);	// 睡眠20秒,使得客户端可以接收到对应消息
    		} catch (Exception e) {
    
    		} finally {
    			if (connection != null) {
    				try {
    					connection.close();
    				} catch (JMSException e) {
    				}
    			}
    		}
    	}
    }
    
    

    代码说明

    ​ 从上面的代码可以看出,生产者和消费者的处理流程大致相同。存在很多重复代码,不难发现可以抽取出公共的代码来使得代码更加简洁。

    运行结果说明

    我这里运行了producer后,运行了两个consumer。不难发现,topic中的每条消息会被每个consumer完全消费,而queue中的消息,每一条消息只会被两个consumer中的一个消费。

  • 相关阅读:
    conda配置文件.condarc
    conda--python环境管理工具
    angular引入UEditor
    spark-MD5文件MD5加密
    js数组切片
    Window.postMessage() 解决父页面与iframe之间跨域通信问题,实时获取iframe消息动态
    博客园自定义主题
    html-标签转义-反转义
    uni-app获取通讯录信息 获取手机号
    LeetCode第243场周赛
  • 原文地址:https://www.cnblogs.com/Kidezyq/p/9556482.html
Copyright © 2011-2022 走看看