zoukankan      html  css  js  c++  java
  • activemq demo指南

       queue与topic的技术特点对比
     

    topic

    queue

    概要

    Publish Subscribe messaging 发布订阅消息

    Point-to-Point 点对点

    有无状态

    topic数据默认不落地,是无状态的。

    Queue数据默认会在mq服务器上以文件形式保存,比如Active MQ一般保存在$AMQ_HOMEdatakr-storedata下面。也可以配置成DB存储。

    完整性保障

    并不保证publisher发布的每条数据,Subscriber都能接受到。

    Queue保证每条数据都能被receiver接收。

    消息是否会丢失

    一般来说publisher发布消息到某一个topic时,只有正在监听该topic地址的sub能够接收到消息;如果没有sub在监听,该topic就丢失了。

    Sender发送消息到目标Queue,receiver可以异步接收这个Queue上的消息。Queue上的消息如果暂时没有receiver来取,也不会丢失。

    消息发布接收策略

    一对多的消息发布接收策略,监听同一个topic地址的多个sub都能收到publisher发送的消息。Sub接收完通知mq服务器

    一对一的消息发布接收策略,一个sender发送的消息,只能有一个receiver接收。receiver接收完后,通知mq服务器已接收,mq服务器对queue里的消息采取删除或其他操作。

              Topic和queue的最大区别在于topic是以广播的形式,通知所有在线监听的客户端有新的消息,没有监听的客户端将收不到消息;而queue则是以点对点的形式通知多个处于监听状态的客户端中的一个。

    queue 通讯:

    消息提供者:

    	 public static void main(String[] args) throws JMSException { 
             // ConnectionFactory :连接工厂,JMS 用它创建连接 
             ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( 
                             ActiveMQConnection.DEFAULT_USER, 
                             ActiveMQConnection.DEFAULT_PASSWORD, 
                             "tcp://localhost:61616"); 
             //JMS 客户端到JMS Provider 的连接 
             Connection connection = connectionFactory.createConnection(); 
             connection.start(); 
             // Session: 一个发送或接收消息的线程 
             Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); 
             // Destination :消息的目的地;消息发送给谁. 
             // 获取session注意参数值my-queue是Query的名字 
             Destination destination = session.createQueue("my-queue"); 
             //Destination destination =  session.createTopic("my-topic");
             // MessageProducer:消息生产者 
             MessageProducer producer = session.createProducer(destination); 
             //设置不持久化 
             producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 
             //发送一条消息 
             sendMsg(session, producer); 
             session.commit(); 
             connection.close(); 
     } 
    
     public static void sendMsg(Session session, MessageProducer producer) throws JMSException { 
             //创建一条文本消息 
             TextMessage message = session.createTextMessage("Hello ActiveMQ!"); 
             //通过消息生产者发出消息 
             producer.send(message); 
             System.out.println(message.toString()); 
     } 
    

      消息接收者:

    public static void main(String[] args) {
            // ConnectionFactory :连接工厂,JMS 用它创建连接
            ConnectionFactory connectionFactory;
            // Connection :JMS 客户端到JMS Provider 的连接
            Connection connection = null;
            // Session: 一个发送或接收消息的线程
            Session session;
            // Destination :消息的目的地;消息发送给谁.
            Destination destination;
            // 消费者,消息接收者
            MessageConsumer consumer;
            connectionFactory = new ActiveMQConnectionFactory(
                    ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD,
                    "tcp://localhost:61616");
            try {
                // 构造从工厂得到连接对象
                connection = connectionFactory.createConnection();
                // 启动
                connection.start();
                // 获取操作连接
                session = connection.createSession(Boolean.FALSE,
                        Session.AUTO_ACKNOWLEDGE);
                // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
                destination =  session.createQueue("my-queue");
                consumer = session.createConsumer(destination);
                while (true) {
                    TextMessage message = (TextMessage) consumer.receive(1000);
                    if (null != message) {
                        System.out.println("收到消息" + message.getText());
                    } else {
                        break;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    if (null != connection)
                        connection.close();
                } catch (Throwable ignore) {
                }
            }
        }
    

      Topic通讯:

    广播提供者:

     private static final int SEND_NUMBER = 5;
        public static void sendMessage(Session session, MessageProducer producer)
                throws Exception {
            for (int i = 1; i <=SEND_NUMBER; i++) {
                TextMessage message = session
                        .createTextMessage("ActiveMq发送的消息" + i);
                //发送消息到目的地方
                System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
                producer.send(message);
            }
        }
       
        public static void main(String[] args) {
            // ConnectionFactory:连接工厂,JMS用它创建连接
            ConnectionFactory connectionFactory;
            // Connection:JMS客户端到JMS Provider的连接
            Connection connection = null;
            // Session:一个发送或接收消息的线程
            Session session;
            // Destination:消息的目的地;消息发送给谁.
            Destination destination;
            // MessageProducer:消息发送者
            MessageProducer producer;
            // TextMessage message;
            //构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
            connectionFactory = new ActiveMQConnectionFactory(
            		ActiveMQConnectionFactory.DEFAULT_USER,
            		ActiveMQConnectionFactory.DEFAULT_PASSWORD,ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL
                  );
            try {
                //构造从工厂得到连接对象
                connection = connectionFactory.createConnection();
                //启动
                connection.start();
                //获取操作连接
                session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
                //获取session注意参数值FirstTopic是一个服务器的topic(与queue消息的发送相比,这里是唯一的不同)
                destination = session.createTopic("FirstTopic");
                //得到消息生成者【发送者】
                producer = session.createProducer(destination);
                //设置不持久化,此处学习,实际根据项目决定
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                //构造消息,此处写死,项目就是参数,或者方法获取
                sendMessage(session, producer);
                session.commit();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    if (null != connection)
                        connection.close();
                } catch (Throwable ignore) {
                }
            }
        }
    

      广播接收者:

       private String threadName;
    
        ReceiveTopic(String threadName) {
             this.threadName = threadName;
        }
    
        public void run() {
             // ConnectionFactory:连接工厂,JMS用它创建连接
             ConnectionFactory connectionFactory;
             // Connection:JMS客户端到JMS Provider的连接
             Connection connection =null;
             // Session:一个发送或接收消息的线程
             Session session;
             // Destination:消息的目的地;消息发送给谁.
             Destination destination;
             //消费者,消息接收者
             MessageConsumer consumer;
             connectionFactory = new ActiveMQConnectionFactory(
            		 ActiveMQConnectionFactory.DEFAULT_USER,
             		ActiveMQConnectionFactory.DEFAULT_PASSWORD,ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
             try {
                   //构造从工厂得到连接对象
                   connection = connectionFactory.createConnection();
                   //启动
                   connection.start();
                   //获取操作连接,默认自动向服务器发送接收成功的响应
                   session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                   //获取session注意参数值FirstTopic是一个服务器的topic
                   destination = session.createTopic("FirstTopic");
                   consumer = session.createConsumer(destination);
                   while (true) {
                        //设置接收者接收消息的时间,为了便于测试,这里设定为100s
                        TextMessage message = (TextMessage) consumer
                                    .receive(1 * 1000);
                        if (null != message) {
                              System.out.println("线程"+threadName+"收到消息:" + message.getText());
                        } else {
                              continue;
                        }
                   }
             } catch (Exception e) {
                   e.printStackTrace();
             } finally {
                   try {
                        if (null != connection)
                              connection.close();
                   } catch (Throwable ignore) {
                   }
             }
        }
    
        public static void main(String[] args) {
              //这里启动3个线程来监听FirstTopic的消息,与queue的方式不一样三个线程都能收到同样的消息
             ReceiveTopic receive1=new ReceiveTopic("thread1");
             ReceiveTopic receive2=new ReceiveTopic("thread2");
             ReceiveTopic receive3=new ReceiveTopic("thread3");
             Thread thread1=new Thread(receive1);
             Thread thread2=new Thread(receive2);
             Thread thread3=new Thread(receive3);
             thread1.start();
             thread2.start();
             thread3.start();
        }
    

      如果传输中文:可以设置字符串的编码格式new String(msg.getBytes("utf-8"),"ISO-8859-1");  new String(message.toString().getBytes("ISO-8859-1"), "UTF-8");

  • 相关阅读:
    一月5日
    一月5日
    面试java工程师的自我介绍(模拟篇)
    Spring Boot Jpa 介绍
    spring+redis做缓存使用
    Spring Boot Web 开发详解
    thymeleaf+spring的简单搭建
    如何搭建 Spring boot
    通用baseDao
    pagehelper的使用
  • 原文地址:https://www.cnblogs.com/xiaoblog/p/4816636.html
Copyright © 2011-2022 走看看