zoukankan      html  css  js  c++  java
  • 消息队列:快速上手ActiveMQ消息队列的JMS方式使用(两种模式:Topic和Queue的消息推送和订阅)

    1、实现功能

    希望使用一套API,实现两种模式下的消息发送和接收功能,方便业务程序调用

    1、发送Topic

    2、发送Queue

    3、接收Topic

    4、接收Queue

    2、接口设计

    根据功能设计公共调用接口

    /**
     * Message distribution contract used to publish byte payloads to, and consume
     * messages from, topics and queues.
     *
     * @author eguid
     */
    public interface MsgDistributeInterface {

    	/**
    	 * Publishes an entire byte array to a topic.
    	 *
    	 * @param topicName name of the topic
    	 * @param data payload bytes
    	 * @return result flag reported by the implementation (presumably {@code true} on success; confirm against the implementation)
    	 */
    	boolean sendTopic(String topicName, byte[] data);

    	/**
    	 * Publishes a slice of a byte array to a topic.
    	 *
    	 * @param topicName name of the topic
    	 * @param data buffer holding the payload
    	 * @param offset index of the first payload byte inside {@code data}
    	 * @param length number of payload bytes
    	 * @return result flag reported by the implementation (presumably {@code true} on success; confirm against the implementation)
    	 */
    	boolean sendTopic(String topicName, byte[] data, int offset, int length);

    	/**
    	 * Sends an entire byte array to a queue.
    	 *
    	 * @param queueName name of the queue
    	 * @param data payload bytes
    	 * @return result flag reported by the implementation (presumably {@code true} on success; confirm against the implementation)
    	 */
    	boolean sendQueue(String queueName, byte[] data);

    	/**
    	 * Sends a slice of a byte array to a queue.
    	 *
    	 * @param queueName name of the queue
    	 * @param data buffer holding the payload
    	 * @param offset index of the first payload byte inside {@code data}
    	 * @param length number of payload bytes
    	 * @return result flag reported by the implementation (presumably {@code true} on success; confirm against the implementation)
    	 */
    	boolean sendQueue(String queueName, byte[] data, int offset, int length);

    	/**
    	 * Registers a listener for messages arriving on a queue.
    	 *
    	 * @param queueName name of the queue
    	 * @param listener callback that receives each message
    	 * @throws JMSException if the consumer cannot be set up
    	 */
    	void receiveQueue(String queueName, MessageListener listener) throws JMSException;

    	/**
    	 * Subscribes a listener to a topic.
    	 *
    	 * @param topicName name of the topic
    	 * @param listener callback that receives each message
    	 * @throws JMSException if the subscription cannot be set up
    	 */
    	void receiveTopic(String topicName, MessageListener listener) throws JMSException;
    }

    3、基于ActiveMQ的接口实现

    /**
     * 基于activeMQ的消息生产者/消费者实现(初始化该对象时即初始化连接消息队列,如果无法连接到消息队列,立即抛出异常)
     * 
     * @author eguid
     *
     */
    public class ActiveMQImpl implements MsgDistributeInterface {
    
    	// Credentials and broker address handed to the ActiveMQ connection factory in init().
    	private String userName;
    	private String password;
    	private String brokerURL;
    	private boolean persistentMode;// true selects DeliveryMode.PERSISTENT for producers, false selects NON_PERSISTENT
    	// Connection factory, created in init()
    	ConnectionFactory connectionFactory;
    	// JMS connection to the broker, created and started in init()
    	Connection connection;
    	// Non-transacted, auto-acknowledge session created in init(); used to create destinations, producers and messages
    	Session session;
    
    	// Per-thread cache of the topic MessageProducer
    	ThreadLocal<MessageProducer> topicThreadLocal = new ThreadLocal<MessageProducer>();
    	// Per-thread cache of the queue MessageProducer
    	ThreadLocal<MessageProducer> queueThreadLocal = new ThreadLocal<MessageProducer>();
    
    	/**
    	 * Creates an instance with persistent delivery enabled and connects to the broker immediately.
    	 *
    	 * @param userName broker user name
    	 * @param password broker password
    	 * @param brokerURL broker URL
    	 * @throws JMSException if the connection or session cannot be established
    	 */
    	public ActiveMQImpl(String userName, String password, String brokerURL) throws JMSException {
    		this(userName, password, brokerURL, true);
    	}

    	/**
    	 * Creates an instance and connects to the broker immediately by calling {@link #init()}.
    	 *
    	 * @param userName broker user name
    	 * @param password broker password
    	 * @param brokerURL broker URL
    	 * @param persistentMode {@code true} for persistent delivery, {@code false} for non-persistent delivery
    	 * @throws JMSException if the connection or session cannot be established
    	 */
    	public ActiveMQImpl(String userName, String password, String brokerURL, boolean persistentMode) throws JMSException {
    		this.persistentMode = persistentMode;
    		this.brokerURL = brokerURL;
    		this.password = password;
    		this.userName = userName;
    		// All settings must be in place before init() reads them.
    		init();
    	}
    
    	/**
    	 * Creates the connection factory, opens and starts a connection, and creates the
    	 * shared non-transacted, auto-acknowledge session.
    	 * <p>
    	 * If starting the connection or creating the session fails, the half-opened
    	 * connection is closed before the exception is rethrown, so no broker connection
    	 * is leaked. The {@code connection} field is only assigned on success.
    	 *
    	 * @throws JMSException if the connection cannot be created or started, or the session cannot be created
    	 */
    	public void init() throws JMSException {
    		connectionFactory = new ActiveMQConnectionFactory(this.userName, this.password, this.brokerURL);
    		Connection newConnection = connectionFactory.createConnection();
    		try {
    			newConnection.start();
    			// Non-transacted session with automatic acknowledgement.
    			session = newConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    			connection = newConnection;
    		} catch (JMSException e) {
    			try {
    				newConnection.close();
    			} catch (JMSException closeFailure) {
    				// Keep the original failure as the primary error.
    				e.addSuppressed(closeFailure);
    			}
    			throw e;
    		}
    	}
    
    	/**
    	 * Publishes the whole byte array to the given topic.
    	 *
    	 * @param topicName name of the topic
    	 * @param data payload bytes; {@code null} is rejected
    	 * @return {@code false} if {@code data} is {@code null}; otherwise the result of the offset/length overload
    	 */
    	@Override
    	public boolean sendTopic(String topicName, byte[] data) {
    		// Report failure through the boolean result instead of throwing NullPointerException on data.length.
    		if (data == null) {
    			return false;
    		}
    		return sendTopic(topicName, data, 0, data.length);
    	}
    
    	/**
    	 * Publishes {@code length} bytes of {@code data}, starting at {@code offset}, to the given topic.
    	 *
    	 * @param topicName name of the topic
    	 * @param data buffer holding the payload
    	 * @param offset index of the first payload byte
    	 * @param length number of payload bytes
    	 * @return the result of the internal send routine
    	 */
    	@Override
    	public boolean sendTopic(String topicName, byte[] data, int offset, int length) {
    		// true selects the topic path of the shared send routine.
    		final boolean toTopic = true;
    		return send(toTopic, topicName, data, offset, length);
    	}
    
    	/**
    	 * Sends the whole byte array to the given queue.
    	 *
    	 * @param queueName name of the queue
    	 * @param data payload bytes; {@code null} is rejected
    	 * @return {@code false} if {@code data} is {@code null}; otherwise the result of the offset/length overload
    	 */
    	@Override
    	public boolean sendQueue(String queueName, byte[] data) {
    		// Report failure through the boolean result instead of throwing NullPointerException on data.length.
    		if (data == null) {
    			return false;
    		}
    		return sendQueue(queueName, data, 0, data.length);
    	}
    
    	/**
    	 * Sends {@code length} bytes of {@code data}, starting at {@code offset}, to the given queue.
    	 *
    	 * @param queueName name of the queue
    	 * @param data buffer holding the payload
    	 * @param offset index of the first payload byte
    	 * @param length number of payload bytes
    	 * @return the result of the internal send routine
    	 */
    	@Override
    	public boolean sendQueue(String queueName, byte[] data, int offset, int length) {
    		// false selects the queue path of the shared send routine.
    		final boolean toTopic = false;
    		return send(toTopic, queueName, data, offset, length);
    	}
    
    	/**
    	 * Sends a slice of a byte array as a {@code BytesMessage} to the named topic or queue.
    	 *
    	 * @param type {@code true} to send to a topic, {@code false} to send to a queue
    	 * @param name topic or queue name
    	 * @param data buffer holding the payload; {@code null} is rejected
    	 * @param offset index of the first payload byte inside {@code data}
    	 * @param length number of payload bytes
    	 * @return {@code true} if the message was handed to the producer without a
    	 *         {@code JMSException}; {@code false} if {@code data} is {@code null} or sending failed
    	 */
    	private boolean send(boolean type, String name, byte[] data, int offset, int length) {
    		if (data == null) {
    			return false;
    		}
    		try {
    			MessageProducer messageProducer = getMessageProducer(name, type);
    			BytesMessage msg = createBytesMsg(data, offset, length);
    			System.err.println(Thread.currentThread().getName()+"发送消息");
    			messageProducer.send(msg);
    			// Success must be reported as true; the previous code returned false on every path.
    			return true;
    		} catch (JMSException e) {
    			// Surface the cause instead of silently dropping it; the caller still only sees false.
    			System.err.println("Failed to send message to " + (type ? "topic " : "queue ") + name + ": " + e);
    			return false;
    		}
    	}
    	
    	/**
    	 * Subscribes to a topic with a built-in listener that prints every received message to standard error.
    	 * <p>
    	 * A dedicated non-transacted session is used so that {@code AUTO_ACKNOWLEDGE} takes
    	 * effect; with a transacted session, consumption is only acknowledged by
    	 * {@code commit()}, which this class never calls.
    	 *
    	 * @param topicName name of the topic to subscribe to
    	 * @throws JMSException if the session, topic or consumer cannot be created
    	 */
    	public void receive(String topicName) throws JMSException {
    		final Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    		Topic topic = consumerSession.createTopic(topicName);
    		MessageConsumer consumer = consumerSession.createConsumer(topic);
    		consumer.setMessageListener(new MessageListener() {
    			@Override
    			public void onMessage(Message message) {
    				// Only toString() is needed, so no cast: a non-bytes message must not raise ClassCastException.
    				System.err.println(Thread.currentThread().getName()+"收到消息:"+message);
    			}
    		});
    	}
    	/**
    	 * Builds a {@code BytesMessage} on the shared session and fills it with a slice of {@code data}.
    	 *
    	 * @param data buffer holding the payload
    	 * @param offset index of the first byte to copy
    	 * @param length number of bytes to copy
    	 * @return the populated message
    	 * @throws JMSException if the message cannot be created or written
    	 */
    	private BytesMessage createBytesMsg(byte[] data, int offset, int length) throws JMSException {
    		final BytesMessage bytesMessage = session.createBytesMessage();
    		bytesMessage.writeBytes(data, offset, length);
    		return bytesMessage;
    	}
    	
    	/**
    	 * Wraps a serializable object in an {@code ObjectMessage} created on the shared session.
    	 * Despite the method name, no {@code MapMessage} is produced.
    	 *
    	 * @param obj object to carry in the message
    	 * @return the object message
    	 * @throws JMSException if the message cannot be created
    	 */
    	private ObjectMessage createMapMsg(Serializable obj) throws JMSException {
    		return session.createObjectMessage(obj);
    	}
    	
    	/**
    	 * Creates a {@code TextMessage} carrying the given string on the shared session.
    	 *
    	 * @param text message body
    	 * @return the text message
    	 * @throws JMSException if the message cannot be created
    	 */
    	private TextMessage createTextMsg(String text) throws JMSException {
    		return session.createTextMessage(text);
    	}
    
    	
    	/**
    	 * Returns the producer for the named destination, choosing between topic and queue.
    	 *
    	 * @param name topic name or queue name
    	 * @param type {@code true} for a topic producer, {@code false} for a queue producer
    	 * @return the producer for the requested destination kind
    	 * @throws JMSException if the producer cannot be obtained
    	 */
    	private MessageProducer getMessageProducer(String name, boolean type) throws JMSException {
    		if (type) {
    			return getTopicProducer(name);
    		}
    		return getQueueProducer(name);
    	}
    
    	/**
    	 * Returns the calling thread's producer for the named queue, creating it when needed.
    	 * <p>
    	 * One producer is cached per thread. The cached producer is reused only when it is
    	 * bound to the same queue name; otherwise it is closed and replaced. Previously the
    	 * cache ignored the name, so a thread's second queue silently received the first
    	 * queue's producer.
    	 *
    	 * @param queueName name of the target queue
    	 * @return a producer bound to {@code queueName}
    	 * @throws JMSException if the queue or producer cannot be created, or the stale producer cannot be closed
    	 */
    	private MessageProducer getQueueProducer(String queueName) throws JMSException {
    		MessageProducer cached = queueThreadLocal.get();
    		if (cached != null) {
    			if (queueName != null
    					&& cached.getDestination() instanceof Queue
    					&& queueName.equals(((Queue) cached.getDestination()).getQueueName())) {
    				return cached;
    			}
    			// Bound to a different queue: release it before creating the right one.
    			queueThreadLocal.remove();
    			cached.close();
    		}
    		Queue queue = session.createQueue(queueName);
    		MessageProducer messageProducer = session.createProducer(queue);
    		// persistentMode selects between persistent and non-persistent delivery for this producer.
    		messageProducer.setDeliveryMode(persistentMode?DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT);
    		queueThreadLocal.set(messageProducer);
    		return messageProducer;
    	}
    	
    	/**
    	 * Returns the calling thread's producer for the named topic, creating it when needed.
    	 * <p>
    	 * One producer is cached per thread. The cached producer is reused only when it is
    	 * bound to the same topic name; otherwise it is closed and replaced. Previously the
    	 * cache ignored the name, so a thread's second topic silently received the first
    	 * topic's producer.
    	 *
    	 * @param topicName name of the target topic
    	 * @return a producer bound to {@code topicName}
    	 * @throws JMSException if the topic or producer cannot be created, or the stale producer cannot be closed
    	 */
    	private MessageProducer getTopicProducer(String topicName) throws JMSException {
    		MessageProducer cached = topicThreadLocal.get();
    		if (cached != null) {
    			if (topicName != null
    					&& cached.getDestination() instanceof Topic
    					&& topicName.equals(((Topic) cached.getDestination()).getTopicName())) {
    				return cached;
    			}
    			// Bound to a different topic: release it before creating the right one.
    			topicThreadLocal.remove();
    			cached.close();
    		}
    		Topic topic = session.createTopic(topicName);
    		MessageProducer messageProducer = session.createProducer(topic);
    		// persistentMode selects between persistent and non-persistent delivery for this producer.
    		messageProducer.setDeliveryMode(persistentMode?DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT);
    		topicThreadLocal.set(messageProducer);
    		return messageProducer;
    	}
    	
    	/**
    	 * Returns the stored broker password.
    	 *
    	 * @return the password field value
    	 */
    	public String getPassword() {
    		return this.password;
    	}
    
    	/**
    	 * Replaces the stored broker password. Only the field is updated here; no
    	 * reconnect is triggered by this method.
    	 *
    	 * @param password the new password value
    	 */
    	public void setPassword(String password) {
    		final String newPassword = password;
    		this.password = newPassword;
    	}
    
    	/**
    	 * Registers a listener on the named queue using a dedicated session.
    	 * <p>
    	 * The session is non-transacted so that {@code AUTO_ACKNOWLEDGE} takes effect. With a
    	 * transacted session, consumption is only acknowledged by {@code commit()}, which is
    	 * never called, so consumed messages would stay unacknowledged on the broker.
    	 *
    	 * @param queueName name of the queue to consume from
    	 * @param listener callback invoked for each received message
    	 * @throws JMSException if the session, queue or consumer cannot be created
    	 */
    	@Override
    	public void receiveQueue(String queueName,MessageListener listener) throws JMSException {
    		final Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    		Queue queue = consumerSession.createQueue(queueName);
    		MessageConsumer consumer = consumerSession.createConsumer(queue);
    		consumer.setMessageListener(listener);
    	}
    
    	/**
    	 * Subscribes a listener to the named topic using a dedicated session.
    	 * <p>
    	 * The session is non-transacted so that {@code AUTO_ACKNOWLEDGE} takes effect. With a
    	 * transacted session, consumption is only acknowledged by {@code commit()}, which is
    	 * never called.
    	 *
    	 * @param topicName name of the topic to subscribe to
    	 * @param listener callback invoked for each received message
    	 * @throws JMSException if the session, topic or consumer cannot be created
    	 */
    	@Override
    	public void receiveTopic(String topicName,MessageListener listener) throws JMSException {
    		final Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    		Topic topic = consumerSession.createTopic(topicName);
    		MessageConsumer consumer = consumerSession.createConsumer(topic);
    		consumer.setMessageListener(listener);
    	}

    4、测试一下Topic和Queue

    /**
     * Demo entry point: starts six topic publisher threads, two topic subscriber threads,
     * two queue producer threads and two queue consumer threads against a local broker.
     *
     * @param args unused
     * @throws JMSException if the initial connection to the broker fails
     */
    public static void main(String[] args) throws JMSException{
        // The constructor connects right away and throws if that fails.
        MsgDistributeInterface  producter = new ActiveMQImpl("system", "manager", "tcp://127.0.0.1:61616");
        Test testMq = new Test();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // Topic publisher threads 1-6
        for (int publisherIndex = 0; publisherIndex < 6; publisherIndex++) {
            new Thread(testMq.new ProductorMq(producter)).start();
        }

        // Topic subscriber threads 1-2
        for (int subscriberIndex = 0; subscriberIndex < 2; subscriberIndex++) {
            Runnable subscribeTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        producter.receiveTopic("eguid-topic",new MessageListener() {
                            @Override
                            public void onMessage(Message message) {
                                BytesMessage msg=(BytesMessage) message;
                                System.err.println(Thread.currentThread().getName()+"订阅主题消息:"+msg.toString());
                            }
                        });
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            };
            new Thread(subscribeTask).start();
        }

        // Queue producer threads 1-2
        for (int producerIndex = 0; producerIndex < 2; producerIndex++) {
            new Thread(testMq.new  QueueProductor(producter)).start();
        }

        // Queue consumer threads 1-2
        for (int consumerIndex = 0; consumerIndex < 2; consumerIndex++) {
            Runnable consumeTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        producter.receiveQueue("eguid-queue",new MessageListener() {
                            @Override
                            public void onMessage(Message message) {
                                BytesMessage msg=(BytesMessage) message;
                                System.err.println(Thread.currentThread().getName()+"收到队列消息:"+msg.toString());
                            }
                        });
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            };
            new Thread(consumeTask).start();
        }
    }
    
        /**
         * Runnable that publishes a greeting to the "eguid-topic" topic every two seconds
         * until its thread is interrupted.
         */
        private class ProductorMq implements Runnable{
            // Typed as the distribution interface, which is what main passes to the constructor.
            private final MsgDistributeInterface producter;

            /**
             * @param producter sender used to publish the topic messages
             */
            public ProductorMq(MsgDistributeInterface producter){
                this.producter = producter;
            }

            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    String wang=Thread.currentThread().getName()+"Hello eguid! This is topic.";
                    // Explicit charset so the payload does not depend on the platform default.
                    producter.sendTopic("eguid-topic",wang.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        // Restore the flag and stop, so the thread can actually be shut down.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }
        
        /**
         * Runnable that sends a greeting to the "eguid-queue" queue every two seconds
         * until its thread is interrupted.
         */
        private class QueueProductor implements Runnable{
            // Typed as the distribution interface, which is what main passes to the constructor.
            private final MsgDistributeInterface producter;

            /**
             * @param producter sender used to send the queue messages
             */
            public QueueProductor(MsgDistributeInterface producter){
                this.producter = producter;
            }

            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    String eguid=Thread.currentThread().getName()+"Hello eguid! This is queue.";
                    // Explicit charset so the payload does not depend on the platform default.
                    producter.sendQueue("eguid-queue",eguid.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        // Restore the flag and stop, so the thread can actually be shut down.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }
        
    -------------------End--------------------



  • 相关阅读:
    extgcd 扩展欧几里得算法模板
    51nod 1073约瑟夫环
    UVA 11806 Cheerleaders (容斥原理
    HDU 1863 畅通工程 (最小生成树
    并查集模板
    51NOD 1072 威佐夫游戏
    Java基于JAX-RD开发Restful接口
    tomcat的webapps下放置多个项目时会出现很多exception
    带滚动条的表格
    禁止apache显示目录索引的常见方法
  • 原文地址:https://www.cnblogs.com/eguid/p/10195571.html
Copyright © 2011-2022 走看看