zoukankan      html  css  js  c++  java
  • 分布式-信息方式-JMS Topic示例

                                                      Topic消息

    非持久的 Topic消息示例
    对于非持久的 Topic消息的发送
           基本跟前而发送队列信息是一样的,以是把创建 Destination的地方,由创
    建队列替换成创建 Topic,例如:
    Destination destination =session createTopic( "mytopic");
    对于非持久的 Topic消息的接收
    1:必须要接收方在线,然后客户端再发送信息,接收方才能接收到消息
    2:同样把创建 Destination的地方,由创建队列替换成创建 Topic,例如:
    Destination destination session. createTopic("mytopic");
    3:由于不知道客户端发送多少信息,因此改成 while循环的方式了,例如:
    Message message consumer. receive():
       while (message!=null){
                      TextMessage txtMsg =(TextMessage)message;
                      System.out.printIn("收到消息:”+ txtMsg. getText());
                      message= consumer. receive(1000L);

    }

    生产者

    package test.mq.topic;
    
     
    
     
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MapMessage;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class Sender {
           public static void main(String[] args) throws JMSException {
               
               ConnectionFactory   ConnectionFactory=new ActiveMQConnectionFactory(
                    "tcp://localhost:61616"
                    );
            
            Connection connection=ConnectionFactory.createConnection();
            connection.start();
    
            Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            
            Destination destination=session.createTopic("mytopic");
             
            MessageProducer messageProducer=session.createProducer(destination);
         
            for(int i=1;i<=5;i++){
                 TextMessage textMessage=session.createTextMessage();
                 textMessage.setText("我是TOM ID为"+i);
                 messageProducer.send(textMessage);
                 System.out.println("生产者:"+textMessage.getText());
                
            }
             session.commit();
             session.close();
             connection.close();    
        }
    }

     控制台信息

    消费者

    package test.mq.topic;
    
     
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.broker.ConsumerBrokerExchange;
    
    public class Receiver {
        
        public static void main(String[] args) throws Exception {
            ConnectionFactory   ConnectionFactory=new ActiveMQConnectionFactory(
                    "tcp://localhost:61616"
                    );
            
            Connection connection=ConnectionFactory.createConnection();
            connection.start();
            Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            
            Destination destination=session.createTopic("mytopic");
            MessageConsumer messageConsumer=session.createConsumer(destination);
            Message msg=  messageConsumer.receive();
            
             while(msg!=null){
                 TextMessage msgs=(TextMessage) msg;
                 System.out.println("接受信息----》"+msgs.getText());
                 msg= messageConsumer.receive(1000L);
                 }
             session.commit();
             session.close();
             connection.close();
        }
    }

    控制台信息

    对于持久的 Topic消息的发送
     

           ConnectionFactory ConnectionFactory=new ActiveMQConnectionFactory("tcp://localhost:61616");
           Connection connection=ConnectionFactory.createConnection();
           Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
           Destination destination=session.createTopic("mytopic2");
           MessageProducer Producer=session.createProducer(destination);
           Producer.setDeliveryMode(DeliveryMode.PERSISTENT);
          connection.start();
          for(int i=1;i<=5;i++){
                  TextMessage textMessage=session.createTextMessage();
                  textMessage.setText("我是tom ID为"+i);
                  messageProducer.send(textMessage);
                  System.out.println("生产者:"+textMessage.getText());
            }
           session.commit();
           session.close();
          connection.close();


    1:要用持久化订阅,发送消息者要用 DeliveryMode, PERSISTENT模式发现,在连接之前设定
    2:一定要设置完成后,再 start这个 connection

    对于持久的 Topic消息的接收

               ConnectionFactory ConnectionFactory=new ActiveMQConnectionFactory("tcp://localhost:61616");
               Connection connection=ConnectionFactory.createConnection();
               connection.setClientID("cc1");
               Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
               Topic destination=session.createTopic("mytopic2");
               TopicSubscriber ts=session.createDurableSubscriber(destination, "T1");
                connection.start();
                Message msg= ts.receive();

               while(msg!=null){
                             TextMessage msgs=(TextMessage) msg;
                              System.out.println("接受信息----》"+msgs.getText());
                               msg= ts.receive(1000L);
                 }
                session.commit();
                session.close();
                connection.close();

    1:需要在连接上设置消费者id,用来识别消费者
    2:需要创建 TopicSubscriber来订阅
    3:要设置好了过后再 start这个 connection
    4:一定要先运行一次,等于向消息服务中间件注册这个消费者,然后再运行客户端发送信息,这个时候,
    无论消费者是否在线,都会接收到,不在线的话,下次连接的时候,会把没有收过的消息都接收下来。

    生产者

    package test.mq.topic1;
    
     
    
     
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class Sender {
           public static void main(String[] args) throws JMSException {
            ConnectionFactory   ConnectionFactory=new ActiveMQConnectionFactory(
                    "tcp://localhost:61616"
                    );
            Connection connection=ConnectionFactory.createConnection();
            Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            Destination destination=session.createTopic("mytopic2");
            MessageProducer messageProducer=session.createProducer(destination);
            messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
            connection.start();
            for(int i=1;i<=5;i++){
                 TextMessage textMessage=session.createTextMessage();
                 textMessage.setText("我是tom ID为"+i);
                 messageProducer.send(textMessage);
                 System.out.println("生产者:"+textMessage.getText());
            }
             session.commit();
             session.close();
             connection.close();    
        }
    }

    消费者

    package test.mq.topic1;
    
     
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    import javax.jms.TopicSubscriber;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.broker.ConsumerBrokerExchange;
    
    public class Receiver {
        
        public static void main(String[] args) throws Exception {
            ConnectionFactory   ConnectionFactory=new ActiveMQConnectionFactory(
                    "tcp://localhost:61616"
                    );
            
            Connection connection=ConnectionFactory.createConnection();
            connection.setClientID("cc1");
            Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            Topic destination=session.createTopic("mytopic2");
            TopicSubscriber ts=session.createDurableSubscriber(destination, "T1");
            connection.start();
            Message msg=  ts.receive();
             while(msg!=null){
                 TextMessage msgs=(TextMessage) msg;
                 System.out.println("接受信息----》"+msgs.getText());
                 msg= ts.receive(1000L);
                 }
             session.commit();
             session.close();
             connection.close();
        }
    }

    控制台

    总结:

    关于持久化和非持久化消息
    持久化消息
    这是 Active的默认传送模式,此模式保证这些消息只被传送一次和成功使用一
    次。对于这些消息,可靠性是优先考虑的因素。可靠性的另一个重要方面是确保持久性消
    息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息
    这意味着在持久性消息传送至目标时,消息服务将其放入持久性数据存储。如果消
    息服务由于某种原因导致失败,它可以恢复此消息并将此消息传送至相应的消费者。虽然
    这样增加了消息传送的开销,但却增加了可靠性。
    非持久化消息
    保证这些消息最多被传送一次。对于这些消息,可靠性并非主要的考虑因素。此模
    式并不要求持久性的数据存储,也不保证消息服务由于某种原因导致失败后消息不会丢
    失。有两种方法指定传送模式:
    1.使用 set DeliveryMode方法,这样所有的消息都采用此传送模式:如:
    roducer set DeliveryMode( DeliveryMode NON PERSISTENT)
    2.使用send方法为每一条消息设置传送模式

  • 相关阅读:
    验证整数或小数
    数据库的连接查询比较
    批处理按钮
    事务 SQL
    关于GridView模板的一些总结
    C#数据结构之线性表
    C#面向对象基础
    C#集合类:动态数组、队列、栈、哈希表、字典
    如何更改master中WebParts中自定义控件的值。
    如何取得web.config中connectings中的值
  • 原文地址:https://www.cnblogs.com/caoyingjielxq/p/9335658.html
Copyright © 2011-2022 走看看