zoukankan      html  css  js  c++  java
  • jms和activemq

    一、什么是JMS

      JMS是java message service的缩写即java消息服务,是java定义的消息中间件(MOM)的技术规范(类似玉JDBC)。用于程序之间的异步通信,如果两个应用程序需要通信,则可以通过JMS来进行转发,达到解耦的目的。

    二、JMS的消息模型

      JMS有两种消息模型:

        1、点对点或队列模型(Point-to-Point Messaging Domain):消息的生产者将生产出的消息加入到一个队列中,消息的接受者从队列中获取消息。队列保留着消息,直到他们被消费或者超时。

          特点:①每个消息都只有一个消费者,如果队列中的消息被某个消费者消费,该消息会移出队列。

             ②消息的生产者和消费者之间没有时间的约束,生产者可以生产消息,无论消费者是否在运行。同样只要消息队列中有消息,消费者就可以消费和生产者的状态无关。

             ③消费者接受完消息之后要向队列反馈成功的信息。

        2、发布订阅模型(Publish/Subscribe Messaging Domain ):发布者发布消息,所有的订阅者都会接受到消息。

          特点:①发布一个消息会有多个订阅者接受到消息

             ②消息发布者和订阅者之间有时间上的相关性。订阅一个主题的订阅者只能接收到自它订阅之后发布的消息。JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许订阅者接受它在未处于激活状态时发送的消息。

    三、JMS的对象模型

      1、连接工厂(ConnectionFactory):用于创建JMS连接

      2、JMS连接(Connection):表示JMS客户端和服务器端之间的一个活动的连接,是由客户端通过调用连接工厂的方法建立的。

      3、JMS会话(Session):表示JMS客户与JMS服务器之间的会话状态。JMS会话建立在JMS连接上,表示客户与服务器之间的一个会话线程。

      4、JMS目的地(Destination):生产者发送消息的地方。点对点模式中的目的地是queue,发布订阅模式中的目的地就是topic。

      5、生产者(Message Producer)和消费者(Message Consumer)对象由Session对象创建,用于发送和接收消息。

    四、ActiveMQ

      上面说到JMS是java消息服务的技术规范,它制定了在整个消息服务提供过程中的所有数据结构和交互流程。而MQ则是消息队列服务,是面向消息中间件(MOM)的最终实现,是真正的服务提供者;MQ的实现可以基于JMS,也可以基于其他规范或标准。

    ActiveMQ就是Apache组织的一个开源的消息中间件,它实现了JMS技术规范。

      1、ActiveMQ点对点模式

      下面的代码都是测试代码,消费者如果关闭连接,则只能消费一次消息,所以没有关闭。

    public class MsgSender {
    
        //ActiveMq 的默认用户名
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
        //ActiveMq 的默认登录密码
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
        //ActiveMQ 的链接地址
        private static final String BROKEN_URL = "tcp://127.0.0.1:61616";//ActiveMQConnection.DEFAULT_BROKER_URL;
        
        public static void main(String[] args) {
            
            //创建一个链接工厂
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
            Connection connection = null;
            Session session = null;
            MessageProducer messageProducer = null;
            try {
                //从工厂中创建一个链接
                connection= connectionFactory.createConnection();
                //开启链接
                connection.start();
                //创建一个事务(这里通过参数可以设置事务的级别)
                session = connection.createSession(true,Session.SESSION_TRANSACTED);
                //创建一个消息队列
                Queue queue = session.createQueue("myqueue");
                //消息生产者
                messageProducer = session.createProducer(queue);
                int count = 0;
                while(true){
                    Thread.sleep(1000);
                    
                    //创建一条消息
                    TextMessage msg = session.createTextMessage("生产消息"+count);
                    System.out.println("生产消息"+count++);
                    //发送消息
                    messageProducer.send(msg);
                    //提交事务
                    session.commit();
                }
            } catch (JMSException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally{
                try {
                    if(connection != null){
                        connection.close();
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    public class MsgReciver {
    
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    
        private static final String BROKEN_URL = "tcp://127.0.0.1:61616";//ActiveMQConnection.DEFAULT_BROKER_URL;
    
        public static void main(String[] args) {
            ConnectionFactory connectionFactory = null;
            Connection connection = null;
            Session session = null;
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
            try {
                connection  = connectionFactory.createConnection();
                connection.start();
                session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
                Queue queue = session.createQueue("myqueue");
                MessageConsumer consumer = null;
                consumer = session.createConsumer(queue);
                
                MsgListener listener=new MsgListener();
                consumer.setMessageListener(listener);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    public class MsgListener implements MessageListener {
    
        @Override
        public void onMessage(Message msg) {
            TextMessage tmsg = (TextMessage)msg;
            try {
                System.out.println("接收到的数据:"+tmsg.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    
    }

      2、发布订阅模式

    public class MsgSender {
    
        //ActiveMq 的默认用户名
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
        //ActiveMq 的默认登录密码
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
        //ActiveMQ 的链接地址
        private static final String BROKEN_URL = "tcp://127.0.0.1:61616";//ActiveMQConnection.DEFAULT_BROKER_URL;
        
        public static void main(String[] args) {
            
            //创建一个链接工厂
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
            Connection connection = null;
            Session session = null;
            MessageProducer messageProducer = null;
            try {
                //从工厂中创建一个链接
                connection= connectionFactory.createConnection();
                //开启链接
                connection.start();
                //创建一个事务(这里通过参数可以设置事务的级别)
                session = connection.createSession(true,Session.SESSION_TRANSACTED);
                //创建一个Topic
                Topic myTopic = session.createTopic("mytopic");
                //消息生产者
                messageProducer = session.createProducer(myTopic);
                int count = 0;
                while(true){
                    Thread.sleep(10000);
                    //创建一条消息
                    TextMessage msg = session.createTextMessage("发布消息"+count);
                    System.out.println("发布消息"+count++);
                    //发送消息
                    messageProducer.send(msg);
                    //提交事务
                    session.commit();
                }
            } catch (JMSException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally{
                try {
                    if(connection != null){
                        connection.close();
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    public class MsgReciver {
    
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    
        private static final String BROKEN_URL = "tcp://111.230.239.152:61616";//ActiveMQConnection.DEFAULT_BROKER_URL;
    
        public static void main(String[] args) {
            ConnectionFactory connectionFactory = null;
            Connection connection = null;
            Session session = null;
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
            try {
                connection  = connectionFactory.createConnection();
                connection.start();
                session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
                Topic myTopic = session.createTopic("mytopic");
                MessageConsumer consumer = null;
                consumer = session.createConsumer(myTopic);
                MsgListener listener=new MsgListener();
                consumer.setMessageListener(listener);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    public class MsgListener implements MessageListener {
    
        @Override
        public void onMessage(Message msg) {
            TextMessage tmsg = (TextMessage)msg;
            try {
                System.out.println("接收到的数据:"+tmsg.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    
    }

      

      3、spring集成ActiveMQ

       引入jar包

     <!-- 整合activemq -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.4</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>4.3.10.RELEASE</version>
        </dependency>
    activemq.brokerURL=tcp://127.0.0.1:61616
    
    activemq.queue=myqueue
    
    activemq.topic=mytopic
     <!-- 集成ActiveMQ -->
        <!-- 配置能够产生connection的connectionfactory,由JMS对应的服务厂商提供 -->
        <bean id="tagertConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <constructor-arg name="brokerURL" value="${activemq.brokerURL}"/>
        </bean>
        <!-- 配置spring管理真正connectionfactory的connectionfactory,相当于spring对connectionfactory的一层封装 -->
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
            <property name="targetConnectionFactory" ref="tagertConnectionFactory"/>
        </bean>
        <!-- 配置生产者 -->
        <!-- Spring使用JMS工具类,可以用来发送和接收消息 -->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <!-- 这里是配置的spring用来管理connectionfactory的connectionfactory -->
            <property name="connectionFactory" ref="connectionFactory"/>
        </bean>
        <!-- 配置destination -->
        <!-- 队列目的地 -->
        <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="${activemq.queue}"/>
        </bean>
        <!-- 话题目的地 -->
        <bean id="itemAddTopic" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg value="${activemq.topic}"/>
        </bean>
        
         <!-- 配置监听器 -->
        <bean id="myQueueListener" class="com.myproject.listener.MqQueueListener"/>
        <bean id="mqTopicListener" class="com.myproject.listener.MqTopicListener"/>
        <!-- 系统监听器 -->
       <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"/>
            <property name="destination" ref="queueDestination"/>
            <property name="messageListener" ref="myQueueListener"/>
        </bean>
        <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"/>
            <property name="destination" ref="itemAddTopic"/>
            <property name="messageListener" ref="mqTopicListener"/>
        </bean>
    public class MqQueueListener implements MessageListener {
    
        @Override
        public void onMessage(Message message) {
            TextMessage testMessage = (TextMessage)message;
            try {
                System.out.println("收到信息:"+testMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    public class MqTopicListener implements MessageListener {
    
        @Override
        public void onMessage(Message message) {
            TextMessage testMessage = (TextMessage)message;
            try {
                System.out.println("收到信息:"+testMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    
    }
    @Service
    public class UserServiceImpl implements UserService {
        
        @Autowired
        private UserMapper userMapper;
        @Autowired
        private RedisCacheUtil redisCacheUtil;
        
        @Autowired
        private JmsTemplate jmsTemplate;
        @Resource(name="itemAddTopic")
        private Destination destination;
        
        
        @Override
        public User getById(Map<String, Object> params) {
            Integer userId = Integer.parseInt(params.get("id").toString());
            User user = (User)redisCacheUtil.get(RedisKey.USER_BASIC_INFO+userId);
            if(user == null ){
                user = userMapper.selectByPrimaryKey(userId);
                if(user != null ){
                    redisCacheUtil.set(RedisKey.USER_BASIC_INFO+userId, user);
                }
            }
            ObjectMapper mapper = new ObjectMapper();
            try {
                final String userInfo = mapper.writeValueAsString(user);
                //获取到用户后发送一个消息给其他模块
                jmsTemplate.send(destination, new MessageCreator() {
                    
                    @Override
                    public Message createMessage(Session session) throws JMSException {
                        TextMessage msg = session.createTextMessage(userInfo);
                        return msg;
                    }
                });
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
            
            return user;
        }
    }

      

  • 相关阅读:
    execl csv导出
    input里面check 状态检测
    注意这种方法的判断
    本周,上周,本月,上月
    bootstrap 兼容IE8设置
    js jquery 验证写法
    Jquery radio checked
    最简单的XML转数组
    eq,neq,gt,lt等表达式缩写
    python中的注释
  • 原文地址:https://www.cnblogs.com/kyleinjava/p/9334699.html
Copyright © 2011-2022 走看看