zoukankan      html  css  js  c++  java
  • Java消息服务-JMS

    什么是JMS?

    JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

    Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

    消息服务:

    消息服务指的是两个应用程序之间进行异步通信的API,它为标准消息协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持应用程序开发。在Java中,当两个应用程序使用JMS进行通信时,它们之间并不是直接相连的,而是通过一个共同的消息收发服务连接起来,可以达到解耦的效果。

     

    体系架构:

    JMS由以下元素组成:
     
    JMS提供者
    连接面向消息中间件的,JMS接口的一个实现。提供者可以是Java平台的JMS实现,也可以是非Java平台的面向消息中间件的适配器。
     
    JMS客户
    生产或消费基于消息的Java的应用程序或对象。
     
    JMS生产者
    创建并发送消息的JMS客户。
     
    JMS消费者
    接收消息的JMS客户。
     
    JMS消息
    包括可以在JMS客户之间传递的数据的对象
     
    JMS队列
    一个容纳那些被发送的等待阅读的消息的区域。与队列名字所暗示的意思不同,消息的接受顺序并不一定要与消息的发送顺序相同。一旦一个消息被阅读,该消息将被从队列中移走。
     
    JMS主题
    一种支持发送消息给多个订阅者的机制。

    JMS对象模型

    ConnectionFactory: 创建Connection对象的工厂,针对两种不同的JMS消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。可以通过JNDI来查找ConnectionFactory对象。

    Connection: Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。

    Session: Session是操作消息的接口。可以通过session创建生产者、消费者、消息等。Session提供了事务的功能。当需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分QueueSession和TopicSession。

    MessageProducer: 消息生产者由Session创建,并用于将消息发送到Destination。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。

    MessageConsumer: 消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。

    Destination: Destination 的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。

    JMS的两种消息类型

    在JMS标准中,有两种消息模型PTP(Point to Point),Publish/Subscribe(Pub/Sub)。

    P2P模式-点对点消息传送模型:

    在点对点消息传送模型中,应用程序由消息队列,发送者,接收者组成。每一个消息发送给一个特殊的消息队列,该队列保存了所有发送给它的消息(除了被接收者消费掉的和过期的消息)。

    P2P的特点

    1,每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)。

    2,发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列。

    3,接收者在成功接收消息之后需向队列发送确认收到通知(acknowledgement)。

    Pub/Sub-发布/订阅消息传递模型:

    在发布/订阅消息模型中,发布者发布一个消息,该消息通过topic传递给所有的客户端。在这种模型中,发布者和订阅者彼此不知道对方,是匿名的且可以动态发布和订阅topic。

    在发布/订阅消息模型中,目的地被称为主题(topic),topic主要用于保存和传递消息,且会一直保存消息直到消息被传递给客户端。


    Pub/Sub特点

    1,每个消息可以有多个消费者。

    2,发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个或多个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。

    3,为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。

    消息接收

    在JMS中,消息的接收可以使用以下两种方式:

    同步  使用同步方式接收消息的话,消息订阅者调用receive()方法。在receive()中,消息未到达或在到达指定时间之前,方法会阻塞,直到消息可用。

    异步  使用异步方式接收消息的话,消息订阅者需注册一个消息监听者,类似于事件监听器,只要消息到达,JMS服务提供者会通过调用监听器的onMessage()递送消息。

    入门Demo

    pom.xml

    <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.13.4</version>
        </dependency>
    </dependencies>

    点对点模式:

    接收方

    public class QueueConsuer {
        public static void main(String[] args) throws Exception{
            //1创建连接工厂
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
            //2获得连接
            Connection connection = activeMQConnectionFactory.createConnection();
            //3启动连接
            connection.start();
            //4创建session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5创建列表
            Queue queue = session.createQueue("test-queue");
            //6创建消息接收方
            MessageConsumer consumer = session.createConsumer(queue);
            //7监听消息
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("接收到的消息为:"+textMessage.getText());
                    }catch (JMSException e){
                        e.printStackTrace();
                    }
                }
            });
            //8等待键盘输入
            System.in.read();
            //9关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    }

    发送方

        public static void main(String[] args) throws Exception{
            //1创建连接工厂
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
            //2获得连接
            Connection connection = activeMQConnectionFactory.createConnection();
            //3启动连接
            connection.start();
            //4创建session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5创建列表 指定发送队列的名称
            Queue queue = session.createQueue("test-queue");
            //6创建消息生产者
            MessageProducer producer = session.createProducer(queue);
            //7生产消息
            TextMessage testMessage = session.createTextMessage("这是我生产的消息 java0722");
            //8发送消息
            producer.send(testMessage);
            //9关闭资源
            producer.close();
            session.close();
            connection.close();
        }

    订阅发布模式

    接收方(多个接收方 代码相同)

        public static void main(String[] args) throws Exception{
            //1创建连接工厂
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
            //2获得连接
            Connection connection = activeMQConnectionFactory.createConnection();
            //3启动连接
            connection.start();
            //4创建session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5创建列表
            Topic topic = session.createTopic("test-topic");
            //6创建消息接收方
            MessageConsumer consumer = session.createConsumer(topic);
            //7监听消息
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("接收到的消息为:"+textMessage.getText());
                    }catch (JMSException e){
                        e.printStackTrace();
                    }
                }
            });
            //8等待键盘输入
            System.in.read();
            //9关闭资源
            consumer.close();
            session.close();
            connection.close();
        }

    发送方

        public static void main(String[] args) throws Exception{
            //1创建连接工厂
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
            //2获得连接
            Connection connection = activeMQConnectionFactory.createConnection();
            //3启动连接
            connection.start();
            //4创建session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5创建列表
            Topic topic = session.createTopic("test-topic");
            //6创建消息接收方
            MessageProducer producer = session.createProducer(topic);
            //7生产消息
            TextMessage testMessage = session.createTextMessage("这是我生产的消息 java0722 TTTTT");
            //8发送消息
            producer.send(testMessage);
            //9关闭资源
            producer.close();
            session.close();
            connection.close();
        }

    消息体

    为了适应不同场景下的消息,提高消息存储的灵活性,JMS定义了几种具体类型的消息,不同的子类型的消息体也不一样,需要注意的是,Message接口并没有提供一个统一的getBody之类的方法。消息子接口定义如下:


    TextMessage :最简单的消息接口,用于发送文本类的消息,设置/获取其body的方法定义如下setText()/getText()。

    StreamMessage :流式消息接口,里面定义了一系列的对基本类型的set/get方法,消息发送者可以通过这些方法写入基本类型的数据,消息接收者需要按发送者的写入顺序来读取相应的数据。

    MapMessage :把消息内容存储在Map里,本接口定义了一系列对基本类型的的set/get方法,与StreamMessage不同的是,每个值都对应了一个相应的key,所以消息接收者不必按顺序去读取数据。

    ObjectMessage :将对象作为消息的接口,提供了一个set/get 对象的方法,需要注意的是只能设置一个对象,这个对象可以是一个Collection,但必须是序列化的。

    BytesMessage :以字节的形式来传递消息的接口,除了提供了对基本类型的set/get,还提供了按字节方式进行set/get。

  • 相关阅读:
    iOS exit(0); 直接退出程序
    友盟推送简单调用
    KxMenu下拉菜单
    打开相册另类写法
    简洁调用字号
    十六进制颜色宏
    Swift定义单例
    不要在初始化方法和dealloc方法中使用Accessor Methods
    copyin函数
    c语言中的赋值
  • 原文地址:https://www.cnblogs.com/hank-hush/p/12141444.html
Copyright © 2011-2022 走看看