zoukankan      html  css  js  c++  java
  • 消息队列之--ActiveMQ

     Activemq介绍

    对于消息的传递有两种类型:

    一种是点对点的,即一个生产者和一个消费者一一对应;

    另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。

    JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。

      · StreamMessage -- Java原始值的数据流

      · MapMessage--一套名称-值对

      · TextMessage--一个字符串对象

      · ObjectMessage--一个序列化的 Java对象

      · BytesMessage--一个字节的数据流

    Centos7环境下安装ActiveMQ

    第一步: ActiveMQ 的压缩包上传到Linux系统。

    第二步:解压缩。

    第三步:启动。

    使用bin目录下的activemq命令启动:

    [root@localhost bin]# ./activemq start

    关闭:

    [root@localhost bin]# ./activemq stop

    查看状态:

    [root@localhost bin]# ./activemq status

    进入管理后台

    http://192.168.176.130:8161/admin/

    用户名:admin

    密码:admin

    JMQ的两种消息模式

    消息列队有两种消息模式,一种是点对点的消息模式,还有一种就是订阅的模式.

    点对点的消息模式

        点对点的模式主要建立在一个队列上面,当连接一个列队的时候,发送端不需要知道接收端是否正在接收,可以直接向ActiveMQ发送消息,发送的消息,将会先进入队列中,如果有接收端在监听,则会发向接收端,如果没有接收端接收,则会保存在activemq服务器,直到接收端接收消息,点对点的消息模式可以有多个发送端,多个接收端,但是一条消息,只会被一个接收端给接收到,哪个接收端先连上ActiveMQ,则会先接收到,而后来的接收端则接收不到那条消息. 

    订阅模式

        订阅/发布模式,同样可以有着多个发送端与多个接收端,但是接收端与发送端存在时间上的依赖,就是如果发送端发送消息的时候,接收端并没有监听消息,那么ActiveMQ将不会保存消息,将会认为消息已经发送,换一种说法,就是发送端发送消息的时候,接收端不在线,是接收不到消息的,哪怕以后监听消息,同样也是接收不到的。这个模式还有一个特点,那就是,发送端发送的消息,将会被所有的接收端给接收到,不类似点对点,一条消息只会被一个接收端给接收到。

    jar包添加到工程中

    使用5.11.2版本的jar

    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
    </dependency>

    点对点的实现代码

    @Test
        public void testQueueProducer() throws Exception {
            //1、创建一个连接工厂对象,需要指定服务的ip及端口。
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.176.130:61616");
            //2、使用工厂对象创建一个Connection对象。
            Connection connection = connectionFactory.createConnection();
            //3、开启连接,调用Connection对象的start方法。
            connection.start();
            //4、创建一个Session对象。
            //第一个参数:是否开启事务。如果true开启事务,第二个参数无意义。一般不开启事务false。
            //第二个参数:应答模式。自动应答或者手动应答。一般自动应答。
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5、使用Session对象创建一个Destination对象。两种形式queue、topic,现在应该使用queue
            Queue queue = session.createQueue("test-queue");
            //6、使用Session对象创建一个Producer对象。
            MessageProducer producer = session.createProducer(queue);
            //7、创建一个Message对象,可以使用TextMessage。
            /*TextMessage textMessage = new ActiveMQTextMessage();
            textMessage.setText("hello Activemq");*/
            TextMessage textMessage = session.createTextMessage("hello activemq");
            //8、发送消息
            producer.send(textMessage);
            //9、关闭资源
            producer.close();
            session.close();
            connection.close();
        }
        
        @Test
        public void testQueueConsumer() throws Exception {
            //创建一个ConnectionFactory对象连接MQ服务器
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.176.130:61616");
            //创建一个连接对象
            Connection connection = connectionFactory.createConnection();
            //开启连接
            connection.start();
            //使用Connection对象创建一个Session对象
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建一个Destination对象。queue对象
            Queue queue = session.createQueue("test-queue");
            //使用Session对象创建一个消费者对象。
            MessageConsumer consumer = session.createConsumer(queue);
            //接收消息
            consumer.setMessageListener(new MessageListener() {
                
                @Override
                public void onMessage(Message message) {
                    //打印结果
                    TextMessage textMessage = (TextMessage) message;
                    String text;
                    try {
                        text = textMessage.getText();
                        System.out.println(text);
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    
                }
            });
            //等待接收消息
            System.in.read();
            //关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    View Code

    订阅/发布模式的实现代码

        @Test
        public void testTopicProducer() throws Exception {
            //1、创建一个连接工厂对象,需要指定服务的ip及端口。
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.176.130:61616");
            //2、使用工厂对象创建一个Connection对象。
            Connection connection = connectionFactory.createConnection();
            //3、开启连接,调用Connection对象的start方法。
            connection.start();
            //4、创建一个Session对象。
            //第一个参数:是否开启事务。如果true开启事务,第二个参数无意义。一般不开启事务false。
            //第二个参数:应答模式。自动应答或者手动应答。一般自动应答。
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5、使用Session对象创建一个Destination对象。两种形式queue、topic,现在应该使用topic
            Topic topic = session.createTopic("test-topic");
            //6、使用Session对象创建一个Producer对象。
            MessageProducer producer = session.createProducer(topic);
            //7、创建一个Message对象,可以使用TextMessage。
            /*TextMessage textMessage = new ActiveMQTextMessage();
            textMessage.setText("hello Activemq");*/
            TextMessage textMessage = session.createTextMessage("topic message");
            //8、发送消息
            producer.send(textMessage);
            //9、关闭资源
            producer.close();
            session.close();
            connection.close();
        }
        
        @Test
        public void testTopicConsumer() throws Exception {
            //创建一个ConnectionFactory对象连接MQ服务器
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.176.130:61616");
            //创建一个连接对象
            Connection connection = connectionFactory.createConnection();
            //开启连接
            connection.start();
            //使用Connection对象创建一个Session对象
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建一个Destination对象。topic对象
            Topic topic = session.createTopic("test-topic");
            //使用Session对象创建一个消费者对象。
            MessageConsumer consumer = session.createConsumer(topic);
            //接收消息
            consumer.setMessageListener(new MessageListener() {
                
                @Override
                public void onMessage(Message message) {
                    //打印结果
                    TextMessage textMessage = (TextMessage) message;
                    String text;
                    try {
                        text = textMessage.getText();
                        System.out.println(text);
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    
                }
            });
            System.out.println("topic消费者3启动。。。。");
            //等待接收消息
            System.in.read();
            //关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    View Code
  • 相关阅读:
    严蔚敏数据结构线性表操作
    strcpy/strncpy、strcat/strncpy的替代方案strlcpy,strlcat
    使用DNS的反向区域,解决通过IP反向查询主机名,适合web环境通过IP定位故障主机名
    FreeBSD漏洞程序升级
    定义制造业操作(定义 MES/MOM 系统)
    定义制造业操作(定义 MES/MOM 系统)
    EF / EF CORE 打印SQL
    远程连接桌面Azure AD(Azure AD Joined Computer)
    《精通CSS第3版》(6)Flexbox
    FastNat内网穿透,给开发人员送的硬货福利限时送
  • 原文地址:https://www.cnblogs.com/cnki/p/7856363.html
Copyright © 2011-2022 走看看