zoukankan      html  css  js  c++  java
  • ActiveMQ 详解

    1. 如何同步索引库

    • 方案一: 在taotao-manager中,添加商品的业务逻辑中,添加一个同步索引库的业务逻辑;
      • 缺点:业务逻辑耦合度高,业务拆分不明确;
    • 方案二: 业务逻辑在taotato-search中实现,调用服务在taotao-manager实现,业务逻辑分开
      • 缺点:服务之间的耦合度变高,服务的启动有先后顺序;
    • 方案三: 使用消息队列,MQ是一个消息中间件,包括:ActiveMQ,RabbitMQ,kafka等;

    2. ActiveMQ 的消息形式

    2.1 对于消息的传递有两种类型:

    • 一种是点对点,即一个生产者和一个消费者一一对应;
    • 另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收;

    2.2 JMS 定义了五种不同的消息正文格式

    • StreamMessage: Java原始值的数据流;
    • MapMessage: 一套名称-值对
    • TestMessage:一个字符串对象
    • ObjectMessage:一个序列化的Java对象
    • BytesMessage:一个字节的数据流

    3. ActiveMQ 的使用方法

    3.1 Queue 和 Topic

    // 测试类
    public class TestActiveMq{
    
        //Queue
        //Producer(生产者)
        @Test
        public void testQueueProducer() throws Exception{
    
            // 1.创建一个连接工厂对象ConnectionFactory对象,需要指定mq服务的ip及端口
            ConnectionFactory connectionFactory =
                                        new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            // 2.使用ConnectionFactory,创建一个连接Connection对象
            Connection connection = connectionFactory.createConnection();
            // 3.开启连接,调用Connection对象的start方法
            connection.start();
            // 4.使用Connection对象,创建一个Session对象
            // 第一个参数:表示是否开启事务,一般不使用事务;为了保证数据的最终一致,可以使用消息队列实现
            // 如果第一个参数为true,第二个参数自动忽略;
            // 如果不开启事务,第二个参数为消息的应答模式:包括自动应答和手动应答;一般是自动应答
            Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
            // 5. 使用Session对象,创建一个Destination对象,有两种形式: Queue,topic
            // 参数表示:消息队列的名称
            Queue queue = session.createQueue("test-queue");
            // 6. 使用 Session 对象,创建一个Producer对象
            MessageProducer producer = session.createProducer(queue);
            // 7. 创建一个TextMessage对象
            // TextMessage textMessage = new ActiveMQTextMessage();
            // textMessage.setText("hello activemq");
            TextMessage textMessage = session.createTextMessage("hello activemq");
            // 8. 发送消息
            producer.send(textMessage);
            // 9. 关闭资源
            producer.close();
            session.close();
            connection.close();
        }
    
        @Test
        public void testQueueConsumer() throws Exception{
            // 创建一个连接工厂对象
            ConnectionFactory connectionFactory =
                                        new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            // 使用连接工厂对象,创建一个连接
            Connection connection = connectionFactory.createConnection();
            // 开启连接
            connection.start();
            // 使用连接对象,创建一个Session对象
            Session session = connection.createSesion(false,Session.AUTO_ACKNOWLEDGE);
            // 使用Session,创建一个Destination,Destination 应该和消息的发送端一致
            Queue queue = session.createQueue("test-queue");
            // 使用Session,创建一个Consumer对象
            MessageConsumer consumer = session.createConsumer(queue);
            // 向Consumer对象中,设置一个MessageListener对象,用来接收消息
            consumer.setMessageListener(new MessageListener(){
                public void onMessage(Message message){
                    // 获取消息的内容
                    if(message instanceof TextMessage){
                        TextMessage textMessage = (TextMessage)message;
                        try{
                            String text = textMessage.getText();
                            // 打印消息内容
                            System.out.println(text);
                        }catch(JMSException e){
                            e.printStackTrace();
                        }
                    }
                }
            });
            // 系统等待接收消息
            // 第一种方式:
                /*
                 * while(true){
                 *       Thread.sleep(100);
                 * }
                 */
            // 第二种方式:
            System.in.read();
            // 关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    
        // Topic
        // Producer(生产者)
        @Test
        public void testTopicProducer() throws Exception{
            // 创建一个连接工厂对象
            ConnectionFactory connectionFactory =
                                        new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            // 创建连接
            Connection connection = connectionFactory.createConnection();
            // 开启连接
            connection.start();
            // 创建Session
            Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
            // 创建Destination,使用topic
            Topic topic = session.createTopic("test-topic");
            // 创建一个Producer对象
            MessageProducer producer = session.createProducer(topic);
            // 创建一个TextMessage对象
            TextMessage textMessage = session.createTextMessage("hello activemq topic");
            // 发送消息
            producer.send(textMessage);
            // 关闭资源
            producer.close();
            session.close();
            connection.close();
        }
    
        @Test
        public void testTopicConsumer() throws Exception{
            // 创建一个连接工厂对象
            ConnectionFactory connectionFactory =
                                        new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            // 使用连接工厂对象,创建一个连接
            Connection connection = connectionFactory.createConnection();
            // 开启连接
            connection.start();
            // 使用连接对象,创建一个Session对象
            Session session = connection.createSesion(false,Session.AUTO_ACKNOWLEDGE);
            // 使用Session,创建一个Destination,Destination 应该和消息的发送端一致
            Topic topic = session.createTopic("test-topic");
            // 使用Session,创建一个Consumer对象
            MessageConsumer consumer = session.createConsumer(topic);
            // 向Consumer对象中,设置一个MessageListener对象,用来接收消息
            consumer.setMessageListener(new MessageListener(){
                public void onMessage(Message message){
                    // 获取消息的内容
                    if(message instanceof TextMessage){
                        TextMessage textMessage = (TextMessage)message;
                        try{
                            String text = textMessage.getText();
                            // 打印消息内容
                            System.out.println(text);
                        }catch(JMSException e){
                            e.printStackTrace();
                        }
                    }
                }
            });
            // 系统等待接收消息
            System.out.println("topic 消费者1...");
            System.in.read();
            // 关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    }
    

    3.2 Activemq 整合 Spring

    // 导入相关jar包: spring-jms, spring-context-support
    
    // 配置 Activemq 整合 spring, applicationContext-activemq.xml
    <!-- 真正可以产生Connection的ConnectionFactory, 由对应的 JMS 服务厂商提供 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://127.0.0.1:61616"/>
    </bean>
    
    <!-- Spring 用于管理真正的 ConnectionFactory 的 ConnectioinFactory -->
    <bean id="connectioinFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
    </bean>
    
    <!-- 配置生产者 -->
    <!-- 配置JMSTemplate对象,它可以进行消息的发送,接收等 -->
    <bean id="jmsTemplage" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>
    <!-- 配置消息的Destination对象 -->
    <bean id="test-queue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg name="name" value="test-queue"></constructor-arg>
    </bean>
    <bean id="test-topic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg name="name" value="test-topic"></constructor-arg>
    </bean>
    
    
    // 测试类
    // 发送消息
    public class SpringActivemq{
        // 使用jmsTemplate发送消息
        @Test
        public void testJmsTemplate() throws Exception{
            // 初始化spring容器
            ApplicationContext applicationContext =
                new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq");
            // 从容器中获得JmsTemplage对象
            JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
            // 从容器中获得Destination对象
            Destination destination = (Destination)applicationContext.getBean("test-queue");
            // 发送消息
            jmsTemplate.send(destination,new MessageCreator(){
                public Message createMessage(Session session) throws JMSException{
                    TextMessage message =
                                session.createTextMessage("spring activemq send queue message");
                    return message;
                }
            });
        }
    }
    
    // 接收消息
    // applicationContext-activemq.xml
    <!-- 真正可以产生Connection的ConnectionFactory, 由对应的 JMS 服务厂商提供 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://127.0.0.1:61616"/>
    </bean>
    
    <!-- Spring 用于管理真正的 ConnectionFactory 的 ConnectioinFactory -->
    <bean id="connectioinFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
    </bean>
    
    <!-- 配置消息的Destination对象 -->
    <bean id="test-queue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg name="name" value="test-queue"></constructor-arg>
    </bean>
    <bean id="test-topic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg name="name" value="test-topic"></constructor-arg>
    </bean>
    <!-- 配置消息的接收者 -->
    <!-- 配置监听器 -->
    <bean id="myMessageListener" class="com.taotao.search.listener.MyMessageListener"></bean>
    <!-- 消息监听容器 -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"></property>
        <property name="destination" ref="test-queue"></property>
        <property name="messageListener" ref="myMessageListener"></property>
    </bean>
    
    // 创建接收消息的类
    public class MyMessageListener implements MessageListener{
        public void onMessage(Message message){
            // 接收消息
            TextMessage textMessage = (TextMessage)message;
            try{
                String text = textMessage.getText();
                System.out.println(text);
            }catch(JMSException e){
                e.printStackTrace();
            }
        }
    }
    
    // 接收消息测试类
    public class testSpringActiveMq{
    
        @Test
        public void testSpringActiveMq() throws Exception{
            // 初始化spring容器
            ApplicationContext app =
                new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq");
            // 系统等待接收消息
            System.in.read();
        }
    }
    
  • 相关阅读:
    个人推荐网上商店
    vs 安装程序制作
    this linker was not configured to use sysroots和C compiler cannot create executables的解决办法
    将asihttprequest编译后的目标文件打包
    cygwin下的gcc4.7.1编译心得
    给ubuntu12.04换3.4.6的内核
    boost::asio::streambuf相关的操作方法
    应用boost库serialize标准库里的map
    cygwin下gdb7.4编译
    sql server存储过程分页,支持cte
  • 原文地址:https://www.cnblogs.com/linkworld/p/7929104.html
Copyright © 2011-2022 走看看