zoukankan      html  css  js  c++  java
  • ActiveMQ第一个示例

    首先先安装ActiveMQ:https://www.cnblogs.com/hejianliang/p/9149590.html 

    创建Java项目,把 activemq-all-5.15.4.jar 包导入到项目。

    本次案例主要有两个角色,分别是 新闻发布者(NewsPublisher)、新闻订阅者(NewsSubscriber);发布者相当于 生产者,负责生产消息,订阅者相当于 消费者,负责接收消息。

    新闻发布者(NewsPublisher)

    package edu.activemq.publisher;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * 新闻发布者(生产者)
     * @author Administrator
     *
     */
    public class NewsPublisher {
    
    	/**
    	 * ActiveMQ连接用户名
    	 */
    	private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    	
    	/**
    	 * ActiveMQ连接密码
    	 */
    	private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    	
    	/**
    	 * ActiveMQ连接地址
    	 */
    	private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
    	
    	/**
    	 * 消息队列名称
    	 */
    	private static final String QUEUE_NAME = "MY_QUEUE";
    	
    	/**
    	 * 发布数量
    	 */
    	private static final Integer SEND_NUMBER = 5000;
    	
    	/**
    	 * 发布新闻
    	 */
    	public static void main(String[] args) {
    		// 声明连接工厂
    		ConnectionFactory connectionFactory;
    		
    		// 声明连接
    		Connection connection = null;
    		
    		// 声明会话,接收或者发送信息的线程
    		Session session;
    		
    		// 声明消息的目的地
    		Destination destination;
    		
    		// 声明消息生产者
    		MessageProducer messageProducer;
    		
    		try {
    			// 实例化连接工厂
    			connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
    			
    			// 通过连接工厂创建 连接
    			connection = connectionFactory.createConnection();
    			
    			// 启动连接
    			connection.start();
    			
    			// 创建session 参数1:开启事物  参数2:消息确认方式
    			session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
    			
    			// 创建消息队列
    			destination = session.createQueue(QUEUE_NAME);
    			
    			// 创建消息生产者
    			messageProducer = session.createProducer(destination);
    			
    			// 发送消息
    			sendMessage(session, messageProducer);
    			
    			// 提交事务
    			session.commit();
    		} catch (Exception e) {
    			System.out.println("发布新闻失败.");
    			e.printStackTrace();
    		} finally {
    			// 关闭资源
    			if (null != connection) {
    				try {
    					connection.close();
    				} catch (Exception e) {
    					System.out.println("关闭资源失败.");
    					e.printStackTrace();
    				}
    			}
    		}
    	}
    	
    	/**
    	 * 发送消息
    	 * @param session
    	 * @param messageProducer
    	 */
    	private static void sendMessage(Session session, MessageProducer messageProducer) {
    		// 声明消息对象
    		TextMessage textMessage;
    		
    		try {
    			for (int i = 0; i <= SEND_NUMBER; i++) {
    				// 创建消息
    				textMessage = session.createTextMessage("当前发送的新闻xxx, 编号为: " + i);
    				
    				// 发送消息
    				messageProducer.send(textMessage);
    			}
    		} catch (Exception e) {
    			System.out.println("发送新闻消息失败.");
    			e.printStackTrace();
    		}
    	}
    	
    }
    

    新闻订阅者(NewsSubscriber)

    package edu.activemq.subscriber;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import edu.activemq.listener.NewsSubscriberListener;
    
    /**
     * 新闻订阅者(消费者)
     * @author Administrator
     *
     */
    public class NewsSubscriber {
    
    	/**
    	 * ActiveMQ连接用户名
    	 */
    	private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    	
    	/**
    	 * ActiveMQ连接密码
    	 */
    	private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    	
    	/**
    	 * ActiveMQ连接地址
    	 */
    	private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
    	
    	/**
    	 * 消息队列名称
    	 */
    	private static final String QUEUE_NAME = "MY_QUEUE";
    	
    	public static void main(String[] args) {
    		// 声明连接工厂
    		ConnectionFactory connectionFactory;
    		
    		// 连接
    		Connection connection = null;
    		
    		// 会话
    		Session session;
    		
    		// 消息目的地
    		Destination destination;
    		
    		// 消息的消费者
    		MessageConsumer messageConsumer;
    		
    		// 实例化连接工厂
    		connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
    		try {
    			// 获取连接
    			connection = connectionFactory.createConnection();
    			// 启动连接
    			connection.start(); 
    			
    			// 获取Session
    			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    			
    			// 创建连接的消息队列
    			destination = session.createQueue(QUEUE_NAME);
    			 
    			// 创建消息的消费者
    			messageConsumer = session.createConsumer(destination);
    			
    			// 注册消息监听
    			messageConsumer.setMessageListener(new NewsSubscriberListener());
    			
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    	
    }
    

    新闻监听器(NewsSubscriberListener)

    package edu.activemq.listener;
    
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    /**
     * 新闻监听器
     * @author Administrator
     *
     */
    public class NewsSubscriberListener implements MessageListener {
    
    	@Override
    	public void onMessage(Message message) {
    		try {
    			// 创建文本消息对象
    			TextMessage textMessage = (TextMessage) message;
    			
    			// 输出监听到的消息
    			System.out.println("NewsSubscriberListener 监听到的消息: " + textMessage.getText());
    		} catch (Exception e) {
     			e.printStackTrace();
    		}
    	}
    
    }
    

      

    新闻发布者(NewsPublisher):负责生产消息,将消息发送到队列里面去。

    新闻订阅者(NewsSubscriber):负责消费消息,从消息队列里面取出消息。

    新闻监听器(NewsSubscriberListener):新闻订阅者的一个监听器,辅助订阅者监听队列的消息。

    运行结果:

    发布新闻:

    订阅者消费:

    运行后,队列里的消息已经被订阅者消费了。

  • 相关阅读:
    c# mvc action 跳转方式
    where T : new() 的含义
    HTML5 Dataset data-属性
    EF6 教程
    C写的AES(ECB/PKCS5Padding)
    【转载】openwrt: Makefile 框架分析
    SCP 命令【转】
    HTTP协议详解(转载)
    bootstrap模板
    拖拽插件
  • 原文地址:https://www.cnblogs.com/hejianliang/p/9151556.html
Copyright © 2011-2022 走看看