zoukankan      html  css  js  c++  java
  • 消息中间件JMS(二)

    之前介绍了ActiveMQ下载与安装,并且启动了。下面进行ActiveMQ的Demo

    1. JMS入门Demo

    1.1 点对点模式

    点对点模式主要建立在一个队列上面,当连接一个队列的时候,发送端不需要知道接收端是否正在接收,可以直接向ActiveMQ发送消息,发送的消息,将会先进入队列中,如果有接收端在监听,则会发送接收端,如果没有接收端接收,则会保存在activeMQ服务器,直到接收端接收消息。点对点的消息模式可以有多个发送端,多个接收端,但是一条消息,只会被一个接收端给接收到,哪个接收端先连上ActiveMQ,则会先接收到,而后来的接收端就接收不到那条消息。

    先引入ActiveMQ的依赖

    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-client</artifactId>
        <version>5.13.4</version>
     </dependency>

    1.1.1 消息生产者

    public class QueueProducer {
    
        public static void main(String[] args) throws Exception {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.129:61616");
            //2.创建连接
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取session(会话对象) 参数transacted:是否启动事务;参数acknowledgeMode:消息的确认模式
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建队列对象
            Queue queue = session.createQueue("test-queue");
            //6.创建消息生产者对象
            MessageProducer producer = session.createProducer(queue);
            //7.创建消息对象(文本消息)
            TextMessage textMessage = session.createTextMessage("欢迎来到ActiveMQ");
            //8.发送消息
            producer.send(textMessage);
            //9.关闭资源
            producer.close();
            session.close();
            connection.close();
        }
    
    }

    上述代码中第4步创建session  的两个参数:

    第1个参数 是否使用事务

    第2个参数 消息的确认模式

    • AUTO_ACKNOWLEDGE = 1    自动确认
    • CLIENT_ACKNOWLEDGE = 2    客户端手动确认   
    • DUPS_OK_ACKNOWLEDGE = 3    自动批量确认
    • SESSION_TRANSACTED = 0    事务提交并确认

    运行后通过ActiveMQ管理界面查询

    1.1.2 消息消费者

    public class QueueConsumer {
    
        public static void main(String[] args) throws Exception {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.129:61616");
            //2.创建连接
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.创建会话对象
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建队列对象
            Queue queue = session.createQueue("test-queue");
            //6.创建消息消费者对象
            MessageConsumer consumer = session.createConsumer(queue);
            //7.设置监听
            consumer.setMessageListener(new MessageListener() {
                
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage)message;
                    try {
                        System.out.println("提取的消息:" + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            //8.等待键盘输入
            System.in.read();
            //9.关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    
    }

    执行后看到控制台输出

    1.1.3运行测试

    同时开启2个以上的消费者,再次运行生产者,观察每个消费者控制台的输出,会发现只有一个消费者会接收到消息。

    1.2 发布/订阅模式

    1.2.1 消息生产者

    /**
     * 发布订阅模式,消息生产者
     * @author Administrator
     *
     */
    public class TopicProducer {
        public static void main(String[] args) throws Exception {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.129:61616");
            //2.创建连接
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.创建会话对象
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建主题对象
            Topic topic = session.createTopic("test-topic");
            //6.创建生产者对象
            MessageProducer producer = session.createProducer(topic);
            //7.创建消息对象(文本消息)
            TextMessage textMessage = session.createTextMessage("欢迎来到ActiveMQ");
            //8.发送消息
            producer.send(textMessage);
            //9.关闭资源
            producer.close();
            session.close();
            connection.close();
        }
    
    }

    运行效果如下:

    1.2.2 消息消费者

    public class TopicConsumer {
    
        public static void main(String[] args) throws Exception {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.129:61616");
            //2.创建连接
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.创建会话对象
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建主题对象
            Topic topic = session.createTopic("test-topic");
            //6.创建消费者对象
            MessageConsumer consumer = session.createConsumer(topic);
            consumer.setMessageListener(new MessageListener() {
                
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("接收消息为:" + textMessage.getText());
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    
                }
            });
            
            //8.等待键盘输入
            System.in.read();
            //9.关闭资源
            consumer.close();
            session.close();
            connection.close();
    
        }
    
    }

    1.2.3运行测试

    同时开启2个以上的消费者,再次运行生产者,观察每个消费者控制台的输出,会发现每个消费者会接收到消息。

  • 相关阅读:
    Lucene.Net 2.3.1开发介绍 —— 二、分词(一)
    控制‘控制台应用程序’的关闭操作
    详解for循环(各种用法)
    敏捷软件开发
    Sql Server的一些知识点
    在SharePoint 2010 中配置Remote Blob Storage FILESTREAM Provider
    使用LotusScript操作Lotus Notes RTF域
    JOpt Simple 4.5 发布,命令行解析器
    John the Ripper 1.8.0 发布,密码破解工具
    PacketFence ZEN 4.0.1 发布,网络接入控制
  • 原文地址:https://www.cnblogs.com/FanJava/p/10484771.html
Copyright © 2011-2022 走看看