zoukankan      html  css  js  c++  java
  • 消息中间件JMS

    一.什么是消息中间件  

       消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。

      ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。

      

      对于消息的传递有两种类型:

        一种是点对点的,即一个生产者和一个消费者一一对应;

        

        另一种是发布/ 订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。 

        

    二.ActiveMQ下载与安装

      1.官方网站下载:http://activemq.apache.org/

      2.centos上安装ActiveMQ

        1)将apache-activemq-5.12.0-bin.tar.gz 上传至服务器

        2)解压

          tar  zxvf  apache-activemq-5.12.0-bin.tar.gz

        3)为apache-activemq-5.12.0目录赋权

          chmod 777 apache-activemq-5.12.0

        4)进入apache-activemq-5.12.0in目录,赋与执行权限

         chmod 755 activemq 

      3.启动  

        ./activemq start

        启动成功出现如下信息

        

      4.打开浏览器输入服务器地址

        http://192.168.56.102:8161

        即可访问管理界面

        

        点击Manage ActiveMQ broker登录,登录名和密码均为admin

      5.点对点消息列表Queues  

        

        Number Of Pending Messages  :等待消费的消息 这个是当前未出队列的数量。
        Number Of Consumers  :消费者 这个是消费者端的消费者数量
        Messages Enqueued  :进入队列的消息  进入队列的总数量,包括出队列的。
        Messages Dequeued  :出了队列的消息  可以理解为是消费这消费掉的数量。

    三.测试案例

      1.点对点模式

        1.1消息生产者

        1)创建Maven工程,引入依赖

    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-client</artifactId>
        <version>5.13.4</version>
    </dependency>

        2)创建类QueueProducer  main方法代码如下:

        //1.创建连接工厂
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616");
        //2.获取连接
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        
        //5.创建队列对象
        Queue queue = session.createQueue("test-queue");
        //6.创建消息生产者
        MessageProducer producer = session.createProducer(queue);
        //7.创建消息
        TextMessage textMessage = session.createTextMessage("欢迎来到ActiveMQ世界");
        //8.发送消息
        producer.send(textMessage);
        //9.关闭资源
        producer.close();
        session.close();
        connection.close();    

        上述代码中第4步创建session  的两个参数: 

        第2个参数 消息的确认模式

    • AUTO_ACKNOWLEDGE = 1    自动确认
    • CLIENT_ACKNOWLEDGE = 2    客户端手动确认   
    • DUPS_OK_ACKNOWLEDGE = 3    自动批量确认
    • SESSION_TRANSACTED = 0    事务提交并确认

        运行后通过ActiveMQ管理界面查询 

      

        1.2 消息消费者

        创建类QueueConsumer ,main方法代码如下:

        //1.创建连接工厂
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.56.102:61616");
        //2.获取连接
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建队列对象
        Queue queue = session.createQueue("test-queue");
        //6.创建消息消费
        MessageConsumer consumer = session.createConsumer(queue);
        
        //7.监听消息
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage=(TextMessage)message;
                try {
                    System.out.println("接收到消息:"+textMessage.getText());
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });    
        //8.等待键盘输入
        System.in.read();    
        //9.关闭资源
        consumer.close();
        session.close();
        connection.close();    

        执行后看到控制台输出:欢迎来到神奇的ActiveMq世界

        1.3 运行测试

        同时开启2个以上的消费者,再次运行生产者,观察每个消费者控制台的输出,会发现只有一个消费者会接收到消息。

      2.发布订阅模式

        2.1消息生产者

        创建类TopicProducer ,main方法代码如下:

      

    //1.创建连接工厂
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.56.102:61616");
        //2.获取连接
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建主题对象
        Topic topic = session.createTopic("test-topic");
        //6.创建消息生产者
        MessageProducer producer = session.createProducer(topic);
        //7.创建消息
        TextMessage textMessage = session.createTextMessage("欢迎来到神奇的ActiveMq世界");
        //8.发送消息
        producer.send(textMessage);
        //9.关闭资源
        producer.close();
        session.close();
        connection.close();

        2.2消息消费者

        创建类TopicConsumer ,main方法代码如下:

        

        //1.创建连接工厂
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.56.102:61616");
        //2.获取连接
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建主题对象
        //Queue queue = session.createQueue("test-queue");
        Topic topic = session.createTopic("test-topic");
        //6.创建消息消费
        MessageConsumer consumer = session.createConsumer(topic);
        
        //7.监听消息
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage=(TextMessage)message;
                try {
                    System.out.println("接收到消息:"+textMessage.getText());
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
        //8.等待键盘输入
        System.in.read();
        //9.关闭资源
        consumer.close();
        session.close();
        connection.close();    

        2.3 运行测试

          同时开启2个以上的消费者,再次运行生产者,观察每个消费者控制台的输出,会发现每个消费者会接收到消息。 

  • 相关阅读:
    centos7.7环境下编译安装tengine2.3.2版本
    centos6.9安装python3.6.9独立的virtualenv环境,并且能正确引入ssl
    django在centos生产环境的部署
    django入门8之xadmin引入富文本和excel插件
    jenkins服务器使用python脚本rabbitmqadmin和shell对目标服务器进行管理
    django入门7之django template和xadmin常用技巧
    mysql5.7同步复制报错1060故障处理
    Centos7.6使用yum安装PHP7.2
    django中安全sql注入等
    django入门6引入验证码插件 django-simple-captcha
  • 原文地址:https://www.cnblogs.com/cracker13/p/9964758.html
Copyright © 2011-2022 走看看