zoukankan      html  css  js  c++  java
  • ActiveMQ(03):简单操作

    
    
    一:maven
    <dependencies>
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-core</artifactId>
                <version>5.7.0</version>
            </dependency>
    </dependencies>

    1.点对点

    1.1点对点发送端

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class PTPSend {
        //连接账号
        private String userName = "";
        //连接密码
        private String password = "";
        //连接地址
        private String brokerURL = "tcp://192.168.0.130:61616";
        //connection的工厂
        private ConnectionFactory factory;
        //连接对象
        private Connection connection;
        //一个操作会话
        private Session session;
        //目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topic
        private Destination destination;
        //生产者,就是产生数据的对象
        private MessageProducer producer;
        
        public static void main(String[] args) {
            PTPSend send = new PTPSend();
            send.start();
        }
        
        public void start(){
            try {
                //根据用户名,密码,url创建一个连接工厂
                factory = new ActiveMQConnectionFactory(userName, password, brokerURL);
                //从工厂中获取一个连接
                connection = factory.createConnection();
                //测试过这个步骤不写也是可以的,但是网上的各个文档都写了
                connection.start();
                //创建一个session
                //第一个参数:是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED
                //第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。
                //Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。
                //Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。
                //DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                //创建一个到达的目的地,其实想一下就知道了,activemq不可能同时只能跑一个队列吧,这里就是连接了一个名为"text-msg"的队列,这个会话将会到这个队列,当然,如果这个队列不存在,将会被创建
                destination = session.createQueue("text-msg");
                //从session中,获取一个消息生产者
                producer = session.createProducer(destination);
                //设置生产者的模式,有两种可选
                //DeliveryMode.PERSISTENT 当activemq关闭的时候,队列数据将会被保存
                //DeliveryMode.NON_PERSISTENT 当activemq关闭的时候,队列里面的数据将会被清空
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                
                //创建一条消息,当然,消息的类型有很多,如文字,字节,对象等,可以通过session.create..方法来创建出来
                TextMessage textMsg = session.createTextMessage("呵呵");
                for(int i = 0 ; i < 100 ; i ++){
                    //发送一条消息
                    producer.send(textMsg);
                }
                
                System.out.println("发送消息成功");
                //即便生产者的对象关闭了,程序还在运行哦
                producer.close();
                
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    1.2点对点接收端

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class PTPReceive {
        //连接账号
        private String userName = "";
        //连接密码
        private String password = "";
        //连接地址
        private String brokerURL = "tcp://192.168.0.130:61616";
        //connection的工厂
        private ConnectionFactory factory;
        //连接对象
        private Connection connection;
        //一个操作会话
        private Session session;
        //目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topic
        private Destination destination;
        //消费者,就是接收数据的对象
        private MessageConsumer consumer;
        public static void main(String[] args) {
            PTPReceive receive = new PTPReceive();
            receive.start();
        }
        
        public void start(){
            try {
                //根据用户名,密码,url创建一个连接工厂
                factory = new ActiveMQConnectionFactory(userName, password, brokerURL);
                //从工厂中获取一个连接
                connection = factory.createConnection();
                //测试过这个步骤不写也是可以的,但是网上的各个文档都写了
                connection.start();
                //创建一个session
                //第一个参数:是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED
                //第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。
                //Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。
                //Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。
                //DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                //创建一个到达的目的地,其实想一下就知道了,activemq不可能同时只能跑一个队列吧,这里就是连接了一个名为"text-msg"的队列,这个会话将会到这个队列,当然,如果这个队列不存在,将会被创建
                destination = session.createQueue("text-msg");
                //根据session,创建一个接收者对象
                consumer = session.createConsumer(destination);
                
                
                //实现一个消息的监听器
                //实现这个监听器后,以后只要有消息,就会通过这个监听器接收到
                consumer.setMessageListener(new MessageListener() {
                    @Override
                    public void onMessage(Message message) {
                        try {
                            //获取到接收的数据
                            String text = ((TextMessage)message).getText();
                            System.out.println(text);
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                });
                //关闭接收端,也不会终止程序哦
    //            consumer.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    2.订阅发布

    2.1订阅发布发送端
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class TOPSend {
        //连接账号
            private String userName = "";
            //连接密码
            private String password = "";
            //连接地址
            private String brokerURL = "tcp://192.168.0.130:61616";
            //connection的工厂
            private ConnectionFactory factory;
            //连接对象
            private Connection connection;
            //一个操作会话
            private Session session;
            //目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topic
            private Destination destination;
            //生产者,就是产生数据的对象
            private MessageProducer producer;
            
            public static void main(String[] args) {
                TOPSend send = new TOPSend();
                send.start();
            }
            
            public void start(){
                try {
                    //根据用户名,密码,url创建一个连接工厂
                    factory = new ActiveMQConnectionFactory(userName, password, brokerURL);
                    //从工厂中获取一个连接
                    connection = factory.createConnection();
                    //测试过这个步骤不写也是可以的,但是网上的各个文档都写了
                    connection.start();
                    //创建一个session
                    //第一个参数:是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED
                    //第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。
                    //Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。
                    //Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。
                    //DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。
                    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    //创建一个到达的目的地,其实想一下就知道了,activemq不可能同时只能跑一个队列吧,这里就是连接了一个名为"text-msg"的队列,这个会话将会到这个队列,当然,如果这个队列不存在,将会被创建
                    
                    
                    
                    //=======================================================
                    //点对点与订阅模式唯一不同的地方,就是这一行代码,点对点创建的是Queue,而订阅模式创建的是Topic
                    destination = session.createTopic("topic-text");
                    //=======================================================
                    
                    
                    
                    
                    //从session中,获取一个消息生产者
                    producer = session.createProducer(destination);
                    //设置生产者的模式,有两种可选
                    //DeliveryMode.PERSISTENT 当activemq关闭的时候,队列数据将会被保存
                    //DeliveryMode.NON_PERSISTENT 当activemq关闭的时候,队列里面的数据将会被清空
                    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                    
                    //创建一条消息,当然,消息的类型有很多,如文字,字节,对象等,可以通过session.create..方法来创建出来
                    TextMessage textMsg = session.createTextMessage("哈哈");
                    long s = System.currentTimeMillis();
                    for(int i = 0 ; i < 100 ; i ++){
                        //发送一条消息
                        producer.send(textMsg);
                    }
                    long e = System.currentTimeMillis();
                    System.out.println("发送消息成功");
                    System.out.println(e - s);
                    //即便生产者的对象关闭了,程序还在运行哦
                    producer.close();
                    
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
    }
    
    

    2.2订阅发布接收端
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class TOPSend {
        //连接账号
            private String userName = "";
            //连接密码
            private String password = "";
            //连接地址
            private String brokerURL = "tcp://192.168.0.130:61616";
            //connection的工厂
            private ConnectionFactory factory;
            //连接对象
            private Connection connection;
            //一个操作会话
            private Session session;
            //目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topic
            private Destination destination;
            //生产者,就是产生数据的对象
            private MessageProducer producer;
            
            public static void main(String[] args) {
                TOPSend send = new TOPSend();
                send.start();
            }
            
            public void start(){
                try {
                    //根据用户名,密码,url创建一个连接工厂
                    factory = new ActiveMQConnectionFactory(userName, password, brokerURL);
                    //从工厂中获取一个连接
                    connection = factory.createConnection();
                    //测试过这个步骤不写也是可以的,但是网上的各个文档都写了
                    connection.start();
                    //创建一个session
                    //第一个参数:是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED
                    //第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。
                    //Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。
                    //Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。
                    //DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。
                    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    //创建一个到达的目的地,其实想一下就知道了,activemq不可能同时只能跑一个队列吧,这里就是连接了一个名为"text-msg"的队列,这个会话将会到这个队列,当然,如果这个队列不存在,将会被创建
                    
                    
                    
                    //=======================================================
                    //点对点与订阅模式唯一不同的地方,就是这一行代码,点对点创建的是Queue,而订阅模式创建的是Topic
                    destination = session.createTopic("topic-text");
                    //=======================================================
                    
                    
                    
                    
                    //从session中,获取一个消息生产者
                    producer = session.createProducer(destination);
                    //设置生产者的模式,有两种可选
                    //DeliveryMode.PERSISTENT 当activemq关闭的时候,队列数据将会被保存
                    //DeliveryMode.NON_PERSISTENT 当activemq关闭的时候,队列里面的数据将会被清空
                    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                    
                    //创建一条消息,当然,消息的类型有很多,如文字,字节,对象等,可以通过session.create..方法来创建出来
                    TextMessage textMsg = session.createTextMessage("哈哈");
                    long s = System.currentTimeMillis();
                    for(int i = 0 ; i < 100 ; i ++){
                        //发送一条消息
                        textMsg.setText("哈哈" + i);
                        producer.send(textMsg);
                    }
                    long e = System.currentTimeMillis();
                    System.out.println("发送消息成功");
                    System.out.println(e - s);
                    //即便生产者的对象关闭了,程序还在运行哦
                    producer.close();
                    
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
    }
    
    

    三.发送消息的数据类型

     如上图支持很多种数据格式,字符串,序列化对象,流,字节等等
    
    

    四.ActiveMQ应用

    4.1 保证消息的成功处理

    消息发送成功后,接收端接收到了消息。然后进行处理,但是可能由于某种原因,高并发也好,IO阻塞也好,反正这条消息在接收端处理失败了。而点对点的特性是一条消息,只会被一个接收
    端给接收,只要接收端A接收成功了,接收端B,就不可能接收到这条消息,如果是一些普通的消息还好,但是如果是一些很重要的消息,比如说用户的支付订单,用户的退款,这些与金钱相关的
    ,是必须保证成功的,那么这个时候要怎么处理呢?

    我们可以使用  CLIENT_ACKNOWLEDGE  模式

    之前其实就有提到当创建一个session的时候,需要指定其事务,及消息的处理模式,当时使用的是
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    

      

    AUTO_ACKNOWLEDGE 

    这一个代码的是,当消息发送给接收端之后,就自动确认成功了,而不管接收端有没有处理成功,而一旦确认成功后,就会把队列里面的消息给清除掉,避免下一个接收端接收到同样的消息。

    那么,它还有另外一个模式,那就是 CLIENT_ACKNOWLEDGE

    这行要写在接收端里面,不是写在发送端的

    
    
    session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    
    

    这行代码以后,如果接收端不确认消息,那么activemq将会把这条消息一直保留,直到有一个接收端确定了消息。

    那么要怎么确认消息呢?

    在接收端接收到消息的时候,调用javax.jms.Message的acknowledge方法

    @Override
                    public void onMessage(Message message) {
                        try {
                            //获取到接收的数据
                            String text = ((TextMessage)message).getText();
                            System.out.println(text);
                            //确认接收,并成功处理了消息
                            message.acknowledge();
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }

    这样,当消息处理成功之后,确认消息,如果不确定,activemq将会发给下一个接收端处理

     注意:只在点对点中有效,订阅模式,即使不确认,也不会保存消息

    4.2 避免消息队列的并发

    JMQ设计出来的原因,就是用来避免并发的,和沟通两个系统之间的交互。

     4.2.1主动接受队列消息

          if(当程序有能力处理){//当程序有能力处理时接收
                        Message receive = consumer.receive();
               //这个可以设置超时时间,超过则不等待消息 
                recieve.receive(10000);
                        //其实receive是一个阻塞式方法,一定会拿到值的
                        if(null != receive){
                            String text = ((TextMessage)receive).getText();
                            receive.acknowledge();
                            System.out.println(text);
                        }else{
                            //没有值嘛
                            //
                        }
                    }
                

    通过上面的代码,就可以让程序自已判断,自己是否有能力接收这条消息,如果不能接收,那就给别的接收端接收,或者等自己有能力处理的时候接收

    4.2.2使用多个接收端

    ActiveMQ是支持多个接收端的,如果当程序无法处理这么多数据的时候,可以考虑多个线程,或者增加服务器来处理。

    4.3消息有效期的管理

    这样的场景也是有的,一条消息的有效时间,当发送一条消息的时候,可能希望这条消息在指定的时间被处理,如果超过了指定的时间,那么这

    条消息就失效了,就不需要进行处理了,那么我们可以使用ActiveMQ的设置有效期来实现

    TextMessage msg = session.createTextMessage("哈哈");
                for(int i = 0 ; i < 100 ; i ++){
                    //设置该消息的超时时间
                    producer.setTimeToLive(i * 1000);
                    producer.send(msg);
                }

    这里每一条消息的有效期都是不同的,打开ip:8161/admin/就可以查看到,里面的消息越来越少了。

    过期的消息是不会被接收到的。

    过期的消息会从队列中清除,并存储到ActiveMQ.DLQ这个队列里面,这个稍后会解释。

    4.4过期消息,处理失败消息如何处理

    过期的、处理失败的消息,将会被ActiveMQ置入“ActiveMQ.DLQ”这个队列中。

    这个队列是ActiveMQ自动创建的。

    如果需要查看这些未被处理的消息,可以进入这个队列中查看

    //指定一个目的地,也就是一个队列的位置
    destination = session.createQueue("ActiveMQ.DLQ");

    这样就可以进入队列中,然后实现接口,或者通过receive()方法,就可以拿到未被处理的消息,从而保证正确的处理

    五.ActiveMQ安全配置

    5.1管理后台密码配置

    在activemq/conf/jetty.xml中找到

    <pre name="code" class="html"> <bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint">
            <property name="name" value="BASIC" />
            <property name="roles" value="admin" />
             <!-- 把这个改为true,当然,高版本的已经改为了true -->
            <property name="authenticate" value="true" />
      </bean>

    高版本的已经默认成为了true。所以我们直接进行下一步即可

    在activemq/conf/jetty-realm.properties文件中配置,打开如下

    ## ---------------------------------------------------------------------------
    ## Licensed to the Apache Software Foundation (ASF) under one or more
    ## contributor license agreements.  See the NOTICE file distributed with
    ## this work for additional information regarding copyright ownership.
    ## The ASF licenses this file to You under the Apache License, Version 2.0
    ## (the "License"); you may not use this file except in compliance with
    ## the License.  You may obtain a copy of the License at
    ##
    ## http://www.apache.org/licenses/LICENSE-2.0
    ##
    ## Unless required by applicable law or agreed to in writing, software
    ## distributed under the License is distributed on an "AS IS" BASIS,
    ## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    ## See the License for the specific language governing permissions and
    ## limitations under the License.
    ## ---------------------------------------------------------------------------
    
    # Defines users that can access the web (console, demo, etc.)
    # username: password [,rolename ...]
    #用户名,密码,角色
    admin: admin, admin
    user: user, user

    注意:大家重点看倒数第二行,那里三个分别是用户名,密码,角色,其中admin角色是固定的

    5.2生产消息者连接密码

    注意:activemq默认是不需要密码,生产消费者就可以连接的

    我们需要经过配置,才能设置密码,这一步在生产环境中一定要配置

    找到activemq/conf/activemq.xml,并打开
    在<broker>节点中,在<systemUsage>节点上面,增加如下的一个插件

    复制代码
    <plugins>
                 <simpleAuthenticationPlugin>
                     <users>
                         <authenticationUser username="${activemq.username}" password="${activemq.password}" groups="users,admins"/>
                     </users>
                 </simpleAuthenticationPlugin>
             </plugins>
    复制代码

    这样就开启了密码认证
    然后账号密码的配置在activemq/conf/credentials.properties文件中

    打开这个文件如下

    复制代码
    ## ---------------------------------------------------------------------------
    ## Licensed to the Apache Software Foundation (ASF) under one or more
    ## contributor license agreements.  See the NOTICE file distributed with
    ## this work for additional information regarding copyright ownership.
    ## The ASF licenses this file to You under the Apache License, Version 2.0
    ## (the "License"); you may not use this file except in compliance with
    ## the License.  You may obtain a copy of the License at
    ##
    ## http://www.apache.org/licenses/LICENSE-2.0
    ##
    ## Unless required by applicable law or agreed to in writing, software
    ## distributed under the License is distributed on an "AS IS" BASIS,
    ## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    ## See the License for the specific language governing permissions and
    ## limitations under the License.
    ## ---------------------------------------------------------------------------
    
    # Defines credentials that will be used by components (like web console) to access the broker
    
    #账号
    activemq.username=admin
    #密码
    activemq.password=123456
    guest.password=password
    复制代码

    这样就配置完毕了。

     

     

  • 相关阅读:
    菜根谭#188
    菜根谭#187
    Single value range only allowed in SystemVerilog
    LUTs, Flip-Flop, Slice
    FPGA 的 RAM 的 区别
    GPU core clock, shader clock ???
    更改Mac的Terminal 格式
    GPU share memory 的应用 (主要内容转载)
    Mac text edit & pdf reader
    Programming Font
  • 原文地址:https://www.cnblogs.com/quangeshishen/p/10190659.html
Copyright © 2011-2022 走看看