zoukankan      html  css  js  c++  java
  • [收藏]AMQ经典文章

    企业中各项目中相互协作的时候可能用得到消息通知机制。比如有东西更新了,可以通知做索引。

    在 Java 里有 JMS 的多个实现。其中 apache 下的 ActiveMQ 就是不错的选择。ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。这里示例下使用 ActiveMQ

    用 ActiveMQ 最好还是了解下 JMS

    JMS 公共 点对点域 发布/订阅域
    ConnectionFactory QueueConnectionFactory TopicConnectionFactory
    Connection QueueConnection TopicConnection
    Destination Queue Topic
    Session QueueSession TopicSession
    MessageProducer QueueSender TopicPublisher
    MessageConsumer QueueReceiver TopicSubscriber

    JMS 定义了两种方式:Quere(点对点);Topic(发布/订阅)。

    ConnectionFactory 是连接工厂,负责创建Connection。

    Connection 负责创建 Session。

    Session 创建 MessageProducer(用来发消息) 和 MessageConsumer(用来接收消息)。

    Destination 是消息的目的地。

    详细的可以网上找些 JMS 规范(有中文版)。

    下载 apache-activemq-5.3.0。http://activemq.apache.org/download.html ,解压,然后双击 bin/activemq.bat。运行后,可以在 http://localhost:8161/admin 观察。也有 demo, http://localhost:8161/demo 。把 activemq-all-5.3.0.jar 加入 classpath。

    Jms 发送 代码:

    public static void main(String[] args) throws Exception {   
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();   
      
        Connection connection = connectionFactory.createConnection();   
        connection.start();   
      
        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);   
        Destination destination = session.createQueue("my-queue");   
      
        MessageProducer producer = session.createProducer(destination);   
        for(int i=0; i<3; i++) {   
            MapMessage message = session.createMapMessage();   
            message.setLong("count", new Date().getTime());   
            Thread.sleep(1000);   
            //通过消息生产者发出消息   
            producer.send(message);   
        }   
        session.commit();   
        session.close();   
        connection.close();   
    }



    Jms 接收代码:


    public static void main(String[] args) throws Exception {   
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();   
      
        Connection connection = connectionFactory.createConnection();   
        connection.start();   
      
        final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);   
        Destination destination = session.createQueue("my-queue");   
      
        MessageConsumer consumer = session.createConsumer(destination);   
        int i=0;   
        while(i<3) {   
            i++;   
            MapMessage message = (MapMessage) consumer.receive();   
            session.commit();   
      
            //TODO something....   
            System.out.println("收到消息:" + new Date(message.getLong("count")));   
        }   
      
        session.close();   
        connection.close();   
    }



    JMS五种消息的发送/接收的例子

    转自:http://chenjumin.javaeye.com/blog/687124  

    1、消息发送

    //连接工厂  
    ConnectionFactory connFactory = new ActiveMQConnectionFactory(  
            ActiveMQConnection.DEFAULT_USER,  
            ActiveMQConnection.DEFAULT_PASSWORD,  
            "tcp://localhost:61616");  
     
    //连接到JMS提供者  
    Connection conn = connFactory.createConnection();  
    conn.start();  
     
    //事务性会话,自动确认消息  
    Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);  
     
    //消息的目的地  
    Destination destination = session.createQueue("queue.hello");  
     
    //消息生产者  
    MessageProducer producer = session.createProducer(destination);  
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //不持久化  
     
     
    //文本消息  
    TextMessage textMessage = session.createTextMessage("文本消息");  
    producer.send(textMessage);  
     
    //键值对消息  
    MapMessage mapMessage = session.createMapMessage();  
    mapMessage.setLong("age", new Long(32));  
    mapMessage.setDouble("sarray", new Double(5867.15));  
    mapMessage.setString("username", "键值对消息");  
    producer.send(mapMessage);  
     
    //流消息  
    StreamMessage streamMessage = session.createStreamMessage();  
    streamMessage.writeString("streamMessage流消息");  
    streamMessage.writeLong(55);  
    producer.send(streamMessage);  
     
    //字节消息  
    String s = "BytesMessage字节消息";  
    BytesMessage bytesMessage = session.createBytesMessage();  
    bytesMessage.writeBytes(s.getBytes());  
    producer.send(bytesMessage);  
     
    //对象消息  
    User user = new User("cjm", "对象消息"); //User对象必须实现Serializable接口  
    ObjectMessage objectMessage = session.createObjectMessage();  
    objectMessage.setObject(user);  
    producer.send(objectMessage);  
     
     
    session.commit(); //在事务性会话中,只有commit之后,消息才会真正到达目的地  
    producer.close();  
    session.close();  
    conn.close(); 



    2、消息接收:通过消息监听器的方式接收消息


    public class Receiver implements MessageListener{  
        private boolean stop = false;  
          
        public void execute() throws Exception {  
            //连接工厂  
            ConnectionFactory connFactory = new ActiveMQConnectionFactory(  
                    ActiveMQConnection.DEFAULT_USER,  
                    ActiveMQConnection.DEFAULT_PASSWORD,  
                    "tcp://localhost:61616");  
              
            //连接到JMS提供者  
            Connection conn = connFactory.createConnection();  
            conn.start();  
              
            //事务性会话,自动确认消息  
            Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);  
              
            //消息的来源地  
            Destination destination = session.createQueue("queue.hello");  
              
            //消息消费者  
            MessageConsumer consumer = session.createConsumer(destination);  
            consumer.setMessageListener(this);  
              
            //等待接收消息  
            while(!stop){  
                Thread.sleep(5000);  
            }  
              
            session.commit();  
              
            consumer.close();  
            session.close();  
            conn.close();  
        }  
     
        public void onMessage(Message m) {  
            try{  
                if(m instanceof TextMessage){ //接收文本消息  
                    TextMessage message = (TextMessage)m;  
                    System.out.println(message.getText());  
                }else if(m instanceof MapMessage){ //接收键值对消息  
                    MapMessage message = (MapMessage)m;  
                    System.out.println(message.getLong("age"));  
                    System.out.println(message.getDouble("sarray"));  
                    System.out.println(message.getString("username"));  
                }else if(m instanceof StreamMessage){ //接收流消息  
                    StreamMessage message = (StreamMessage)m;  
                    System.out.println(message.readString());  
                    System.out.println(message.readLong());  
                }else if(m instanceof BytesMessage){ //接收字节消息  
                    byte[] b = new byte[1024];  
                    int len = -1;  
                    BytesMessage message = (BytesMessage)m;  
                    while((len=message.readBytes(b))!=-1){  
                        System.out.println(new String(b, 0, len));  
                    }  
                }else if(m instanceof ObjectMessage){ //接收对象消息  
                    ObjectMessage message = (ObjectMessage)m;  
                    User user = (User)message.getObject();  
                    System.out.println(user.getUsername() + " _ " + user.getPassword());  
                }else{  
                    System.out.println(m);  
                }  
                  
                stop = true;  
            }catch(JMSException e){  
                stop = true;  
                e.printStackTrace();  
            }  
        }  

  • 相关阅读:
    Django中的syncdb命令
    notepad++下的字体设置
    python中的getattr函数
    python实现虚拟茶话会
    利用python爬取海量疾病名称百度搜索词条目数的爬虫实现
    SQL里面如何取得前N条数据?
    python中怎么查看当前工作目录和更改工作目录
    python中thread模块中join函数
    python中的自测语句是什么?
    python读取数据库数据有乱码怎么解决?
  • 原文地址:https://www.cnblogs.com/sunson/p/2987013.html
Copyright © 2011-2022 走看看