zoukankan      html  css  js  c++  java
  • 02_ActiveMQ入门

    【ActiveMQ 入门HelloWorld例子】

    【启动ActiveMQ】

    1.由于本人PC是64位的,选择在bin目录下的win64/activemq.bat启动。

    2.启动成功后,访问http://localhost:8161/admin/ ,输入用户名密码,默认均为admin。进入ActiveMq的web页面的控制台。

     

    【工程截图】

    【生产者Sender.java】

    package test.activemq.demo;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class Sender {
        public static void main(String[] args) throws JMSException {
            /**
             * 1.建立ConnectionFactory工厂对象,需要填入用户名、密码、以及要连接的地址,均使用默认,端口默认为"tcp://localhost:61616"
             */
            ConnectionFactory connectionFactory  = new ActiveMQConnectionFactory(
                    ActiveMQConnectionFactory.DEFAULT_USER,
                    ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                    "tcp://localhost:61616"); 
            
            /**
             * 2.通过ConnectionFactory工厂对象创建一个Connection连接,并且调用Connection的start()方法开启连接,Connection默认是关闭的
             */
            Connection connection = connectionFactory.createConnection();
            connection.start();
            
            /**
             * 3.通过Connection对象创建Session会话(上下文环境对象),用于接收消息。
             *   参数1:是否启用事务 
             *   参数2:签收模式,一般设置为自动签收
             */
            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            
            /**
             * 4.通过Session创建Destination对象,即一个客户端用来指定 生产者目标 和 消费者来源的 对象。
             *   PTP模式中:Destination被称为Queue,队列
             *   Pub/Sub模式中:Destination被称为Topic,主题
             *   在程序中可以使用多个Queue 和 Topic
             */
            Destination destination = session.createQueue("queue1");
            
            /**
             * 5.需要通过Session对象创建消息的发送或接受对象(生产者或消费者,MessageProducer/MessageConsumer)
             */
            MessageProducer messageProducer = session.createProducer(destination);
            
            /**
             * 6.使用MessageProducer的serDeliveryMode方法为其设置持久化特性和非持久化特性(DeliveryMode)
             */
            messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  //设置为不持久化
            
            /**
             * 7.最后,使用JMS规范的TextMessage形式创建数据(通过Session对象),
             * 发送端使用MessageProducer的Send方法发送数据。
             * 接受端使用receive方法接收数据。
             */
            for(int i=0;i<5;i++){
                TextMessage testMessage = session.createTextMessage();
                testMessage.setText("我的Id的是:"+i);
                messageProducer.send(testMessage);
                System.out.println("【生产者发送】"+testMessage.getText());
            }
             
            /**
             * 8.最后记得关闭Connection连接
             */
             if(connection!=null){
                 connection.close();
             }
        }
    }

    【Sender运行结果:Eclipse控制台】

    【Sender运行结果:web控制台】

    Number Of Pending Messages :等待消费的队列

    Messages Enqueued  : 进入队列的消息的总数量(总数量只增不减)

    Messages Dequeued :出了队列的消息的总数量(即消费了的消息数量)

    【Sender运行结果:点击"queue1"查看队列详情】

    【消费者Receiver.java】

    package test.activemq.demo;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class Receiver {
        public static void main(String[] args) throws JMSException {
            /**
             * 1.建立ConnectionFactory工厂对象,需要填入用户名、密码、以及要连接的地址,均使用默认,端口默认为"tcp://localhost:61616"
             */
            ConnectionFactory connectionFactory  = new ActiveMQConnectionFactory(
                    ActiveMQConnectionFactory.DEFAULT_USER,
                    ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                    "tcp://localhost:61616"); 
            
            /**
             * 2.通过ConnectionFactory工厂对象创建一个Connection连接,并且调用Connection的start()方法开启连接,Connection默认是关闭的
             */
            Connection connection = connectionFactory.createConnection();
            connection.start();
            
            /**
             * 3.通过Connection对象创建Session会话(上下文环境对象),用于接收消息。参数1:是否启用事务 参数2:签收模式,一般设置为自动签收
             */
            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            
            /**
             * 4.通过Session创建Destination对象,即一个客户端用来指定生产者目标 和 消费者来源的 对象。
             * PTP模式中:Destination被称为Queue,队列
             * Pub/Sub模式中:Destination被称为Topic,主题
             * 在程序中可以使用多个Queue 和 Topic
             */
            Destination destination = session.createQueue("queue1");   //注意要和消息生产者的队列名一致!!
            
            /**
             * 5.需要通过Session对象创建消息的发送或接受对象(生产者或消费者,MessageProducer/MessageConsumer)
             */
            MessageConsumer messageConsumer = session.createConsumer(destination);
            
            
            /**
             * 7.最后,使用JMS规范的TextMessage形式创建数据(通过Session对象),
             *   发送端使用MessageProducer的Send方法发送数据。
             *   接受端使用receive方法接收数据。
             */
            while(true){
                TextMessage msg = (TextMessage)messageConsumer.receive();
                if(msg==null) 
                    break;
                System.out.println("【消费者接收】"+msg.getText());
            }
             
            /**
             * 8.最后记得关闭Connection连接
             */
             if(connection!=null){
                 connection.close();
             }
        }
    }

    【Receiver运行结果:Eclipse控制台】

    【Receiver运行结果:Web页面控制台】

    【Receiver运行结果:点击"queue1"查看队列详情】

    【重点:Connection方法使用】

    在成功创建ConnectionFactory后,下一步是创建一个连接,它是JMS定义的一个接口。

    ConnectionFactory负责返回Connection实现。

    Connection可以与底层消息传递系统进行通信,通常客户端只使用单一连接。

    根据JMS文档,Connection的目的是“利用JMS提供者封装开放的连接”,以及“客户端与提供者服务例程之间的开放TCP/IP套接字”。

    Connection还应该是进行客户端身份验证的地方。

    当一个Connection被创建时,它的传输默认是关闭的,必须使用start方法开启。

    一个Connection可以建立一个或多个Session。

    当一个程序执行完成之后,必须关闭之前创建的Connection,否则ActiveMQ不能释放资源,关闭一个Connection同样也关闭了Session、MessageProducer、MessageConsumer。

    ConnectingFactory有两个创建的方法:

    1.Connection createConnection();

    2.Connection createConnection(String username,String password,String url);

    【重点:Session方法使用】

    一旦从ConnectionFactory中获得一个Connection,必须从Connection中创建一个或多个Session。

    Session是一个发送或者接收消息的线程,可以使用Session创建MessageProducer、MessageConsumer 和 Message。

    Session可以被事务化,也可以不被事务化。通常可通过向Connection上的创建Session方法传递一个布尔参数对其进行设置。

    connection.createSession( boolean transacted , int acknowledgeMode );

    transacted 使用事务标记

    acknowledgeMode 签收模式

    [ 结束事务有两种方法:提交、回滚。]

    如果一个事务提交,消息被处理。

    如果事务中有一个步骤失败,事务就回滚,这个事务中的已经被执行的动作将被撤销。

    在发送消息的最后也必须要使用session.commit()方法提交事务。

    [ 签收模式有三种 ]

    Session.AUTO_ACKNOWLEDGE:当客户端从receiver或onMessage成功返回时,Session自动签收客户端的这条消息的收条。

    Session.CLIENT_ACKNOWLEDGE:客户端通过调用消息(Message)的acknowledge方法签收消息。在这种情况下,签收发生在Session层面:签收一个已经消费的消息会自动地签收这个Session所有已消费的收条。

    Session.DUPS_OK_ACKNOWLEDGE:Session不必确保对传送消息的签收,这个模式可能会引起消息的重复,但是降低了Session的开销,所以只有客户端能容忍重复的消息,才可使用。

    【重点:MessageProducer】

    MessageProducer是一个由Session创建的对象,用来向Destination发送消息。

    void send( Destination destination , Message message );

    void send( Destination destination , Message message ,  int deliveryMode , int priority, long timeToLive );

    void send(  Message message );

    void send( Message message ,  int deliveryMode , int priority, long timeToLive );

    deliveryMode:发送模式

    timeToLive:消息过期时间

    ActiveMQ支持两种消息传递模式:PERSISTENT(默认)、NO_PERSOSTENT两种。

    如果容忍消息丢失,那么使用非持久化(NO_PESISTENT)消息可以改善性能和减少存储的开销。

    priority:消息优先级

    消息的优先级从0-9共10个级别,默认4。0-4是普通消息,5-9是加急消息。JMS不要求严格按照这10个优先级发送消息,但必须保证加急消息要先于普通消息到达。

    timeToLive :消息存活时间

    默认情况下,消息永不会过期,如果消息在特定周期内失去意义,那么可以设置过期时间,时间为毫秒。

    【重点:MessageConsumer 】 

    MessageConsumer是一个由Session创建的对象,用来从Destination中接收消息。

    MessageConsumer createConsumer( Destionation destination ) ;

    MessageConsumer createConsumer( Destionation destination , String MessageSelector ) ;

    MessageConsumer createConsumer( Destionation destination , String MessageSelector , boolean noLocal) ;

    MessageConsumer createConsumer( Topic topic , String name ) ;

    MessageConsumer createConsumer( Topic topic , String name ,  String MessageSelector , boolean noLocal ) ;

    MessageSelector:消息选择器

    noLocal:默认为false,当设置为true时,限制消费者只能接收和自己相同的连接(Connection)所发布的消息,此标志适用于主题(Topic),不适用于队列(Queue)。

    name :标志订阅所对应的订阅名称,持久订阅时需要设置此参数。

    public final SELECTOR = "JMS_TYPE "="MY_TAG1";该选择器检查了传入消息的JMS_TYPE属性,并确定了这个属性的值是否等于MY_TAG1,如果相等,则消息被消费,否则忽略该消息。( 消息过滤可以使用 )

    【重点:Message】

    JMS程序的最终目的是生产和消费的消息能被其他程序使用,JMS的Message是一个即简单又不缺乏灵活性的基本格式,允许创建不同平台上符合非JMS的程序格式的消息。

    Message由以下部分组成:消息头、属性、和消息体。

    BlobMessage createBlobMessage(File file);

    BlobMessage createBlobMessage(InputStream in) ;

    BlobMessage createBlobMessage(URL url);

    BlobMessage createBlobMessage(URL url , boolean deletedByBroker);

    BlobMessage createBlobMessage( );

    MapMessage  createBlobMessage( );

    Message  createBlobMessage( );

    ObjectMessage  createBlobMessage( );

    ObjectMessage  createBlobMessage( Serializable object );

    TextMessage createTextMessage();

    TextMessage createTextMessage( String text );

    注意:我们一般会在接收端使用instanceOf方法区别数据类型。

  • 相关阅读:
    由于服务主机:DCOM服务进程占用过多CPU,导致系统卡死
    MySQL优化
    input type="file"文件上传到后台读取
    mysql 创建事件
    Quartz.Net实现的定时执行任务调度
    js 编码详解
    C# DateTime.Now 详解
    C# 读写text 详细讲解
    百度地图API详细介绍
    layui table 详细讲解
  • 原文地址:https://www.cnblogs.com/HigginCui/p/6623077.html
Copyright © 2011-2022 走看看