zoukankan      html  css  js  c++  java
  • ActiveMQ 实现 JMS Publish/Subscribe 实例

    MessagePublisher

    package jms.activemq.myexample;
    
    import java.util.Date;
    import javax.jms.Connection;
    import javax.jms.DeliveryMode;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.Topic;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * Publishes a short, fixed series of text messages to a JMS topic on an
     * ActiveMQ broker and then releases the connection.
     *
     * <p>Designed to be handed to a {@link Thread}; all JMS resources are created
     * and released inside {@link #run()}.
     */
    public class MessagePublisher implements Runnable {
    	/** Number of messages sent by one call to {@link #run()}. */
    	private static final int MESSAGE_LIMIT = 3;
    	/** Pause between two consecutive messages, in milliseconds. */
    	private static final long SEND_INTERVAL_MILLIS = 1000L;

    	private String url;
    	private String user;
    	private String password;
    	private final String topicName;

    	/**
    	 * @param topicName name of the topic to publish to
    	 * @param url       broker URL passed to the ActiveMQ connection factory
    	 * @param user      user name passed to the connection factory (may be null)
    	 * @param password  password passed to the connection factory (may be null)
    	 */
    	public MessagePublisher(String topicName, String url, String user,
    			String password) {
    		this.url = url;
    		this.user = user;
    		this.password = password;
    		this.topicName = topicName;
    	}

    	/**
    	 * Connects to the broker, sends {@link #MESSAGE_LIMIT} non-persistent text
    	 * messages to the topic with a pause between them, and closes the
    	 * connection. JMS failures and interruption are reported on stderr; an
    	 * interruption is also re-asserted on the current thread before returning.
    	 */
    	@Override
    	public void run() {
    		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
    				user, password, url);
    		Connection connection = null;
    		boolean interrupted = false;
    		try {
    			connection = connectionFactory.createConnection();
    			connection.start();

    			Session session = connection.createSession(false,
    					Session.AUTO_ACKNOWLEDGE);
    			Topic topic = session.createTopic(this.topicName);
    			MessageProducer sendPublisher = session.createProducer(topic);
    			// The delivery mode never changes, so configure it once up front
    			// instead of once per message.
    			sendPublisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

    			for (int messageCount = 0; messageCount < MESSAGE_LIMIT; messageCount++) {
    				if (messageCount > 0) {
    					// Pause between messages only; no delay after the last one.
    					Thread.sleep(SEND_INTERVAL_MILLIS);
    				}
    				String text = new Date() + "现在发送是第" + messageCount + "条消息";
    				sendPublisher.send(session.createTextMessage(text));
    			}

    			System.out.println("发布消息线程结束!!!!!!!!!!!!!!!!!!!!!!!!");
    		} catch (JMSException e) {
    			e.printStackTrace();
    		} catch (InterruptedException e) {
    			interrupted = true;
    			e.printStackTrace();
    		} finally {
    			// Closing the connection also releases the session and producer
    			// created from it.
    			closeQuietly(connection);
    			if (interrupted) {
    				// Restore the flag only after closing, so the pending interrupt
    				// cannot disturb the close call itself.
    				Thread.currentThread().interrupt();
    			}
    		}
    	}

    	/** Closes the connection if one was opened, reporting (not propagating) failures. */
    	private static void closeQuietly(Connection connection) {
    		if (connection == null) {
    			return;
    		}
    		try {
    			connection.close();
    		} catch (JMSException e) {
    			e.printStackTrace();
    		}
    	}

    	public String getUrl() {
    		return url;
    	}

    	public void setUrl(String url) {
    		this.url = url;
    	}

    	public String getUser() {
    		return user;
    	}

    	public void setUser(String user) {
    		this.user = user;
    	}

    	public String getPassword() {
    		return password;
    	}

    	public void setPassword(String password) {
    		this.password = password;
    	}

    	/** @return the name of the topic this publisher sends to */
    	public String getTopic() {
    		return topicName;
    	}

    }
    

    MessageSubscriber

    package jms.activemq.myexample;
    
    import javax.jms.Connection;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.Topic;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * Subscribes to a JMS topic on an ActiveMQ broker and hands every incoming
     * message to a {@link TextListener}.
     *
     * <p>Designed to be handed to a {@link Thread}. {@link #run()} returns as soon
     * as the subscription is set up; the connection is left open on success so the
     * listener keeps receiving messages.
     */
    public class MessageSubscriber implements Runnable {
    	private String url;
    	private String user;
    	private String password;
    	private final String topicName;

    	/**
    	 * @param topicName name of the topic to subscribe to
    	 * @param url       broker URL passed to the ActiveMQ connection factory
    	 * @param user      user name passed to the connection factory (may be null)
    	 * @param password  password passed to the connection factory (may be null)
    	 */
    	public MessageSubscriber(String topicName, String url, String user,
    			String password) {
    		this.url = url;
    		this.user = user;
    		this.password = password;
    		this.topicName = topicName;
    	}

    	/**
    	 * Opens a connection, registers a {@link TextListener} on the topic and
    	 * starts delivery. If any JMS call fails, the error is printed and the
    	 * connection is closed.
    	 */
    	@Override
    	public void run() {
    		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
    				user, password, url);
    		Connection connection = null;
    		try {
    			connection = connectionFactory.createConnection();

    			// A single non-transacted session serves both topic lookup and
    			// consumption. A transacted session would need an explicit commit()
    			// for received messages to count as acknowledged, and nothing here
    			// ever commits.
    			Session session = connection.createSession(false,
    					Session.AUTO_ACKNOWLEDGE);
    			Topic topic = session.createTopic(this.topicName);

    			MessageConsumer subscriber = session.createConsumer(topic);
    			subscriber.setMessageListener(new TextListener());
    			// Start delivery only after the listener is registered.
    			connection.start();

    			System.out.println(Thread.currentThread().getName() + "开启");

    			// The connection is intentionally not closed here: closing it would
    			// end the subscription that was just created.
    		} catch (JMSException e) {
    			e.printStackTrace();
    			if (connection != null) {
    				try {
    					connection.close();
    				} catch (JMSException e1) {
    					e1.printStackTrace();
    				}
    			}
    		}
    	}

    	public String getUrl() {
    		return url;
    	}

    	public void setUrl(String url) {
    		this.url = url;
    	}

    	public String getUser() {
    		return user;
    	}

    	public void setUser(String user) {
    		this.user = user;
    	}

    	public String getPassword() {
    		return password;
    	}

    	public void setPassword(String password) {
    		this.password = password;
    	}

    	/** @return the name of the topic this subscriber listens on */
    	public String getTopic() {
    		return topicName;
    	}
    }
    

    TextListener

    package jms.activemq.myexample;
    
    
    import javax.jms.*;
    
    /**
     * Text消息监听
     * 
     * @author XXXX
     * */ public class TextListener implements MessageListener { /** * Casts the message to a TextMessage and displays its text. * * @param message * the incoming message */ public void onMessage(Message message) { TextMessage msg = null; try { if (message instanceof TextMessage) { msg = (TextMessage) message; System.out.println("Reading message: " + msg.getText()); } else { System.out.println("Message of wrong type: " + message.getClass().getName()); } } catch (JMSException e) { System.out.println("JMSException in onMessage(): " + e.toString()); } catch (Throwable t) { System.out.println("Exception in onMessage():" + t.getMessage()); } } }

    MyActiveMQDemo

    package jms.activemq.myexample;
    
    import javax.jms.JMSException;
    
    /**
     * Demo entry point: starts five topic subscribers, waits, then starts one
     * publisher against a local ActiveMQ broker.
     */
    public class MyActiveMQDemo {
    	public static void main(String[] args) throws InterruptedException, JMSException {
    		final String brokerUrl = "tcp://localhost:61616";
    		final String topicName = "TestTopic";
    		// No credentials are supplied to the broker.
    		final String user = null;
    		final String password = null;

    		// Threads are named Name-Subscriber1 .. Name-Subscriber5.
    		for (int index = 1; index <= 5; index++) {
    			Runnable subscriber = new MessageSubscriber(topicName, brokerUrl, user, password);
    			new Thread(subscriber, "Name-Subscriber" + index).start();
    		}

    		// Presumably gives the subscribers time to connect before anything is
    		// published.
    		Thread.sleep(5000);

    		Runnable publisher = new MessagePublisher(topicName, brokerUrl, user, password);
    		new Thread(publisher, "Name-Publisher").start();
    	}
    }
    

  • 相关阅读:
    程序返回插入数据库成功,但是数据库内却没有数据
    C++ 使用动态二维数组参数
    深入理解.Net中的内存释放,以及有关的注意事项
    用数据集时,错误:未能启用约束。一行或多行中包含违反非空、唯一或外键约束的值
    关于堆和栈
    C#加密方法总汇
    const与readonly
    struts 将上传文件保存到数据库中
    java Annotation注解的运用
    转:获取汉字的拼音(包括一级和二级)
  • 原文地址:https://www.cnblogs.com/phoebus0501/p/1965276.html
Copyright © 2011-2022 走看看