zoukankan      html  css  js  c++  java
  • JavaEE(5)

    1. 在Weblogic服务器上配置Pub-Sub消息目的

    向已有的JMS模块中添加消息主题:

    Services-->Messaging-->JMS Modules--><Module Name>-->Configuration-->New-->Topic(Name: MessageTopic)

    2. 可靠的JMS订阅(NetBeans创建java project: DurablePubSub)

    #1. 编写Pub-Sub消息的生产者(MessageSender.java)

    package lee;
    
    import javax.jms.*;
    import javax.naming.*;
    import java.util.Properties;
    
    public class MessageSender {
    
        public void sendMessage() throws NamingException, JMSException {
            //定义WebLogic默认连接工厂的JNDI
            final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
            //获取JNDI服务所需的Context
            Context ctx = getInitialContext();
            //通过JNDI查找获取连接工厂
            ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
            //通过JNDI查找获取消息目的
            Destination dest = (Destination) ctx.lookup("MessageTopic");
            //连接工厂创建连接
            Connection conn = connFactory.createConnection();
            //JMS连接创建JMS会话
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //JMS会话创建消息生产者
            MessageProducer sender = session.createProducer(dest);
            //设置消息生产者生产出来的消息的传递模式、有效时间。
            sender.setDeliveryMode(DeliveryMode.PERSISTENT);
            sender.setTimeToLive(20000);
            //通过JMS会话创建一个文本消息
            TextMessage msg = session.createTextMessage();
            //msg.setStringProperty("ConType","txt");
            //设置消息内容
            msg.setText("Hello");
            //发送消息
            sender.send(msg);
            msg.setText("Welcome to JMS");
            //再次发送消息
            sender.send(msg);
            //关闭资源
            session.close();
            conn.close();
        }
    
        //工具方法,用来获取命名服务的Context对象
        private Context getInitialContext() {
            // 参看(4)
        }
    
        public static void main(String[] args) throws Exception {
            MessageSender sender = new MessageSender();
            sender.sendMessage();
        }
    }

    #2. 编写Pub-Sub消息的同步接收者(SyncConsumer.java)

    package lee;
    
    import javax.jms.*;
    import javax.naming.*;
    import java.util.Properties;
    
    public class SyncConsumer {
    
        public void receiveMessage() throws JMSException, NamingException {
            //定义WebLogic默认连接工厂的JNDI
            final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
            //获取JNDI服务所需的Context
            Context ctx = getInitialContext();
            //通过JNDI查找获取连接工厂
            ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
            //通过JNDI查找获取消息目的    
            Topic dest = (Topic) ctx.lookup("MessageTopic");
            //连接工厂创建连接
            Connection conn = connFactory.createConnection();
            //将客户端ID设为crazyit.org
            conn.setClientID("crazyit.org");
            //启动JMS连接,让它开始传输JMS消息
            conn.start();
            //JMS连接创建JMS会话
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建可靠的消息订阅者
            MessageConsumer receiver = session.createDurableSubscriber(dest, "crazyit.org");
            //同步接收消息,如果没有接收到消息,该方法会阻塞线程
            TextMessage msg = (TextMessage) receiver.receiveNoWait();
            System.out.println(msg);
            if (msg != null) {
                System.out.println("同步接收到的消息:" + msg.getText());
            }
            //关闭资源
            session.close();
            conn.close();
        }
    
        //工具方法,用来获取命名服务的Context对象
        private Context getInitialContext() {
            // 参看(4)
        }
    
        public static void main(String[] args) throws Exception {
            SyncConsumer sender = new SyncConsumer();
            sender.receiveMessage();
        }
    }

    #3. 编写Pub-Sub消息的异步接收者(AsyncConsumer.java)

    package lee;
    
    import javax.jms.*;
    import javax.naming.*;
    import java.util.Properties;
    
    //JMS异步消费者就是一个监听器,故实现MessageListener接口
    public class AsyncConsumer implements MessageListener {
    
        public AsyncConsumer() throws NamingException, JMSException, InterruptedException {
            //定义WebLogic默认连接工厂的JNDI
            final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
            //获取JNDI服务所需的Context
            Context ctx = getInitialContext();
            //通过JNDI查找获取连接工厂
            ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
            //通过JNDI查找获取消息目的
            Topic dest = (Topic) ctx.lookup("MessageTopic");
            //连接工厂创建连接
            Connection conn = connFactory.createConnection();
            //将客户端ID设为crazyit.org
            conn.setClientID("leegang.org");
            //启动JMS连接,让它开始传输JMS消息
            conn.start();
            //JMS连接创建JMS会话    
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建可靠的消息订阅者
            MessageConsumer receiver = session.createDurableSubscriber(dest, "leegang.org");
            //为JMS消息消费者绑定消息监听器
            receiver.setMessageListener(this);
            //程序暂停20s,在此期间内以异步方式接收消息
            Thread.sleep(20000);
            //关闭资源
            session.close();
            conn.close();
        }
    
        //实现消息监听器必须实现的方法。
    
        public void onMessage(Message m) {
            TextMessage msg = (TextMessage) m;
            System.out.println(msg);
            try {
                System.out.println("异步接收的消息:" + msg.getText());
            } 
            catch (JMSException ex) {
                ex.printStackTrace();
            }
        }
    
        //工具方法,用来获取命名服务的Context对象
    
        private Context getInitialContext() {
            // 参看(4)
        }
    
        public static void main(String[] args) throws Exception {
            AsyncConsumer consumer = new AsyncConsumer();
        }
    }

    3. 不可靠的JMS订阅(NetBeans创建java project: JmsPubSub)

    #1. 编写Pub-Sub消息的生产者(MessageSender.java)

    package lee;
    
    import javax.jms.*;
    import javax.naming.*;
    import java.util.Properties;
    
    public class MessageSender {
    
        public void sendMessage() throws NamingException, JMSException {
            final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
            Context ctx = getInitialContext();
            
            ConnectionFactory connFactory = (ConnectionFactory)ctx.lookup(CONNECTION_FACTORY_JNDI);
            Destination dest = (Destination)ctx.lookup("MessageTopic");
            
            Connection conn = connFactory.createConnection();
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            
            MessageProducer sender = session.createProducer(dest);
            sender.setDeliveryMode(DeliveryMode.PERSISTENT);
            sender.setTimeToLive(20000);
            
            TextMessage msg = session.createTextMessage();
            
            msg.setText("Hello");
            sender.send(msg);
            
            msg.setText("Welcome to JMS");
            sender.send(msg);
    
            session.close();
            conn.close();
        }
    
        private Context getInitialContext() {
            // 参看(4)
        }
        
        public static void main(String[] args) throws Exception {
            MessageSender sender = new MessageSender();
            sender.sendMessage();
        }
    }

    #2. 编写Pub-Sub消息的同步接收者(SyncConsumer.java)

    package lee;
    
    import javax.jms.*;
    import javax.naming.*;
    import java.util.Properties;
    
    public class SyncConsumer {
    
        public void receiveMessage() throws JMSException, NamingException {
            //定义WebLogic默认连接工厂的JNDI
            final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
            //获取JNDI服务所需的Context
            Context ctx = getInitialContext();
            //通过JNDI查找获取连接工厂
            ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
            //通过JNDI查找获取消息目的    
            Destination dest = (Destination) ctx.lookup("MessageTopic");
            //连接工厂创建连接
            Connection conn = connFactory.createConnection();
            //启动JMS连接,让它开始传输JMS消息
            conn.start();
            //JMS连接创建JMS会话
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //JMS会话创建消息消费者
            MessageConsumer receiver = session.createConsumer(dest);
            //同步接收消息,如果没有接收到消息,该方法会阻塞线程
            TextMessage msg = (TextMessage) receiver.receive();
            System.out.println(msg);
            System.out.println("同步接收到的消息:" + msg.getText());
            //关闭资源
            session.close();
            conn.close();
        }
    
        //工具方法,用来获取命名服务的Context对象
        private Context getInitialContext() {
            // 参看(4)
        }
    
        public static void main(String[] args) throws Exception {
            SyncConsumer consumer = new SyncConsumer();
            consumer.receiveMessage();
        }
    }

    #3. 编写Pub-Sub消息的异步接收者(AsyncConsumer.java)

    package lee;
    
    import javax.jms.*;
    import javax.naming.*;
    import java.util.Properties;
    
    //JMS异步消费者就是一个监听器,故实现MessageListener接口
    public class AsyncConsumer implements MessageListener {
    
        public AsyncConsumer() throws NamingException, JMSException, InterruptedException {
            //定义WebLogic默认连接工厂的JNDI
            final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
            //获取JNDI服务所需的Context
            Context ctx = getInitialContext();
            //通过JNDI查找获取连接工厂
            ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
            //通过JNDI查找获取消息目的
            Destination dest = (Destination) ctx.lookup("MessageTopic");
            //连接工厂创建连接
            Connection conn = connFactory.createConnection();
            //启动JMS连接,让它开始传输JMS消息
            conn.start();
            //JMS连接创建JMS会话    
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //JMS会话创建消息消费者
            MessageConsumer receiver = session.createConsumer(dest);
            //为JMS消息消费者绑定消息监听器
            receiver.setMessageListener(this);
            //程序暂停20s,在此期间内以异步方式接收消息
            Thread.sleep(20000);
            //关闭资源
            session.close();
            conn.close();
        }
    
        //实现消息监听器必须实现的方法。
        public void onMessage(Message m) {
            TextMessage msg = (TextMessage) m;
            System.out.println(msg);
            try {
                System.out.println("异步接收的消息:" + msg.getText());
            } catch (JMSException ex) {
                ex.printStackTrace();
            }
        }
    
        //工具方法,用来获取命名服务的Context对象
        private Context getInitialContext() {
            // 参看(4)
        }
    
        public static void main(String[] args) throws Exception {
            AsyncConsumer consumer = new AsyncConsumer();
        }
    }
  • 相关阅读:
    LeetCode 动态规划专题
    LeetCode 双指针、滑动窗口、单调队列专题
    LeetCode 单调栈专题
    LeetCode DFS搜索与回溯专题
    LeetCode树专题
    操作系统思维导图| IO管理篇
    2020-为什么换了工作
    深入浅出Spring MVC
    rocketmq初识
    线上故障处理手册
  • 原文地址:https://www.cnblogs.com/thlzhf/p/4249143.html
Copyright © 2011-2022 走看看