zoukankan      html  css  js  c++  java
  • ActiveMQ的p2p模式与发布订阅模式

    1.消息中间件:采用异步通讯防止,支持点对点以及发布订阅模式,可以解决高并发问题
            传统调用接口,可能发生阻塞,重复提交,超时等等问题,可以利用消息中间件发送异步通讯请求
            
            点对点:生产者   消息队列    消费者
            发布订阅:生产者   主题    消费者1  消费者N
        
     2.windows安装ActiveMQ
            2.1 解压,进入apache-activemq-5.11.1inwin64
            2.2 启动,双击activeMQ.bat脚本启动,启动窗口不要关闭,可以设置后台启动
            2.3 启动完成后,如果发送消息或者消费消息通过61616端口进行         后台查看信息通过8161端口查看
            2.4 进入后台登陆:默认用户名和密码都是admin

     

    P2P (点对点)

    P2P

    1. P2P模式图

      

      2. 涉及到的概念

      1. 消息队列(Queue)
      2. 发送者(Sender)
      3. 接收者(Receiver)
      4. 每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。

      3. P2P的特点

      1. 每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)
      2. 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列
      3. 接收者在成功接收消息之后需向队列应答成功

    如果你希望发送的每个消息都应该被成功处理的话,那么你需要P2P模式。

    应用场景

    A用户与B用户发送消息

    案例:

    生产者:

    public static void main(String[] args) throws JMSException {
                    //步骤一:创建连接工厂
                    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
                    //步骤二:创建连接
                    Connection connection = activeMQConnectionFactory.createConnection();
                    //步骤三:启动连接
                    connection.start();
                    //步骤四:获取会话工厂
                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    //步骤五:创建队列
                    Queue queue = session.createQueue("wdksoft_queue");
                    //创建消息生产者
                    MessageProducer producer = session.createProducer(queue);
                    //消息持久化
                    producer.setDeliveryMode(2);
                    //模拟消息
                    TextMessage textMessage = session.createTextMessage("hello activeMQ");
                    //发送消息
                    producer.send(textMessage);
                    System.out.println("生产者生产消息完毕~");
                    //回收资源
                    session.close();
                    connection.close();
                }

    消费者:

    public static void main(String[] args) throws JMSException {
                    //步骤一:创建连接工厂
                    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
                    //步骤二:创建连接
                    Connection connection = activeMQConnectionFactory.createConnection();
                    //步骤三:开启连接
                    connection.start();
                    //创建会话对象
                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    //获取到接受消息的队列
                    Queue queue = session.createQueue("wdksoft_queue");
                    //创建消费者
                    MessageConsumer consumer = session.createConsumer(queue);
                    while(true){
                        //获取消息
                        TextMessage message = (TextMessage)consumer.receive();
                        if(message!=null){
                            System.out.println("消费者获取消息:"+message.getText());
                        }else{
                            break;
                        }
                    }
                    //回收资源
                    session.close();
                    connection.close();
    
                }

    在没有消费者服务启动时,生产者生产的消息会被暂存到消息队列中,等到有消费者服务启动时,就从消息队列中取走消息

    点对点的方式,生产者生产的消息只能被一个消费者获取

    当有多个消费者存在时,只有一个消费者会收到消息

    如果想一个生产者发布了消息,多个消费者都能接收到,需要用到另一种模式,发布与订阅模式

    Pub/Sub (发布与订阅)

    Pub/Sub模式图 

    涉及到的概念 

    主题(Topic)

    发布者(Publisher)

    订阅者(Subscriber) 
    客户端将消息发送到主题。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。

    Pub/Sub的特点

    每个消息可以有多个消费者

    发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。

    为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。

    如果你希望发送的消息可以不被做任何处理、或者被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型

    消息的消费 
    在JMS中,消息的产生和消息是异步的。对于消费来说,JMS的消息者可以通过两种方式来消费消息。 
    ○ 同步 
    订阅者或接收者调用receive方法来接收消息,receive方法在能够接收到消息之前(或超时之前)将一直阻塞 
    ○ 异步 
    订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法。

      应用场景:

       用户注册、订单修改库存、日志存储

       画图演示

     

     

     

    案例:

    消费者:

    public static void main(String[] args) throws JMSException {
                    //步骤一:创建连接工厂
                    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
                    //步骤二:创建连接
                    Connection connection = activeMQConnectionFactory.createConnection();
                    //步骤三:开启连接
                    connection.start();
                    //创建会话对象
                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    //获取到接受消息的队列
                    Topic topic = session.createTopic("wdksoft_topic");
                    //创建消费者
                    MessageConsumer consumer = session.createConsumer(topic);
                    while(true){
                        //获取消息
                        TextMessage message = (TextMessage)consumer.receive();
                        if(message!=null){
                            System.out.println("消费者获取消息:"+message.getText());
                        }else{
                            break;
                        }
                    }
                    //回收资源
                    session.close();
                    connection.close();
                }

    生产者:

    public static void main(String[] args) throws JMSException {
                    //步骤一:创建连接工厂
                    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
                    //步骤二:创建连接
                    Connection connection = activeMQConnectionFactory.createConnection();
                    //步骤三:启动连接
                    connection.start();
                    //步骤四:获取会话工厂
                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    //步骤五:创建主题
                    Topic topic = session.createTopic("wdksoft_topic");
                    //创建消息生产者
                    MessageProducer producer = session.createProducer(null);
                    //消息持久化
                    producer.setDeliveryMode(2);
                    //模拟消息
                    TextMessage textMessage = session.createTextMessage("hello activeMQ pub");
                    //发送消息
                    producer.send(topic,textMessage);
                    System.out.println("生产者生产消息完毕~");
                    //回收资源
                    session.close();
                    connection.close();
                }

    使用发布与订阅模式,必须先启动消费者,确定了有哪些订阅,生产者在向这些消费者发布消息

     

     

  • 相关阅读:
    【LuoguP4770】[NOI2018] 你的名字
    【LuoguP5171】Earthquake
    【LuoguP3747】[六省联考2017] 相逢是问候
    【LuoguP4916】魔力环
    YOLO2:实时目标检测视频教程,视频演示, Android Demo ,开源教学项目,论文。
    谷歌发布 TensorFlow Lite [官方网站,文档]
    Chinese-Text-Classification,用卷积神经网络基于 Tensorflow 实现的中文文本分类。
    Chinese-Text-Classification:Tensorflow CNN 模型实现的中文文本分类器[不分词版]
    Hinton's paper Dynamic Routing Between Capsules 的 Tensorflow , Keras ,Pytorch实现
    谷歌开发者:看可口可乐公司是怎么玩转TensorFlow的?
  • 原文地址:https://www.cnblogs.com/chx9832/p/12304199.html
Copyright © 2011-2022 走看看