zoukankan      html  css  js  c++  java
  • ActiveMQ

    1、什么是消息中间件

    消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)

    执行过程长,不需要返回结果的服务可以用消息中间件。

    常见的消息中间件产品:

    1ActiveMQ

    ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。我们在本次课程中介绍 ActiveMQ的使用。中小企业使用较多。

    (2)RabbitMQ

    AMQP协议的领导实现,支持多种场景。淘宝的MySQL集群内部有使用它进行通讯,OpenStack开源云平台的通信组件,最先在金融行业得到运用。适合大型企业。

    (3)ZeroMQ

    史上最快的消息队列系统

    (4)Kafka

    Apache下的一个子项目 。特点:高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统。适合处理海量数据。

    2、 JMS简介

    2.1 什么是JMS

    JMS(Java Messaging Service)是Java平台上有关面向消息中间件的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。

           JMS本身只定义了一系列的接口规范,是一种与厂商无关的 API,用来访问消息收发系统。它类似于 JDBC(java Database Connectivity):这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂商目前都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,这只是几个例子。 JMS 使您能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个 JML 客户机发送消息。消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。

    JMS 定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一

    些不同形式的数据,提供现有消息格式的一些级别的兼容性。

    · TextMessage--一个字符串对象

    · MapMessage--一套名称-值对

    · ObjectMessage--一个序列化的 Java 对象

    · BytesMessage--一个字节的数据流

    · StreamMessage -- Java 原始值的数据流

    2.2  JMS消息传递类型

    对于消息的传递有两种类型:

    一种是点对点的,即一个生产者和一个消费者一一对应;

     

    另一种是发布/ 订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进

    行接收。

     

    3、ActiveMQ下载与安装

    其实这是一个解压即用的项目

    3.1 下载

    官方网站下载:http://activemq.apache.org/

    3.2安装(Linux)

    (1)将apache-activemq-5.12.0-bin.tar.gz 上传至服务器

    (2)解压此文件

    tar  zxvf  apache-activemq-5.12.0-bin.tar.gz

    (3)为apache-activemq-5.12.0目录赋权

    chmod 777 apache-activemq-5.12.0

    (4)进入apache-activemq-5.12.0in目录

    (5)赋与执行权限

    chmod 755 activemq 
    --------------------------------------  知识点小贴士    --------------------------
    linux 命令chmod 755的意思
    chmod是Linux下设置文件权限的命令,后面的数字表示不同用户或用户组的权限。
    一般是三个数字:
    第一个数字表示文件所有者的权限
    第二个数字表示与文件所有者同属一个用户组的其他用户的权限
    第三个数字表示其它用户组的权限。
          权限分为三种:读(r=4),写(w=2),执行(x=1) 。 综合起来还有可读可执行(rx=5=4+1)、可读可写(rw=6=4+2)、可读可写可执行(rwx=7=4+2+1)。
          所以,chmod 755 设置用户的权限为: 
    1.文件所有者可读可写可执行                                   --7
    2.与文件所有者同属一个用户组的其他用户可读可执行 --5 
    3.其它用户组可读可执行              

    3.3 启动

     ./activemq start

    出现下列提示表示成功!

     

    windows系统启动 需要进入/bin 目录下 选择 win64 或 win32 里面的 activemq.bat 启动。当然Windows下载的安装包不一样,请看官网。

    假设服务器地址为192.168.25.135 ,打开浏览器输入地址

    http://192.168.25.135:8161/ 即可进入ActiveMQ管理页面

     

    点击进入管理页面

     

    输入用户名和密码  均为 admin

     

    进入主界面

     

    点对点消息列表Qucues:

     

     发布/订阅消息列表Topics:

    列表各列信息含义如下:

    Number Of Pending Messages  等待消费的消息 这个是当前未出队列的数量。

    Number Of Consumers  消费者 这个是消费者端的消费者数量

    Messages Enqueued  进入队列的消息  进入队列的总数量,包括出队列的。

    Messages Dequeued  出了队列的消息  可以理解为是消费这消费掉的数量。

    Messages Enqueued = Messages Dequeued  + Number Of Pending Messages 

    4、 JMS入门小Demo

    4.1 点对点模式

    点对点的模式主要建立在一个队列上面,当连接一个列队的时候,发送端不需要知道接收端是否正在接收,可以直接向ActiveMQ发送消息,发送的消息,将会先进入队列中,如果有接收端在监听,则会发向接收端,如果没有接收端接收,则会保存在activemq服务器,直到接收端接收消息,点对点的消息模式可以有多个发送端,多个接收端,但是一条消息,只会被一个接收端给接收到,哪个接收端先连上ActiveMQ,则会先接收到,而后来的接收端则接收不到那条消息。

    消息生产者

    (1)创建工程jmsDemo ,引入依赖

      <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.13.4</version>
        </dependency>

    (2)创建类QueueProducer  main方法代码如下:

       //1.创建连接工厂
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616");
    
        //2.获取连接
        Connection connection = connectionFactory.createConnection();
    
        //3.启动连接
        connection.start();
    
        //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);       
    
        //5.创建队列对象
        Queue queue = session.createQueue("test-queue");
    
        //6.创建消息生产者
        MessageProducer producer = session.createProducer(queue);
    
        //7.创建消息
        TextMessage textMessage = session.createTextMessage("欢迎来到神奇的品优购世界");
    
        //8.发送消息
        producer.send(textMessage);
    
        //9.关闭资源
        producer.close();
        session.close();
        connection.close();

    上述代码中第4步创建session  的两个参数:

    第1个参数 是否使用事务

    第2个参数 消息的确认模式

    • AUTO_ACKNOWLEDGE = 1    自动确认
    • CLIENT_ACKNOWLEDGE = 2    客户端手动确认   
    • DUPS_OK_ACKNOWLEDGE = 3    自动批量确认
    • SESSION_TRANSACTED = 0    事务提交并确认

    运行后通过ActiveMQ管理界面查询

    消息消费者

    创建类QueueConsumer ,main方法代码如下:

       //1.创建连接工厂
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616");
    
        //2.获取连接
        Connection connection = connectionFactory.createConnection();
    
        //3.启动连接
        connection.start();
    
        //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
        //5.创建队列对象
        Queue queue = session.createQueue("test-queue");
    
        //6.创建消息消费
        MessageConsumer consumer = session.createConsumer(queue);
    
        //7.监听消息
        consumer.setMessageListener(new MessageListener() {
            publicvoid onMessage(Message message) {
                TextMessage textMessage=(TextMessage)message;
                try {
                    System.out.println("接收到消息:"+textMessage.getText());
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
    
        //8.等待键盘输入
        System.in.read();  
    
        //9.关闭资源
        consumer.close();
        session.close();
        connection.close();

     执行后看到控制台输出

    运行测试

    同时开启2个以上的消费者,再次运行生产者,观察每个消费者控制台的输出,会发现只有一个消费者会接收到消息。

    4.2  发布/订阅模式

    消息生产者

    创建类TopicProducer ,main方法代码如下:

    
    

      //1.创建连接工厂

        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616");

        //2.获取连接

        Connection connection = connectionFactory.createConnection();

        //3.启动连接

        connection.start();

        //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //5.创建主题对象

        Topic topic = session.createTopic("test-topic");

        //6.创建消息生产者

        MessageProducer producer = session.createProducer(topic);

        //7.创建消息

        TextMessage textMessage = session.createTextMessage("欢迎来到神奇的品优购世界");

        //8.发送消息

        producer.send(textMessage);

        //9.关闭资源

        producer.close();

        session.close();

      connection.close();

     运行效果如下:

    消息消费者

    创建类TopicConsumer ,main方法代码如下:

    //1.创建连接工厂
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616");
        //2.获取连接
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建主题对象
        //Queue queue = session.createQueue("test-queue");
        Topic topic = session.createTopic("test-topic");
        //6.创建消息消费
        MessageConsumer consumer = session.createConsumer(topic);
        
        //7.监听消息
        consumer.setMessageListener(new MessageListener() {
            publicvoid onMessage(Message message) {
                TextMessage textMessage=(TextMessage)message;
                try {
                    System.out.println("接收到消息:"+textMessage.getText());
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
        //8.等待键盘输入
        System.in.read();
        //9.关闭资源
        consumer.close();
        session.close();
        connection.close();    

    运行测试

    同时开启2个以上的消费者,再次运行生产者,观察每个消费者控制台的输出,会发现每个消费者会接收到消息。

  • 相关阅读:
    POJ 1469 COURSES 二分图最大匹配
    POJ 1325 Machine Schedule 二分图最大匹配
    USACO Humble Numbers DP?
    SGU 194 Reactor Cooling 带容量上下限制的网络流
    POJ 3084 Panic Room 求最小割
    ZOJ 2587 Unique Attack 判断最小割是否唯一
    Poj 1815 Friendship 枚举+求最小割
    POJ 3308 Paratroopers 最小点权覆盖 求最小割
    1227. Rally Championship
    Etaoin Shrdlu
  • 原文地址:https://www.cnblogs.com/zeussbook/p/11226158.html
Copyright © 2011-2022 走看看