zoukankan      html  css  js  c++  java
  • 【原创】JMS生产者和消费者【PTP同步接收消息】

    一般步骤:

    1. 请求一个JMS连接工i厂。
    2. 是用连接工厂创建连接。
    3. 启动JMS连接。
    4. 通过连接创建session。
    5. 获取一个目标。
    6. 创建一个生产者,或a.创建一个生产者,b.创建一条JMS消息并发送到目标
    7. 创建一个消费者,或a.创建一个消费者,b.注册一个消息监听器。
    8. 发送或接受消息。
    9. 关闭所有资源(连接,会话,生产者,消费者等)。

    首先登陆至ActiveMQ后台创建一个队列为TestQueue:

    ..省略

    创建生产者:

    package com.thunisoft.jms.mine;
    
    import java.util.HashMap;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.MessageProducer;
    import javax.jms.ObjectMessage;
    import javax.jms.Session;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * JMS生产者
     * 
     * @author zhangxsh
     * 
     */
    public class Producer {
    
        /**
         * @param args
         */
        public static void main(String[] args) {
            ConnectionFactory connectionFactory;
            Connection connection = null;
            Session session;
            Destination destination;
            MessageProducer producer;
            connectionFactory = new ActiveMQConnectionFactory(
                    ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
            try {
                // 通过连接工厂创建连接
                connection = connectionFactory.createConnection();
                // 启动连接
                connection.start();
                // 通过连接打开一个会话
                session = connection.createSession(Boolean.TRUE,
                        Session.AUTO_ACKNOWLEDGE);
                // 根据特定的队列名称创建一个目标地
                destination = session.createQueue("TestQueue");
                // 根据目标地创建一个生产者
                producer = session.createProducer(destination);
                // 不需要持久化的投递模式
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                for (int i = 1; i <= 10; i++) {
                    ObjectMessage message = session.createObjectMessage();
                    HashMap m = new HashMap();
                    m.put("key" + i, i);
                    message.setObject(m);
                    // 发送消息到目的地方
                    System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
                    producer.send(message);
                }
                session.commit();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    if (null != connection)
                        connection.close();
                } catch (Throwable ignore) {
                }
            }
        }
    }

    消费者:

    package com.thunisoft.jms.mine;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.MessageConsumer;
    import javax.jms.ObjectMessage;
    import javax.jms.Session;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * JMS消费者
     * 
     * @author zhangxsh
     * 
     */
    public class Consumer {
    
        /**
         * @param args
         */
        public static void main(String[] args) {
            ConnectionFactory connectionFactory;
            Connection connection = null;
            Session session;
            Destination destination;
            MessageConsumer consumer;
            connectionFactory = new ActiveMQConnectionFactory(
                    ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
            try {
                connection = connectionFactory.createConnection();
                connection.start();
                session = connection.createSession(Boolean.FALSE,
                        Session.AUTO_ACKNOWLEDGE);
                destination = session.createQueue("TestQueue");
                consumer = session.createConsumer(destination);
                while (true) {
                    ObjectMessage message = (ObjectMessage) consumer
                            .receive(100000);
                    if (null != message) {
                        System.out.println("收到消息" + message.getObject());
                    } else {
                        break;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    if (null != connection)
                        connection.close();
                } catch (Throwable ignore) {
                }
            }
    
        }
    
    }
  • 相关阅读:
    Ajax学习笔记2客户端访问WebService(上)
    在 .NET Framework 2.0 中未处理的异常导致基于 ASP.NET 的应用程序意外退出(我加了点注释)
    想用JS写一段鼠标拖拽调整图片大小的代码(未完)
    key/value/index 类型定义for .net
    ReadOnly属性对TEXTBOX的状态保存有影响?
    软件开发工具名称集锦(无下载地址,收集中...)
    AjaxPro让.NET的AjaxPro变得简单
    Ajax学习笔记(15):使用ASP.NET AJAX提供的Authentication Service
    在.NET中使用Newtonsoft.Json转换,读取,写入
    Orcale的 rownum
  • 原文地址:https://www.cnblogs.com/zhangxsh/p/3501767.html
Copyright © 2011-2022 走看看