zoukankan      html  css  js  c++  java
  • ActiveMQ学习笔记(18)----Message高级特性(二)

    1. Blob Message

      有些时候,我们需要传递Blob(Binary Large Objects)消息,在5.14之前,(5.12和5.13需要在jetty.xml中手动开启)可以按照如下的方式配置使用fileserver:

      配置BLOB Tansfer Policy,可以在发送方的连接URI上设置,如:

    tcp://localhost:61616?jms.blobTransferPolicy.uploadUrl=http://localhost:8161/fileserver

      在5.14之后,就只能通过使用ftp协议来发送blobmessage,或自己将文件传到某个服务器上(通过FTP或其他方式),而后将该文件的url放在BlobMessage中再发送这条BlobMessage。不过,5.15好像又提供了http方式,不过需要自己实现文件上传服务器,web服务器上传代码可以参考

      http://svn.apache.org/repos/asf/activemq/trunk/activemq-fileserver/中三个类的的实现方式。 

      消费者的消费不受影响。

      代码实现方式如下:

     
    package com.wangx.activemq.queue;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.ActiveMQSession;
    import org.apache.activemq.BlobMessage;
    
    import javax.jms.*;
    import java.io.File;
    
    public class MessageSender {
    
        public static void main(String[] args) throws JMSException {
            //创建链接工厂
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            Connection connection = null;
            ActiveMQSession session = null;
            try{
                //创建链接
                connection =  factory.createConnection();
                //启动链接
                connection.start();
                //获取会话
                session = (ActiveMQSession) connection.createSession(Boolean.TRUE, session.AUTO_ACKNOWLEDGE);
                //创建队列
                Destination queue = session.createQueue("myQueue");
                //创建生产者对象
                MessageProducer messageProducer = session.createProducer(queue);
                //创建blob消息
                BlobMessage blobMessage = session.createBlobMessage(new File("pom.xml"));
                messageProducer.send(blobMessage);
                session.commit();
                session.close();
                connection.close();
            }catch (Exception e) {
                e.printStackTrace();
            }finally {
            }
    
        }
    }
     

      此时使用的是默认的文件上传服务器地址,地址为:

      http://localhost:8080/uploads/

      如果需要自定义地址,可以在uri上添加

    tcp://localhost:61616?jms.blobTransferPolicy.uploadUrl=http://foo.com/

      或使用:

      BlobTransferPolicy构建策略,并通过factory.setBlobTransferPolicy();设置策略。

      接下来消费者使用方式:

     
    package com.wangx.activemq.queue;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.BlobMessage;
    
    import javax.jms.*;
    import java.io.IOException;
    import java.io.InputStream;
    
    public class MessageReceive {
    
        public static void main(String[] args) {
            //创建链接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            Connection connection = null;
            Session session = null;
            try{
                //创建链接
                connection =  factory.createConnection();
                //启动链接
                connection.start();
                //获取会话
                session = connection.createSession(Boolean.TRUE, session.AUTO_ACKNOWLEDGE);
                //创建队列
                Destination queue = session.createQueue("myQueue");
                //创建消费者
                MessageConsumer messageConsumer = session.createConsumer(queue);
                //监听消息
               messageConsumer.setMessageListener(new MessageListener() {
                   @Override
                   public void onMessage(Message message) {
                       if (message instanceof BlobMessage) {
                           //监听BlobMessage
                           BlobMessage blobMessage = (BlobMessage) message;
                           try {
                               InputStream in = blobMessage.getInputStream();
                               byte[] bytes = new byte[in.available()];
                               System.out.println(new String(bytes));
                           } catch (Exception e) {
                               e.printStackTrace();
                           }
    
                           // process the stream...
                       }
                   }
               });
                session.commit();
                session.close();
                connection.close();
            }catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
     

      如果你发送到的文件或者URL存在,比如发给共享文件系统或者是WebServer上的web应用,那么你可以使用如下方式:

    BlobMessage message = session.createBlobMessage(new URL("http://some.shared.site.com");
    producer.send(message);

    2. Message Transformation

      有时候需要JMS Producer内部进行message转换,从4.2版本起,ActiveMQ提供了一个Message Transform接口用于进行消息转换,可以在如下对象上调用:

      ActiveMQConnectionFactory,ActiveMQConnection,ActiveMQSession,ActiveMQMessageConsumer,ActiveMQMessageProducer.

      在消息被发送之前发送到JMS producer的消息总线前进行转换,通过producerTransform方法,在消息到达总线后,但是在consumer接收消息之前进行转换,通过consumerTransform方法,当然MessageTransfoemer接口的实现需要你自己来提供。

      官方文档解释如下:

      

    原文 ActiveMQ学习笔记(18)----Message高级特性(二)

  • 相关阅读:
    Jenkins搭建
    Hexo搭建静态博客站点
    FactoryBean简介以及Mybatis-Spring应用
    ArrayList源码分析
    BCZM : 1.8
    BCZM : 1.7
    BCZM : 1.6
    BCZM : 1.5
    BCZM : 1.4
    BCZM : 1.3
  • 原文地址:https://www.cnblogs.com/xiaoshen666/p/10854715.html
Copyright © 2011-2022 走看看