zoukankan      html  css  js  c++  java
  • 小菜鸟学 MQ(三)

    创建程序测试MQ

    1,创建生产者

    package com.robert;
    import java.util.Hashtable;  
    import java.util.Map;  
      
    import javax.jms.Connection;  
    import javax.jms.ConnectionFactory;  
    import javax.jms.Destination;  
    import javax.jms.JMSException;  
    import javax.jms.MapMessage;  
    import javax.jms.Message;  
    import javax.jms.MessageProducer;  
    import javax.jms.Session;  
    
    import org.apache.activemq.ActiveMQConnectionFactory;  
    import org.apache.activemq.command.ActiveMQMapMessage;
    
    public class Publisher {
        
        protected int MAX_DELTA_PERCENT = 1;  
        protected Map<String, Double> LAST_PRICES = new Hashtable<String, Double>();  
        protected static int count = 10;  
        protected static int total;  
          
        protected static String brokerURL = "tcp://localhost:61616";  
        protected static transient ConnectionFactory factory;  
        protected transient Connection connection;  
        protected transient Session session;  
        protected transient MessageProducer producer;  
          
        public Publisher() throws JMSException {  
            factory = new ActiveMQConnectionFactory(brokerURL);  
            connection = factory.createConnection();  
            try {  
            connection.start();  
            } catch (JMSException jmse) {  
                connection.close();  
                throw jmse;  
            }  
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
            producer = session.createProducer(null);  
        }  
          
        public void close() throws JMSException {  
            if (connection != null) {  
                connection.close();  
            }  
        }  
          
        public static void main(String[] args) throws JMSException {  
            Publisher publisher = new Publisher();  
            while (total < 1000) {  
                for (int i = 0; i < count; i++) {  
                    publisher.sendMessage(args);  
                }  
                total += count;  
                System.out.println("Published '" + count + "' of '" + total + "' price messages");  
                try {  
                  Thread.sleep(1000);  
                } catch (InterruptedException x) {  
                }  
            }  
            publisher.close();  
        }  
      
        protected void sendMessage(String[] stocks) throws JMSException {  
            int idx = 0;  
            while (true) {  
                idx = (int)Math.round(stocks.length * Math.random());  
                if (idx < stocks.length) {  
                    break;  
                }  
            }  
            String stock = stocks[idx];  
            Destination destination = session.createTopic("STOCKS." + stock);  
            Message message = createStockMessage(stock, session);  
            System.out.println("Sending: " + ((ActiveMQMapMessage)message).getContentMap() + " on destination: " + destination);  
            producer.send(destination, message);  
        }  
      
        protected Message createStockMessage(String stock, Session session) throws JMSException {  
            Double value = LAST_PRICES.get(stock);  
            if (value == null) {  
                value = new Double(Math.random() * 100);  
            }  
      
            // lets mutate the value by some percentage  
            double oldPrice = value.doubleValue();  
            value = new Double(mutatePrice(oldPrice));  
            LAST_PRICES.put(stock, value);  
            double price = value.doubleValue();  
      
            double offer = price * 1.001;  
      
            boolean up = (price > oldPrice);  
      
            MapMessage message = session.createMapMessage();  
            message.setString("stock", stock);  
            message.setDouble("price", price);  
            message.setDouble("offer", offer);  
            message.setBoolean("up", up);  
            return message;  
        }  
      
        protected double mutatePrice(double price) {  
            double percentChange = (2 * Math.random() * MAX_DELTA_PERCENT) - MAX_DELTA_PERCENT;  
      
            return price * (100 + percentChange) / 100;  
        }  
    }  
    

    2,创建消费者

    package com.robert;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
      
    public class Consumer {  
      
        private static String brokerURL = "tcp://localhost:61616";  
        private static transient ConnectionFactory factory;  
        private transient Connection connection;  
        private transient Session session;  
          
        public Consumer() throws JMSException {  
            factory = new ActiveMQConnectionFactory(brokerURL);  
            connection = factory.createConnection();  
            connection.start();  
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
        }  
          
        public void close() throws JMSException {  
            if (connection != null) {  
                connection.close();  
            }  
        }      
          
        public static void main(String[] args) throws JMSException {  
            Consumer consumer = new Consumer();  
            for (String stock : args) {  
                Destination destination = consumer.getSession().createTopic("STOCKS." + stock);  
                MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination);  
                messageConsumer.setMessageListener(new Listener());  
            }  
        }  
          
        public Session getSession() {  
            return session;  
        } 
    }  
     
    
    

    3,创建消息监听

    package com.robert;
    
    import java.text.DecimalFormat;
    
    import javax.jms.MapMessage;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    
    public class Listener implements MessageListener {
    
        @Override
        public void onMessage(Message message) {  
            try {  
                MapMessage map = (MapMessage)message;  
                String stock = map.getString("stock");  
                double price = map.getDouble("price");  
                double offer = map.getDouble("offer");  
                boolean up = map.getBoolean("up");  
                DecimalFormat df = new DecimalFormat( "#,###,###,##0.00" );  
                System.out.println(stock + "	" + df.format(price) + "	" + df.format(offer) + "	" + (up?"up":"down"));  
            } catch (Exception e) {  
                e.printStackTrace();  
            }  
        }  
    }
    

    a,启动mq服务

    b,先启动 消费者程序 Consumer.java

    eclipse中添加运行参数如下:

    c,启动生产者,发消息。

    运行Publisher.java ,添加运行参数如上图所示。

    d,登录  http://localhost:8161/admin/topics.jsp

    运行如下所示:


    转自:http://hi.baidu.com/tuoxinquyu/item/1e9bcfd069641a332a35c73d

  • 相关阅读:
    解析Javascript事件冒泡机制
    LeetCode——Flatten Binary Tree to Linked List
    流动python
    HDU2586
    Cannot find ActionMappings or ActionFormBeans collection
    reactor设计模式
    简单的Ajax应用实例
    CString——Left、Right、Find、ReverseFind
    MATLAB新手教程
    八大排序算法总结
  • 原文地址:https://www.cnblogs.com/mengjianzhou/p/5986845.html
Copyright © 2011-2022 走看看