zoukankan      html  css  js  c++  java
  • jms入门

    一.所需jar(maven)

    <dependency>
       <groupId>org.apache.activemq</groupId>
       <artifactId>activemq-all</artifactId>
       <version>5.14.3</version>
    </dependency>

    二.创建生产者

    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.command.ActiveMQQueue;
    import org.apache.activemq.command.ActiveMQTopic;
    
    import javax.jms.*;
    import java.io.Serializable;
    
    public class JMSProducer {
        public static void main(String[] args) throws JMSException {
            ConnectionFactory connectionfactory =new ActiveMQConnectionFactory("username","pwd","tcp://localhost:61616");
    
            //创建与JMS服务的连接:ConnectionFactory被管理的对象,由客户端创建,用来创建一个连接对象
            Connection connection = connectionfactory.createConnection();
            /*
                    确认消息的方式有如下三种:
                    AUTO_ACKNOWLEDGE(自动通知)
                    CLIENT_ACKNOWLEDGE(客户端自行决定通知时机)
                    DUPS_OK_ACKNOWLEDGE(延时//批量通知)
    
             */
             
            /*
                打开会话,一个单独的发送和接受消息的线程上下文 
                为true时,事务会话必须session.commit();
            */
            Session session =connection.createSession(true,Session.AUTO_ACKNOWLEDGE );
            JMSProducer qs = new JMSProducer();
            qs.sendTextMsg(session,"helli text","jmsText");
            
            /*
                MapMessage mapMsg = session.createMapMessage();
                mapMsg.setString("name", "张三");
                mapMsg.setInt("age", 35);
                qs.sendMap(session, mapMsg, "queue.msgMap");//发送map类型的消息
    
                JMS jms = new JMS();//发送Object类型消息
                jms.setName("zhangsan");
                jms.setSex("男");
                qs.sendObj(session,jms,"queue.msgObj");
            */
              
            session.commit(); //在事务性会话中,只有commit之后,消息才会真正到达目的地
            session.close();
            connection.close();
        }
    
        /*
           发送文本消息
         */
        public void sendTextMsg(Session session,String MsgContent,String name) throws JMSException{
                Queue queue = new ActiveMQQueue(name); // Topic topic=new ActiveMQTopic(name); 创建topic
                MessageProducer msgProducer = session.createProducer(queue);
                Message textMessage = session.createTextMessage(MsgContent);
                msgProducer.send(textMessage);
    
            /*    
                //发送byte字节
                byte[] bs={1,2};
                BytesMessage  msg1= session.createBytesMessage();
                msg1.writeBytes(bs);
                msgProducer.send(msg1);
    
                //流消息
                StreamMessage streamMessage = session.createStreamMessage();
                streamMessage.writeString("streamMessage流消息");
                streamMessage.writeLong(55);
                producer.send(streamMessage);
             */
        }
        
        /*
           发送MAP类型消息
         */
        public void sendMap(Session session, MapMessage map, String name) throws JMSException {
            Topic topic=new ActiveMQTopic(name);   // Queue queue = new ActiveMQQueue(name);
            MessageProducer msgProducer1=session.createProducer(topic);
            msgProducer1.send(map);
            //msgProducer1.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 设置了重启之后消息会丢失
            //msgProducer1.setTimeToLive(1000*60*60);  消息有效期1小时
        }
        
        /*
           发送Object类型消息
         */
        public void sendObj(Session session,Object obj,String name) throws JMSException{
            Destination queue = new ActiveMQQueue(name);
            //发送对象时必须让该对象实现serializable接口
            ObjectMessage objMsg=session.createObjectMessage((Serializable) obj);
            MessageProducer msgProducer = session.createProducer(queue);
            msgProducer.send(objMsg);
        }
    }

    三.创建消费者

    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.command.ActiveMQQueue;
    import org.apache.activemq.command.ActiveMQTopic;
    
    import javax.jms.*;
    
    public class JMSConsumer implements MessageListener{
        public static void main(String[] args) throws JMSException {
            ConnectionFactory connectionfactory =null;
            Connection connection=null;
            Session session=null;
            if(connectionfactory==null){
                connectionfactory = new ActiveMQConnectionFactory("username","pwd","tcp://localhost:61616");
                //接收对象时,设置这个为true
                ((ActiveMQConnectionFactory) connectionfactory).setTrustAllPackages(true);
            }
            if(connection==null){
                connection = connectionfactory.createConnection();
                connection.start();
            }
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
            Queue queue = new ActiveMQQueue("que");//根据发送的名称接受消息
            MessageConsumer consumer = session.createConsumer(queue);
            consumer.setMessageListener(new JMSConsumer());//不继承MessageListener时可以用consumer.receive()手动接受消息
    
            Topic queue1 = new ActiveMQTopic("topic-name");
            MessageConsumer consumer1 = session.createConsumer(queue1);
            consumer1.setMessageListener(new JMSConsumer());
    
            Queue queue3 = new ActiveMQQueue("queue.msgMap");
            MessageConsumer consumer3 = session.createConsumer(queue3);
            consumer3.setMessageListener(new JMSConsumer());
    
            Queue queue2 = new ActiveMQQueue("queue.msgObj");
            MessageConsumer consumer2 = session.createConsumer(queue2);
            consumer2.setMessageListener(new JMSConsumer());
        }
    
        public void onMessage(Message message) {
            //instanceof 测试它所指向的对象是否是TextMessage类
            if(message instanceof TextMessage){ //接受文本消息
                TextMessage text = (TextMessage) message;
                try {
                    System.out.println("message:"+message);
                    System.out.println("发送的文本消息内容为:"+text.getText()); 
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
           
            if(message instanceof MapMessage){ //接收map消息
                MapMessage map = (MapMessage) message;
                try {
                    System.out.println("姓名:"+map.getString("name"));
                    System.out.println("年龄:"+map.getInt("age"));
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            
            if(message instanceof ObjectMessage){ //接收object
                try {
                    System.out.println("ObjectMessage");
                    ObjectMessage objMsg =(ObjectMessage) message;
                    JMS jms=(JMS) objMsg.getObject();
                    System.out.println(jms);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
    
            if(message instanceof BytesMessage){ //接收字节消息
                byte[] b = new byte[1024];
                int len = -1;
                BytesMessage byteMsg = (BytesMessage)message;
                try {
                    while((len=byteMsg.readBytes(b))!=-1){
                        System.out.println(new String(b, 0, len));
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
                /*
                      if(message instanceof StreamMessage){ //接收流消息
                        StreamMessage message = (StreamMessage)message;
                        System.out.println(message.readString());
                        System.out.println(message.readLong());
                    }
                 */
    
        }
    }
  • 相关阅读:
    奇怪的人
    假象世界
    心态记录
    民用自组织网络公司概要
    禁止VMware虚拟机与Host的时间同步
    20万左右SUV介绍
    手机GPS为什么能在室内定位?
    取余与位运算
    shell 基础进阶 *金字塔
    shell 、awk两种方法编写9*9法表
  • 原文地址:https://www.cnblogs.com/bkyliufeng/p/7219726.html
Copyright © 2011-2022 走看看