zoukankan      html  css  js  c++  java
  • 消息中间件JMS(二)

    之前介绍了ActiveMQ下载与安装,并且启动了。下面进行ActiveMQ的Demo

    1. JMS入门Demo

    1.1 点对点模式

    点对点模式主要建立在一个队列上面,当连接一个队列的时候,发送端不需要知道接收端是否正在接收,可以直接向ActiveMQ发送消息,发送的消息,将会先进入队列中,如果有接收端在监听,则会发送接收端,如果没有接收端接收,则会保存在activeMQ服务器,直到接收端接收消息。点对点的消息模式可以有多个发送端,多个接收端,但是一条消息,只会被一个接收端给接收到,哪个接收端先连上ActiveMQ,则会先接收到,而后来的接收端就接收不到那条消息。

    先引入ActiveMQ的依赖

    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-client</artifactId>
        <version>5.13.4</version>
     </dependency>

    1.1.1 消息生产者

    public class QueueProducer {
    
        public static void main(String[] args) throws Exception {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.129:61616");
            //2.创建连接
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取session(会话对象) 参数transacted:是否启动事务;参数acknowledgeMode:消息的确认模式
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建队列对象
            Queue queue = session.createQueue("test-queue");
            //6.创建消息生产者对象
            MessageProducer producer = session.createProducer(queue);
            //7.创建消息对象(文本消息)
            TextMessage textMessage = session.createTextMessage("欢迎来到ActiveMQ");
            //8.发送消息
            producer.send(textMessage);
            //9.关闭资源
            producer.close();
            session.close();
            connection.close();
        }
    
    }

    上述代码中第4步创建session  的两个参数:

    第1个参数 是否使用事务

    第2个参数 消息的确认模式

    • AUTO_ACKNOWLEDGE = 1    自动确认
    • CLIENT_ACKNOWLEDGE = 2    客户端手动确认   
    • DUPS_OK_ACKNOWLEDGE = 3    自动批量确认
    • SESSION_TRANSACTED = 0    事务提交并确认

    运行后通过ActiveMQ管理界面查询

    1.1.2 消息消费者

    public class QueueConsumer {
    
        public static void main(String[] args) throws Exception {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.129:61616");
            //2.创建连接
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.创建会话对象
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建队列对象
            Queue queue = session.createQueue("test-queue");
            //6.创建消息消费者对象
            MessageConsumer consumer = session.createConsumer(queue);
            //7.设置监听
            consumer.setMessageListener(new MessageListener() {
                
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage)message;
                    try {
                        System.out.println("提取的消息:" + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            //8.等待键盘输入
            System.in.read();
            //9.关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    
    }

    执行后看到控制台输出

    1.1.3运行测试

    同时开启2个以上的消费者,再次运行生产者,观察每个消费者控制台的输出,会发现只有一个消费者会接收到消息。

    1.2 发布/订阅模式

    1.2.1 消息生产者

    /**
     * 发布订阅模式,消息生产者
     * @author Administrator
     *
     */
    public class TopicProducer {
        public static void main(String[] args) throws Exception {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.129:61616");
            //2.创建连接
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.创建会话对象
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建主题对象
            Topic topic = session.createTopic("test-topic");
            //6.创建生产者对象
            MessageProducer producer = session.createProducer(topic);
            //7.创建消息对象(文本消息)
            TextMessage textMessage = session.createTextMessage("欢迎来到ActiveMQ");
            //8.发送消息
            producer.send(textMessage);
            //9.关闭资源
            producer.close();
            session.close();
            connection.close();
        }
    
    }

    运行效果如下:

    1.2.2 消息消费者

    public class TopicConsumer {
    
        public static void main(String[] args) throws Exception {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.129:61616");
            //2.创建连接
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.创建会话对象
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建主题对象
            Topic topic = session.createTopic("test-topic");
            //6.创建消费者对象
            MessageConsumer consumer = session.createConsumer(topic);
            consumer.setMessageListener(new MessageListener() {
                
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("接收消息为:" + textMessage.getText());
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    
                }
            });
            
            //8.等待键盘输入
            System.in.read();
            //9.关闭资源
            consumer.close();
            session.close();
            connection.close();
    
        }
    
    }

    1.2.3运行测试

    同时开启2个以上的消费者,再次运行生产者,观察每个消费者控制台的输出,会发现每个消费者会接收到消息。

  • 相关阅读:
    第十八周作业
    第十七周作业
    第十六周作业
    第十五周作业
    第十四周作业
    第十三周作业
    第十二周作业
    第二阶段考试
    第十周作业
    启航,带着梦想出发!
  • 原文地址:https://www.cnblogs.com/FanJava/p/10484771.html
Copyright © 2011-2022 走看看