zoukankan      html  css  js  c++  java
  • ActiveMQ

    ActiveMQ

       MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。 
    特点: 
      1、支持多种语言编写客户端 
      2、对spring的支持,很容易和spring整合 
      3、支持多种传输协议:TCP,SSL,NIO,UDP等 
      4、支持AJAX 

      消息形式: 
      1、点对点(queue) 
      2、一对多(topic) 

    安装启动

       官网下载压缩包,解压到相应目录 , 这里解压到/opt

      启动mq , ./bin/activemq start

      关闭mq,   ./bin/activemq stop

      访问 :127.0.0.1:8161/admin/     账号admin   密码admin

      采用61616端口提供JMS服务,采用8161提供管理控制台服务

      通信协议:TCP 

    生产者生产消息

    package com.steak.activemq.test;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    public class mqTest {
    
      private static final StringACTIVE_URL ="tcp://127.0.0.1:61616";
    
        private static final StringQUEUE ="queue";
    
        public static void main(String[] args)throws JMSException {
    
    //创建连接工厂
    
            ActiveMQConnectionFactory activeMQConnectionFactory =new ActiveMQConnectionFactory(ACTIVE_URL);
    
            //通过连接工厂,获得连接
    
            Connection connection = activeMQConnectionFactory.createConnection();
    
            connection.start();
    
            //创建session
    
            Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
    
            //创建目的地
    
            Queue queue = session.createQueue(QUEUE);
    
            //创建消息的生产者
    
            MessageProducer messageProducer = session.createProducer(queue);
    
            //通过使用messageProducer生产消息发送到MQ队列里
    
            for (int i =0 ; i <3 ; i++){
    
    //创建消息
    
                TextMessage textMessage = session.createTextMessage("消息 "+i);
    
                //通过messageProducer发送消息
    
                messageProducer.send(textMessage);
    
            }
    
    //关闭资源
    
            messageProducer.close();
    
            session.close();
    
            connection.close();
    
            System.out.println("消息发送完成");
    
        }
    
    }

      启动消费者发送消息,此时等待消费的消息为3条,消费者为0个,进队的消息为3条,出队消息为0条

        Number Of Pending Messages:等待消费的消息

      Number of Conumers : 消费者数量

      Message Enqueued : 进队消息数

      Message Dequeued :    出队消息数

    消费者消费消息

    package com.steak.activemq.test;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    public class Consumer {
    
    private static final StringACTIVE_URL ="tcp://127.0.0.1:61616";
    
        private static final StringQUEUE ="queue";
    
        public static void main(String[] args)throws JMSException {
    
    //创建连接工厂
    
            ActiveMQConnectionFactory activeMQConnectionFactory =new ActiveMQConnectionFactory(ACTIVE_URL);
    
            //通过连接工厂,获得连接
    
            Connection connection = activeMQConnectionFactory.createConnection();
    
            connection.start();
    
            //创建session
    
            Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
    
            //创建目的地
    
            Queue queue = session.createQueue(QUEUE);
    
            //创建消费者
    
            MessageConsumer messageConsumer = session.createConsumer(queue);
    
            //通过同步阻塞接受消息
    
            while (true){
    
    //接受消息的类型要和发送消息的一样 ,
    
                TextMessage textMessage = (TextMessage) messageConsumer.receive();
    
                if (null != textMessage){
    
    System.out.println("消息 "+textMessage.getText());
    
                }else {
    
    break;
    
                }
    
    }
    
    messageConsumer.close();
    
            session.close();
    
            connection.close();
    
        }
    
    }

      此时消费者有一个,并且消费了3条消息

      消费者接收消息时可以一直等待(耗费系统资源),也可以设置时间,

      当为receive()(同步阻塞)时,消费者一直阻塞等待消费消息

        为recevice(long varl)(异步阻塞)时,当超时后,消费者就消失

        通过异步监听方式来接收消息

    //通过异步监听方式来接收消息
    
        messageConsumer.setMessageListener(new MessageListener() {
    
    @Override
    
            public void onMessage(Message message) {
    
    if (null != message && messageinstanceof TextMessage){
    
    TextMessage textMessage = (TextMessage) message;
    
                    try {
    
    System.out.println("消息  "+((TextMessage) message).getText());
    
                    }catch (JMSException e) {
    
    e.printStackTrace();
    
                    }
    
    }
    
    }
    
    });
    
        System.in.read(); //保证控制台不关闭,作用是消费完消息以后才能关闭
    
        messageConsumer.close();
    
        session.close();
    
        connection.close();
    
    }

        

    消费者消费情况

             1,先启动生产者,再启动多个消费者,一定是第一个消费者消费完所有消息,后面的都消费不到

             2,先启动多个消费者,在启动生产者,消息基本是平均消费的,消息个数是基数的时候也是一个的差异

     

    主体模式

      当为(主题模式)Topic时,所有消费者收到的消息都是一样的,前提是要先订阅,订阅后才能收到消息,消息是无状态的,发送消息后就什么都不管了

      主题模式

     

       如:两人订阅

       生产者发布消息,此时发布了三条消息,但是为主题模式,有两个消费者,所以一共消费6条

    生命不止,折腾不息
  • 相关阅读:
    文件上传利用总结
    通过WebGoat学习java反序列化漏洞
    C# Bat批处理文件创建、执行
    C# 删除目录下所有文件
    是时候做一点属于自己的东西了
    2021.09.26省市县三级联动最新数据库,附脚本
    SpringBoot 整合Easy Poi 下载Excel(标题带批注)、导出Excel(带图片)、导入Excel(校验参数,批注导出),附案例源码
    NeRF 核心思想简记
    R-CNN系列核心思想简单记录
    HeapDump性能社区Young GC异常问题排查实战案例精选合集
  • 原文地址:https://www.cnblogs.com/steakliu/p/11589762.html
Copyright © 2011-2022 走看看