zoukankan      html  css  js  c++  java
  • Message高级特性 & 内嵌Jetty实现文件服务器

    1. Messaage Properties  常见属性

      更多的属性以及介绍参考:http://activemq.apache.org/activemq-message-properties.html

      消息属性,这个在之前刚学习ActiveMQ的时候已经介绍过,常见的如下:

      1. queue消息默认是持久化
      2. 消息得优先级默认是4.
      3. 消息发送时设置了时间戳。
      4. 消息的过期时间默认是永不过期,过期的消息进入DLQ,可以配置DLQ及其处理策略。
      5. 如果消息是重发的,将会被标记出来。
      6. JMSReplyTo标识响应消息发送到哪个queue.
      7. JMSCorelationID标识此消息相关联的消息id,可以用这个标识把多个消息连接起来。
      8. JMS同时也记录了消息重发的次数。默认是6次
      9. 如果有一组相关联的消息需要处理,可以分组;只需要设置消息组的名字和这个消息的第几个消息。
      10. 如果消息中一个事务环境,则TXID将会被设置。
      11. 此外ActiveMQ在服务器端额外设置了消息入队和出队的时间戳。
      12. ActiveMQ里消息属性的值,不仅可以用基本类型,还可以用List或Map类型

    2. Advisory Message  监听ActiveMQ自己的消息

      Advisory Message是ActiveMQ自身的系统消息地址,可以监听该地址来获取activemq的系统消息。目前支持获取如下信息:

    consumers, producers和connections的启动和停止
    创建和销毁temporary destinations
    opics 和queues 的消息过期
    brokers发送消息给destination,但是没有consumers
    connections启动和停止

    说明:

    1. 所有advisory的topic,前缀是:ActiveMQ.Advisory
    2. 所有Advisory的消息类型是:‘Advisory’,所有的Advisory都有的消息属性有:originBrokerId,originBrokerName,originBrokerURL
    3. 具体支持的topic和queue,请参照:
       http://activemq.apache.org/advisory-message.html
    Advisory功能默认是关闭的,打开Advisorie的具体实现如下:

        <broker xmlns="http://activemq.apache.org/schema/core" brokerName="brokerName" dataDirectory="${activemq.data}" schedulePeriodForDestinationPurge="1000">
            <destinationPolicy>
                  <policyMap>
                    <policyEntries>
                         <policyEntry queue=">"  advisoryForConsumed="true" />
                    </policyEntries>
                  </policyMap>
           </destinationPolicy>
        ...
    </broker>

    配置启动之后我们向主题chatTopic发送一条消息可以查看到如下activemq增加的主题:

     我们订阅上面主题:

    package cn.qlq.activemq.topic;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.Topic;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.command.ActiveMQMessage;
    
    /**
     * 主题模式的消费消息
     * 
     * @author QiaoLiQiang
     * @time 2018年9月18日下午11:26:41
     */
    public class MsgConsumer {
    
        // 默认端口61616
        private static final String url = "tcp://localhost:61616/";
        private static final String topicName = "ActiveMQ.Advisory.Producer.Topic.chatTopic";
    
        public static void main(String[] args) throws JMSException {
            // 1创建ConnectionFactory
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
            // 2.由connectionFactory创建connection
            Connection connection = connectionFactory.createConnection();
            // 设置链接的ID
            // 3.创建Session===第一个参数是是否事务管理,第二个参数是应答模式
            Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 4.创建Destination(Queue继承Queue,Topic也继承Destination==这三个都是接口)
            Topic destination = session.createTopic(topicName);
            // 创建TopicSubscriber来订阅;需要在连接上设置消费者id,用来识别消费者;设置好了过后再start 这个 connection
            // 5.启动connection
            connection.start();
            // 6.创建消费者consumer
            MessageConsumer consumer = session.createConsumer(destination);
            Message message = consumer.receive();
            while (message != null) {
                ActiveMQMessage txtMsg = (ActiveMQMessage) message;
                session.commit();
                System.out.println("收到消 息:" + txtMsg.getMessage());
                message = consumer.receive(1000L);
            }
            session.close();
            connection.close();
        }
    
    }

    第二种获取某一队列的ActiveMQ自身的生产者主题和消费者主题的方法如下:

         String topicName = "chatTopic";    
            Topic topic = session.createTopic(topicName);
            Destination destination = AdvisorySupport.getProducerAdvisoryTopic(topic);
            Destination destination2 = AdvisorySupport.getConsumerAdvisoryTopic(topic);

    再次发送消息发现该消费者接收到的消息如下:

    收到消 息:ActiveMQMessage {commandId = 0, responseRequired = false, messageId = ID:MicroWin10-1535-57537-1554965648551-1:1:0:0:16, originalDestination = null, originalTransactionId = null, producerId = ID:MicroWin10-1535-57537-1554965648551-1:1:0:0, destination = topic://ActiveMQ.Advisory.Producer.Topic.chatTopic, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 1554966377407, brokerOutTime = 1554966377410, correlationId = null, replyTo = null, persistent = false, type = Advisory, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@1324409e, dataStructure = ProducerInfo {commandId = 4, responseRequired = true, producerId = ID:MicroWin10-1535-59019-1554966377239-1:1:1:1, destination = topic://chatTopic, brokerPath = null, dispatchAsync = false, windowSize = 0, sentCount = 0}, redeliveryCounter = 0, size = 0, properties = {producerCount=1, originBrokerName=brokerName, originBrokerURL=tcp://MicroWin10-1535:61616, originBrokerId=ID:MicroWin10-1535-57537-1554965648551-0:1}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false}
    收到消 息:ActiveMQMessage {commandId = 0, responseRequired = false, messageId = ID:MicroWin10-1535-57537-1554965648551-1:1:0:0:17, originalDestination = null, originalTransactionId = null, producerId = ID:MicroWin10-1535-57537-1554965648551-1:1:0:0, destination = topic://ActiveMQ.Advisory.Producer.Topic.chatTopic, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 1554966377439, brokerOutTime = 1554966377439, correlationId = null, replyTo = null, persistent = false, type = Advisory, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@246ae04d, dataStructure = RemoveInfo {commandId = 0, responseRequired = true, objectId = ID:MicroWin10-1535-59019-1554966377239-1:1:1:1, lastDeliveredSequenceId = -2}, redeliveryCounter = 0, size = 0, properties = {producerCount=0, originBrokerName=brokerName, originBrokerURL=tcp://MicroWin10-1535:61616, originBrokerId=ID:MicroWin10-1535-57537-1554965648551-0:1}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false}

    总结Advisory的使用方式:
      1. 要在配置文件里面开启Advisories.
      2. 消息发送端没什么变化,不做多余改变或配置,
      3. 消息接收端:
        1)根据你要接收的消息类型,来设置不同的topic,可以直接从界面查询主题的name之后订阅,也可以借助AdvisorySupport类来获取
        2)由于这个topic默认不是持久化的,所有要先看起接收端,然后再发送消息。

        3) 接收消息的时候,接收到的消息类型是ActiveMQMessage,所以需要先转换成ActiveMQMessage再获取消息(也就是这是ActiveMQ特有的消息类型)

     3.延迟和定时消息传递(Delay and schedule Message Delivery)

       ActimeMQ也可以实现延时或者定时投递消息,类似于quartz定时任务等。

    一共四个属性
    AMQ_SCHEDULED_DELAY: 延迟投递的时间
    AMQ_SCHEDULED_PERIOD: 重复投递的时间间隔
    AMQ_SCHEDULED_REPEAT:重复投递次数
    AMQ_SCHEDULED_CRON: Cron表达式

    ActiveMQ也提供了一个封装的消息类型:org.apache.activemq.ScheduledMessage,可以使用这个类来辅助设置。

    (1)首先在broker上设置schedulerSupport="true"

    (2)程序上设置延迟以及定时效果,如下设置延迟30秒,重发3次,重发间隔是5秒的效果

                TextMessage tms = session.createTextMessage("textMessage:" + i);
                long time = 30 * 1000;
                tms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
                tms.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 3);
                tms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 5 * 1000);
                // 9.生产者发送消息
                producer.send(tms);

    消费者结果:

      消费者不会马上接收到消息,而是在30秒后第一次接受消息,并且间隔5秒后再次接受消息,总共会接收4次。

    (3)使用CRON表达式

                // 8.创建Message,有好多类型,这里用最简单的TextMessage
                TextMessage tms = session.createTextMessage("textMessage:" + i);
                tms.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON,"* * * * *");
                // 9.生产者发送消息
                producer.send(tms);

      CRON表达式的优先级高于另外三个参数,如果在设置了延时时间,也有repeat和period参数,则会在每次CRON执行的时候,重复投递repeat次,每次间隔period,就是说设置的是叠加效果,例如每小时都会发生消息被投递10次,延迟0秒开始,每次间隔1秒。

    4.Blob Message(传输大文件,一般大于100m)

      也就是发送文件消息。ActiveMQ传输文件的方式有byteMessage、StreamMessage、BlobMessage。其中bytemessage和streammessage如果不加工处理的话,只能传输小文件,小于100M的文件应该都可以传,blobmessage可以传输较大的文件。对于比较小的文件,简单的处理方式是先读取所有的文件成byte[],然后使用ByteMessage,把文件数据发送到broker,像正常的message一样处理。对于大文件,例如1GB以上的文件,这么搞直接把client或是broker给oom掉了。有些时候,我们需要传递Blob(Binary Large Objects)消息,在5.14之前,(5.12和5.13需要在jetty.xml中手动开启)可以按照如下的方式配置使用fileserver:

      配置BLOB Tansfer Policy,可以在发送方的连接URI上设置,如:

    tcp://localhost:61616?jms.blobTransferPolicy.uploadUrl=http://localhost:8161/fileserver

      在5.14之后,就只能通过使用ftp协议来发送blobmessage,或自己将文件传到某个服务器上(通过FTP或其他方式),而后将该文件的url放在BlobMessage中再发送这条BlobMessage。不过,5.15好像又提供了http方式,不过需要自己实现文件上传服务器。

      由于我使用的版本你是5.15的,所以我需要先搭建服务器。

    1.使用内嵌netty搭建文件服务器 (重要)

      也就是对文件进行管理,下面我的例子是使用jetty对G:/files/文件夹进行管理,会在浏览器检测此文件夹下面的文件,浏览器能解析的可以通过浏览器访问,不能解析(例如doc文件)的查看会下载。

      文件处理的类参考的下面三个类:   http://svn.apache.org/repos/asf/activemq/trunk/activemq-fileserver/  中三个类的的实现方式。

    pom.xml

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>cn.qlq</groupId>
        <artifactId>FileServer</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>war</packaging>
    
        <dependencies>
            <dependency>
                <groupId>javax.servlet</groupId>
                <artifactId>servlet-api</artifactId>
                <version>2.5</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>javax.servlet</groupId>
                <artifactId>jsp-api</artifactId>
                <version>2.0</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.eclipse.jetty.aggregate</groupId>
                <artifactId>jetty-all-server</artifactId>
                <version>7.6.4.v20120524</version>
            </dependency>
            <!-- slf4j 依赖包 -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.7.25</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.5</version>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-api</artifactId>
                <version>2.0-rc1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-core</artifactId>
                <version>2.0-rc1</version>
            </dependency>
            <!-- 文件上传的测试包httpclient -->
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.3.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpmime</artifactId>
                <version>4.3.1</version>
            </dependency>
        </dependencies>
    
        <build>
            <!-- 配置了很多插件 -->
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                    <configuration>
                        <source>1.7</source>
                        <target>1.7</target>
                        <encoding>UTF-8</encoding>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
    </project>

    服务器启动类:

    import org.eclipse.jetty.server.DispatcherType;
    import org.eclipse.jetty.server.Server;
    import org.eclipse.jetty.servlet.ServletContextHandler;
    import org.eclipse.jetty.servlet.ServletHolder;
    
    public class StartServer {
        public static void main(String[] args) throws Exception {
            Server server = new Server(8080);
    
            ServletContextHandler handler = new ServletContextHandler();
            // 相当于设置项目名称
            handler.setContextPath("/fileserver");
            // 设置资源文件所在目录,工具类中会以这个目录作为文件服务目录存储文件
            handler.setResourceBase("G:/files/");
            // handler.setResourceBase(".");
            System.out.println(handler.getServletContext().getRealPath("/"));
    
            handler.addFilter(FilenameGuardFilter.class, "/*", DispatcherType.FORWARD.ordinal());
    
            handler.addFilter(RestFilter.class, "/*", DispatcherType.FORWARD.ordinal());
            ServletHolder defaultServlet = new ServletHolder();
            defaultServlet.setName("DefaultServlet");
            defaultServlet.setClassName("org.eclipse.jetty.servlet.DefaultServlet");
    
            handler.addServlet(defaultServlet, "/*");
    
            server.setHandler(handler);
            server.start();
        }
    }

    重要的文件处理在下面的过滤器中:(PUT类型上传文件,请求method必须是PUT,GET请求资源,DELETE是删除文件,且请求类似于Restful风格,path最后一部分是文件的名称)

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.util.Enumeration;
    
    import javax.servlet.Filter;
    import javax.servlet.FilterChain;
    import javax.servlet.FilterConfig;
    import javax.servlet.ServletException;
    import javax.servlet.ServletRequest;
    import javax.servlet.ServletResponse;
    import javax.servlet.UnavailableException;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class RestFilter implements Filter {
        private static final Logger LOG = LoggerFactory.getLogger(RestFilter.class);
    
        private static final String HTTP_HEADER_DESTINATION = "Destination";
        private static final String HTTP_METHOD_MOVE = "MOVE";
        private static final String HTTP_METHOD_PUT = "PUT";
        private static final String HTTP_METHOD_GET = "GET";
        private static final String HTTP_METHOD_DELETE = "DELETE";
    
        private String readPermissionRole;
        private String writePermissionRole;
        private FilterConfig filterConfig;
    
        public void init(FilterConfig filterConfig) throws UnavailableException {
            this.filterConfig = filterConfig;
            readPermissionRole = filterConfig.getInitParameter("read-permission-role");
            writePermissionRole = filterConfig.getInitParameter("write-permission-role");
        }
    
        private File locateFile(HttpServletRequest request) {
            return new File(filterConfig.getServletContext().getRealPath(request.getServletPath()), request.getPathInfo());
        }
    
        public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
                throws IOException, ServletException {
            if (!(request instanceof HttpServletRequest && response instanceof HttpServletResponse)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("request not HTTP, can not understand: " + request.toString());
                }
                chain.doFilter(request, response);
                return;
            }
    
            HttpServletRequest httpRequest = (HttpServletRequest) request;
            HttpServletResponse httpResponse = (HttpServletResponse) response;
    
            System.out.println(httpRequest.getRequestURL());
            System.out.println(httpRequest.getMethod());
            
            if (httpRequest.getMethod().equals(HTTP_METHOD_MOVE)) {
                doMove(httpRequest, httpResponse);
            } else if (httpRequest.getMethod().equals(HTTP_METHOD_PUT)) {
                doPut(httpRequest, httpResponse);
            } else if (httpRequest.getMethod().equals(HTTP_METHOD_GET)) {
                if (checkGet(httpRequest, httpResponse)) {
                    chain.doFilter(httpRequest, httpResponse); // actual processing
                                                                // done elsewhere
                }
            } else if (httpRequest.getMethod().equals(HTTP_METHOD_DELETE)) {
                doDelete(httpRequest, httpResponse);
            } else {
                chain.doFilter(httpRequest, httpResponse);
            }
        }
    
        protected void doMove(HttpServletRequest request, HttpServletResponse response)
                throws ServletException, IOException {
            if (LOG.isDebugEnabled()) {
                LOG.debug("RESTful file access: MOVE request for " + request.getRequestURI());
            }
    
            if (writePermissionRole != null && !request.isUserInRole(writePermissionRole)) {
                response.sendError(HttpURLConnection.HTTP_FORBIDDEN);
                return;
            }
    
            File file = locateFile(request);
            String destination = request.getHeader(HTTP_HEADER_DESTINATION);
    
            if (destination == null) {
                response.sendError(HttpURLConnection.HTTP_BAD_REQUEST, "Destination header not found");
                return;
            }
    
            try {
                URL destinationUrl = new URL(destination);
                IOHelper.copyFile(file, new File(destinationUrl.getFile()));
                IOHelper.deleteFile(file);
            } catch (IOException e) {
                response.sendError(HttpURLConnection.HTTP_INTERNAL_ERROR); // file
                                                                            // could
                                                                            // not
                                                                            // be
                                                                            // moved
                return;
            }
    
            response.setStatus(HttpURLConnection.HTTP_NO_CONTENT); // we return no
                                                                    // content
        }
    
        protected boolean checkGet(HttpServletRequest request, HttpServletResponse response)
                throws ServletException, IOException {
            if (LOG.isDebugEnabled()) {
                LOG.debug("RESTful file access: GET request for " + request.getRequestURI());
            }
    
            if (readPermissionRole != null && !request.isUserInRole(readPermissionRole)) {
                response.sendError(HttpURLConnection.HTTP_FORBIDDEN);
                return false;
            } else {
                return true;
            }
        }
    
        protected void doPut(HttpServletRequest request, HttpServletResponse response)
                throws ServletException, IOException {
            if (LOG.isDebugEnabled()) {
                LOG.debug("RESTful file access: PUT request for " + request.getRequestURI());
            }
    
            if (writePermissionRole != null && !request.isUserInRole(writePermissionRole)) {
                response.sendError(HttpURLConnection.HTTP_FORBIDDEN);
                return;
            }
    
            File file = locateFile(request);
    
            if (file.exists()) {
                boolean success = file.delete(); // replace file if it exists
                if (!success) {
                    response.sendError(HttpURLConnection.HTTP_INTERNAL_ERROR); // file
                                                                                // existed
                                                                                // and
                                                                                // could
                                                                                // not
                                                                                // be
                                                                                // deleted
                    return;
                }
            }
    
            FileOutputStream out = new FileOutputStream(file);
            try {
                IOHelper.copyInputStream(request.getInputStream(), out);
            } catch (IOException e) {
                LOG.warn("Exception occured", e);
                throw e;
            } finally {
                out.close();
            }
    
            response.setStatus(HttpURLConnection.HTTP_NO_CONTENT); // we return no
                                                                    // content
        }
    
        protected void doDelete(HttpServletRequest request, HttpServletResponse response)
                throws ServletException, IOException {
            if (LOG.isDebugEnabled()) {
                LOG.debug("RESTful file access: DELETE request for " + request.getRequestURI());
            }
    
            if (writePermissionRole != null && !request.isUserInRole(writePermissionRole)) {
                response.sendError(HttpURLConnection.HTTP_FORBIDDEN);
                return;
            }
    
            File file = locateFile(request);
    
            if (!file.exists()) {
                response.sendError(HttpURLConnection.HTTP_NOT_FOUND); // file not
                                                                        // found
                return;
            }
    
            boolean success = IOHelper.deleteFile(file); // actual delete operation
    
            if (success) {
                response.setStatus(HttpURLConnection.HTTP_NO_CONTENT); // we return
                                                                        // no
                                                                        // content
            } else {
                response.sendError(HttpURLConnection.HTTP_INTERNAL_ERROR); // could
                                                                            // not
                                                                            // be
                                                                            // deleted
                                                                            // due
                                                                            // to
                                                                            // internal
                                                                            // error
            }
        }
    
        public void destroy() {
            // nothing to destroy
        }
    }

    过滤器中处理servlet文件上传,参考:https://www.cnblogs.com/qlqwjy/p/8722267.html

    Httpclient上传文件代码如下:(PUT请求上传文件)

    package upload;
    
    import java.io.BufferedReader;
    import java.io.File;
    import java.io.IOException;
    import java.io.InputStreamReader;
    
    import org.apache.http.HttpEntity;
    import org.apache.http.client.ClientProtocolException;
    import org.apache.http.client.methods.CloseableHttpResponse;
    import org.apache.http.client.methods.HttpPut;
    import org.apache.http.entity.mime.HttpMultipartMode;
    import org.apache.http.entity.mime.MultipartEntityBuilder;
    import org.apache.http.entity.mime.content.FileBody;
    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.impl.client.HttpClientBuilder;
    import org.apache.http.util.EntityUtils;
    
    /**
     * httpclient上传文件(测试没问题)
     * 
     * @author Administrator
     *
     */
    public class HttpClientUploadFile {
        public static void main(String[] args) throws ClassNotFoundException, ClientProtocolException, IOException {
            CloseableHttpClient httpclient = HttpClientBuilder.create().build();
            CloseableHttpResponse response = null;
            try {
                HttpPut httpput = new HttpPut(
                        "http://localhost:8080/fileserver/ID:MicroWin10-1535-54829-1554981858740-1:1:1:1:1");
                // 可以选择文件,也可以选择附加的参数
                HttpEntity req = MultipartEntityBuilder.create().setMode(HttpMultipartMode.BROWSER_COMPATIBLE)
                        .addPart("file", new FileBody(new File("G:/Exam.log")))// 上传文件,如果不需要上传文件注掉此行
                        .build();
                httpput.setEntity(req);
    
                System.out.println("executing request: " + httpput.getRequestLine());
                response = httpclient.execute(httpput);
    
                HttpEntity re = response.getEntity();
                System.out.println(response.getStatusLine());
                if (re != null) {
                    System.out.println(
                            "Response content: " + new BufferedReader(new InputStreamReader(re.getContent())).readLine());
                }
                EntityUtils.consume(re);
            } finally {
                try {
                    response.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
    }

    效果如下:

    git地址:https://github.com/qiao-zhi/JettyFileServer.git

    内嵌Jetty的用法参考:http://wiki.eclipse.org/Jetty/Tutorial/Embedding_Jetty

    2.生产者和消费者

     生产者

    package cn.qlq.activemq.blob;
    
    import java.io.File;
    
    import javax.jms.Connection;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.ActiveMQSession;
    import org.apache.activemq.BlobMessage;
    
    public class Producer {
    
        public static void main(String[] args) throws JMSException {
            // 创建链接工厂
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
                    "tcp://localhost:61616?jms.blobTransferPolicy.uploadUrl=http://localhost:8080/fileserver/");
            Connection connection = null;
            ActiveMQSession session = null;
            try {
                // 创建链接
                connection = factory.createConnection();
                // 启动链接
                connection.start();
                // 获取会话
                session = (ActiveMQSession) connection.createSession(Boolean.TRUE, session.AUTO_ACKNOWLEDGE);
                // 创建队列
                Destination queue = session.createQueue("blobQueue");
                // 创建生产者对象
                MessageProducer messageProducer = session.createProducer(queue);
                // 创建blob消息
                BlobMessage blobMessage = session.createBlobMessage(new File("pom.xml"));
                messageProducer.send(blobMessage);
                session.commit();
                session.close();
                connection.close();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
            }
    
        }
    }

    消费者代码:

    package cn.qlq.activemq.blob;
    
    import java.io.InputStream;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.BlobMessage;
    
    public class Consumer {
        /**
         * @param args
         * @throws JMSException
         */
        public static void main(String[] args) throws JMSException {
    
            // 获取 ConnectionFactory
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    
            // 创建 Connection
            Connection connection = connectionFactory.createConnection();
            connection.start();
    
            // 创建 Session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
            // 创建 Destinatione
            Destination destination = session.createQueue("blobQueue");
    
            // 创建 Consumer
            MessageConsumer consumer = session.createConsumer(destination);
    
            //监听消息
            consumer.setMessageListener(new MessageListener() {
               @Override
               public void onMessage(Message message) {
                   if (message instanceof BlobMessage) {
                       //监听BlobMessage
                       BlobMessage blobMessage = (BlobMessage) message;
                       try {
                           InputStream in = blobMessage.getInputStream();
                           byte[] bytes = new byte[in.available()];
                           in.read(bytes);
                           System.out.println(new String(bytes));
                       } catch (Exception e) {
                           e.printStackTrace();
                       }
                   }
               }
           });
        }
    
    }

    4. Message Transformation 消息类型转换

      有时候需要JMS Producer内部进行message转换,从4.2版本起,ActiveMQ提供了一个Message Transform接口用于进行消息转换,也就是对消息的类型进行转换,可以在如下对象上调用:
        ActiveMQConnectionFactory,ActiveMQConnection,ActiveMQSession,ActiveMQMessageConsumer,ActiveMQMessageProducer.
      在消息被发送之前发送到JMS producer的消息总线前进行转换,通过producerTransform方法,在消息到达总线后,但是在consumer接收消息之前进行转换,通过consumerTransform方法,当然MessageTransfoemer接口的实现需要你自己来提供。

    例如:下面的一个在生产的时候转换的例子

    生产者:

    package cn.qlq.activemq.topic;
    
    import java.util.concurrent.CountDownLatch;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MapMessage;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.ActiveMQMessageProducer;
    import org.apache.activemq.MessageTransformer;
    import org.apache.activemq.ScheduledMessage;
    import org.apache.activemq.leveldb.replicated.dto.Transfer;
    
    /**
     * 主题模式的消息生产者
     * 
     * @author QiaoLiQiang
     * @time 2018年9月19日下午10:10:36
     */
    public class MsgProducer {
    
        // 默认端口61616
        private static final String url = "tcp://localhost:61616/";
        private static final String topicName = "transTopic";
    
        public static void main(String[] args) throws JMSException, InterruptedException {
            // 1创建ConnectionFactory
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
            // 2.由connectionFactory创建connection
            Connection connection = connectionFactory.createConnection();
    
            // 3.创建Session===第一个参数是是否事务管理,第二个参数是应答模式
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 4.创建Destination(Queue继承Queue,Topic也继承Destination==这三个都是接口)
            Destination destination = session.createTopic(topicName);
            // 5.创建生产者producer
            ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
            // 6设置为持久模式(这个必须在下面开启connection之前)
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            // 7.启动connection
            connection.start();
    
            // 8.设置消息转换类型
            producer.setTransformer(new MessageTransformer() {
                // 生产者实现这个方法
                @Override
                public Message producerTransform(Session session, MessageProducer producer, Message message)
                        throws JMSException {
                    if (message instanceof TextMessage) {
                        MapMessage mapMessage = session.createMapMessage();
                        mapMessage.setString("key", ((TextMessage) message).getText());
    
                        return mapMessage;
                    }
                    return message;
                }
    
                // 消费者换实现这个方法
                @Override
                public Message consumerTransform(Session session, MessageConsumer consumer, Message message)
                        throws JMSException {
                    return null;
                }
            });
    
            for (int i = 0; i < 3; i++) {
                // 9.创建Message,有好多类型,这里用最简单的TextMessage
                TextMessage tms = session.createTextMessage("textMessage:" + i);
                // 10.生产者发送消息
                producer.send(tms);
    
                System.out.println("send:" + tms.getText());
            }
            // 11.关闭connection
            connection.close();
    
        }
    
    }

    消费者:

    package cn.qlq.activemq.topic;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.MapMessage;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * 主题模式的消费消息
     * 
     * @author QiaoLiQiang
     * @time 2018年9月18日下午11:26:41
     */
    public class MsgConsumer {
    
        // 默认端口61616
        private static final String url = "tcp://localhost:61616/";
        private static final String topicName = "transTopic";
    
        public static void main(String[] args) throws JMSException {
            // 1创建ConnectionFactory
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
            // 2.由connectionFactory创建connection
            Connection connection = connectionFactory.createConnection();
            // 设置链接的ID
            // 3.创建Session===第一个参数是是否事务管理,第二个参数是应答模式
            Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 4.创建Destination(Queue继承Queue,Topic也继承Destination==这三个都是接口)
            Topic destination = session.createTopic(topicName);
            // 5.启动connection
            connection.start();
            // 6.创建消费者consumer
            MessageConsumer consumer = session.createConsumer(destination);
            // 监听消息
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    if (message instanceof MapMessage) {
                        try {
                            MapMessage message2 = (MapMessage) message;
                            System.out.println(message2.getString("key"));
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    } else {
                        System.out.println(message.getClass());
                    }
                }
            });
    /*        session.close();
            connection.close();*/
        }
    
    }

    结果:

    WARN | path isn't a valid local location for TcpTransport to use
    textMessage:0
    textMessage:1
    textMessage:2

  • 相关阅读:
    mobileSelect学习
    使用qrcode生成二维码
    点点点右边有内容
    搜索框search
    input样式和修改
    art-template模板引擎高级使用
    Nodejs中的路径问题
    异步编程(回调函数,promise)
    在nodejs中操作数据库(MongoDB和MySQL为例)
    MongoDB数据库
  • 原文地址:https://www.cnblogs.com/qlqwjy/p/10690285.html
Copyright © 2011-2022 走看看