zoukankan      html  css  js  c++  java
  • JavaEE(6)

    1. JMS消息的类型、消息头和消息属性

    消息类型:

    StreamMessage

    MapMessage

    TextMessage

    ObjectMessage

    BytesMessage

    JMS消息中的消息头和消息属性本质上都是一系列的key-value对,消息头的所有key都是标准的、固定的;消息属性的key和value都可以随意定义。

    // 为JMS消息设置一个自定义属性
    msg.setStringProperty("ConType", "txt");

    2. JMS消息的传递方式和确认方式

    传递方式:DeliveryMode.PERSISTENT, DeliveryMode.NON_PERSISTENT

    确认方式(JMS规范):Session.AUTO_ACKNOWLEDGE, Session.CLIENT_ACKNOWLEDGE, Session.DUPS_OK_ACKNOWLEDGE,

    WebLogic提供:Session.NO_ACKNOWLEDGE, Session.MULTICAST_NO_ACKNOWLEDGE,

    3. 使用消息选择器来过滤消息(NetBeans创建java project: MessageSelector)

    MessageSender.java

    package lee;
    
    import javax.jms.*;
    import javax.naming.*;
    import java.util.Properties;
    
    public class MessageSender {
    
        public void sendMessage() throws NamingException, JMSException {
            //定义WebLogic默认连接工厂的JNDI
            final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
            //获取JNDI服务所需的Context
            Context ctx = getInitialContext();
            //通过JNDI查找获取连接工厂
            ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
            //通过JNDI查找获取消息目的
            Destination dest = (Destination) ctx.lookup("MessageTopic");
            //连接工厂创建连接
            Connection conn = connFactory.createConnection();
            //JMS连接创建JMS会话
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //JMS会话创建消息生产者
            MessageProducer sender = session.createProducer(dest);
            //设置消息生产者生产出来的消息的传递模式、有效时间。
            sender.setDeliveryMode(DeliveryMode.PERSISTENT);
            sender.setTimeToLive(20000);
            //通过JMS会话创建一个文本消息
            TextMessage msg = session.createTextMessage();
        //msg.setStringProperty("ConType","txt");
            //设置消息内容
            msg.setText("Hello");
            //发送消息,指定该消息的优先级为8
            sender.send(msg, DeliveryMode.PERSISTENT, 8, 30000);
            //为第二条消息设置属性
            msg.setStringProperty("ConType", "TXT");
            msg.setText("Welcome to JMS");
            //再次发送消息
            sender.send(msg);
            //关闭资源
            session.close();
            conn.close();
        }
    
        //工具方法,用来获取命名服务的Context对象
        private Context getInitialContext() {
            // 参看(4)
        }
    
        public static void main(String[] args) throws Exception {
            MessageSender sender = new MessageSender();
            sender.sendMessage();
        }
    }

    PriorityConsumer.java

    package lee;
    
    import javax.jms.*;
    import javax.naming.*;
    import java.util.Properties;
    
    public class PriorityConsumer {
    
        public void receiveMessage() throws JMSException, NamingException {
            //定义WebLogic默认连接工厂的JNDI
            final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
            //获取JNDI服务所需的Context
            Context ctx = getInitialContext();
            //通过JNDI查找获取连接工厂
            ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
            //通过JNDI查找获取消息目的    
            Destination dest = (Destination) ctx.lookup("MessageTopic");
            //连接工厂创建连接
            Connection conn = connFactory.createConnection();
            //启动JMS连接,让它开始传输JMS消息
            conn.start();
            //JMS连接创建JMS会话
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //JMS会话创建消息消费者,指定该消息消费者只对优先级大于5的消息感兴趣。
            MessageConsumer receiver = session.createConsumer(dest, "JMSPriority > 5");
            //同步接收消息,如果没有接收到消息,该方法会阻塞线程
            TextMessage msg = (TextMessage) receiver.receive();
            System.out.println(msg);
            System.out.println("同步接收到的消息:" + msg.getText());
            //关闭资源
            session.close();
            conn.close();
        }
    
        //工具方法,用来获取命名服务的Context对象
        private Context getInitialContext() {
            // 参看(4)
        }
    
        public static void main(String[] args) throws Exception {
            PriorityConsumer consumer = new PriorityConsumer();
            consumer.receiveMessage();
        }
    }

    PropertyConsumer.java

    package lee;
    
    import javax.jms.*;
    import javax.naming.*;
    import java.util.Properties;
    
    public class PropertyConsumer {
    
        public void receiveMessage() throws JMSException, NamingException {
            //定义WebLogic默认连接工厂的JNDI
            final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
            //获取JNDI服务所需的Context
            Context ctx = getInitialContext();
            //通过JNDI查找获取连接工厂
            ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
            //通过JNDI查找获取消息目的    
            Destination dest = (Destination) ctx.lookup("MessageTopic");
            //连接工厂创建连接
            Connection conn = connFactory.createConnection();
            //启动JMS连接,让它开始传输JMS消息
            conn.start();
            //JMS连接创建JMS会话
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //JMS会话创建消息消费者,指定该消息消费者只对ConType属性为TXT的消息感兴趣。
            MessageConsumer receiver = session.createConsumer(dest, "ConType='TXT'");
            //同步接收消息,如果没有接收到消息,该方法会阻塞线程
            TextMessage msg = (TextMessage) receiver.receive();
            System.out.println(msg);
            System.out.println("同步接收到的消息:" + msg.getText());
            //关闭资源
            session.close();
            conn.close();
        }
    
        //工具方法,用来获取命名服务的Context对象
        private Context getInitialContext() {
            // 参看(4)
        }
    
        public static void main(String[] args) throws Exception {
            PropertyConsumer consumer = new PropertyConsumer();
            consumer.receiveMessage();
        }
    }

    4. 消息的临时目的地(NetBeans创建java project: TempDestination)

    MessageSender.java

    package lee;
    
    import javax.jms.*;
    import javax.naming.*;
    import java.util.*;
    
    public class MessageSender {
    
        static final String HAD_RECEIVED = "OK";
    
        public void sendMessage() throws NamingException, JMSException, InterruptedException {
            //定义WebLogic默认连接工厂的JNDI
            final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
            //获取JNDI服务所需的Context
            Context ctx = getInitialContext();
            //通过JNDI查找获取连接工厂
            ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
            //通过JNDI查找获取消息目的
            Destination dest = (Destination) ctx.lookup("MessageQueue");
            //连接工厂创建连接
            Connection conn = connFactory.createConnection();
            //JMS连接创建JMS会话
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //JMS会话创建消息生产者
            MessageProducer sender = session.createProducer(dest);
            //设置消息生产者生产出来的消息的传递模式、有效时间。
            sender.setDeliveryMode(DeliveryMode.PERSISTENT);
            sender.setTimeToLive(20000);
            //通过JMS会话创建一个文本消息
            TextMessage msg = session.createTextMessage();
            //设置消息内容
            msg.setText("Hello,是否受到消息,收到请回复!");
            //创建一个临时的消息目的
            TemporaryQueue tempDestination = session.createTemporaryQueue();
            System.out.println(tempDestination);
            //修改消息的消息头
            msg.setJMSReplyTo(tempDestination);
            //发送消息
            sender.send(msg);
            Thread.sleep(5000);
            //以JMS会话根据临时消息队列来创建队列浏览者
            QueueBrowser browser = session.createBrowser((javax.jms.Queue) tempDestination);
            //获取消息队列中所有消息
            Enumeration enumer = browser.getEnumeration();
            //列出消息队列中的所有消息
            while (enumer.hasMoreElements()) {
                TextMessage acMsg = (TextMessage) enumer.nextElement();
                if (acMsg.getText().equals(HAD_RECEIVED)) {
                    System.out.println("对方已经收到消息!");
                    break;
                }
            }
            //临时目的调用delete方法删除自己,释放资源。
            tempDestination.delete();
            //关闭资源
            session.close();
            conn.close();
        }
    
        //工具方法,用来获取命名服务的Context对象
        private Context getInitialContext() {
            // 参看(4)
        }
    
        public static void main(String[] args) throws Exception {
            MessageSender sender = new MessageSender();
            sender.sendMessage();
        }
    }

    MessageReceiver.java

    package lee;
    
    import javax.jms.*;
    import javax.naming.*;
    import java.util.*;
    
    public class MessageReceiver {
    
        static final String HAD_RECEIVED = "OK";
    
        public void receiveMessage() throws JMSException, NamingException {
            //定义WebLogic默认连接工厂的JNDI
            final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
            //获取JNDI服务所需的Context
            Context ctx = getInitialContext();
            //通过JNDI查找获取连接工厂
            ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
            //通过JNDI查找获取消息目的    
            Destination dest = (Destination) ctx.lookup("MessageQueue");
            //连接工厂创建连接
            Connection conn = connFactory.createConnection();
            //启动JMS连接,让它开始传输JMS消息
            conn.start();
            //JMS连接创建JMS会话
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //JMS会话创建消息消费者,指定该消息消费者只对优先级大于5的消息感兴趣。
            MessageConsumer receiver = session.createConsumer(dest);
            //同步接收消息,如果没有接收到消息,该方法会阻塞线程
            TextMessage msg = (TextMessage) receiver.receive();
            System.out.println(msg);
            System.out.println("同步接收到的消息:" + msg.getText());
            //获取指定消息的replyTo消息头的信息
            Destination replyTo = msg.getJMSReplyTo();
            System.out.println(replyTo);
            //创建发送到回复目的的消息生产者
            MessageProducer sender = session.createProducer(replyTo);
            TextMessage replyMsg = session.createTextMessage();
            replyMsg.setText(HAD_RECEIVED);
            sender.send(replyMsg);
            //关闭资源
            session.close();
            conn.close();
        }
    
        //工具方法,用来获取命名服务的Context对象
        private Context getInitialContext() {
            // 参看(4)
        }
    
        public static void main(String[] args) throws Exception {
            MessageReceiver receiver = new MessageReceiver();
            receiver.receiveMessage();
        }
    }

    5. 使用队列浏览器来查看消息(NetBeans创建java project: MessageBrowser)

    MessageSender.java

    package lee;
    
    import javax.jms.*;
    import javax.naming.*;
    import java.util.Properties;
    
    public class MessageSender {
    
        public void sendMessage() throws NamingException, JMSException {
            //定义WebLogic默认连接工厂的JNDI
            final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
            //获取JNDI服务所需的Context
            Context ctx = getInitialContext();
            //通过JNDI查找获取连接工厂
            ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
            //通过JNDI查找获取消息目的
            Destination dest = (Destination) ctx.lookup("MessageQueue");
            //连接工厂创建连接
            Connection conn = connFactory.createConnection();
            //JMS连接创建JMS会话
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //JMS会话创建消息生产者
            MessageProducer sender = session.createProducer(dest);
            //设置消息生产者生产出来的消息的传递模式、有效时间。
            sender.setDeliveryMode(DeliveryMode.PERSISTENT);
            sender.setTimeToLive(20000);
            //通过JMS会话创建一个文本消息
            TextMessage msg = session.createTextMessage();
            //msg.setStringProperty("ConType","txt");
            //设置消息内容
            msg.setText("Hello");
            //发送消息
            sender.send(msg);
            msg.setText("Welcome to JMS");
            //再次发送消息
            sender.send(msg);
            //关闭资源
            session.close();
            conn.close();
        }
    
        //工具方法,用来获取命名服务的Context对象
        private Context getInitialContext() {
            // 参看(4)
        }
    
        public static void main(String[] args) throws Exception {
            MessageSender sender = new MessageSender();
            sender.sendMessage();
        }
    }

    MessageBrowser.java

    package lee;
    
    import javax.jms.*;
    import javax.naming.*;
    import java.util.*;
    
    public class MessageBrowser {
    
        public void browseMessage() throws JMSException, NamingException {
            //定义WebLogic默认连接工厂的JNDI
            final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
            //获取JNDI服务所需的Context
            Context ctx = getInitialContext();
            //通过JNDI查找获取连接工厂
            ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
            //通过JNDI查找获取消息目的    
            Destination dest = (Destination) ctx.lookup("MessageQueue");
            //连接工厂创建连接
            Connection conn = connFactory.createConnection();
            //启动JMS连接,让它开始传输JMS消息
            conn.start();
            //JMS连接创建JMS会话
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //以JMS会话根据消息队列来创建队列浏览者
            QueueBrowser browser = session.createBrowser((javax.jms.Queue) dest);
            //获取消息队列中所有消息
            Enumeration em = browser.getEnumeration();
            //列出消息队列中的所有消息
            while (em.hasMoreElements()) {
                TextMessage msg = (TextMessage) em.nextElement();
                System.out.println(msg.getText());
            }
            session.close();
            conn.close();
        }
    
        //工具方法,用来获取命名服务的Context对象
        private Context getInitialContext() {
            // 参看(4)
        }
    
        public static void main(String[] args) throws Exception {
            MessageBrowser browser = new MessageBrowser();
            browser.browseMessage();
        }
    }
  • 相关阅读:
    ETCD 添加节点报错 tocommit(2314438) is out of range [lastIndex(0)]. Was the raft log corrupted, truncated, or lost?
    如何删除docker镜像中已配置的volume
    kubeadm初始化集群
    kubeadm安装依赖镜像
    standard_init_linux.go:178: exec user process caused "no such file or directory"
    kubernetes支持local volume
    git http方式时保存密码
    分布式核心技术
    docker使用
    Python实用日期时间处理方法汇总
  • 原文地址:https://www.cnblogs.com/thlzhf/p/4249149.html
Copyright © 2011-2022 走看看