zoukankan      html  css  js  c++  java
  • activitemq整合spring

    image.png

    activitemq整合spring

    一.activmq的点对点模型

    image.png

    pom.xml:
    <?xml version="1.0" encoding="UTF-8"?>
    
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.demo</groupId>
        <artifactId>aq-test</artifactId>
        <version>1.0-SNAPSHOT</version>
        <packaging>war</packaging>
    
        <name>aq-test Maven Webapp</name>
        <!-- FIXME change it to the project's website -->
        <url>http://www.example.com</url>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <maven.compiler.source>1.7</maven.compiler.source>
            <maven.compiler.target>1.7</maven.compiler.target>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.11</version>
                <scope>test</scope>
            </dependency>
    
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>javax.jms</groupId>
                <artifactId>jms-api</artifactId>
                <version>1.1-rev-1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-all</artifactId>
                <version>5.14.5</version>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.34</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.springframework/spring-aop -->
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-aop</artifactId>
                <version>5.1.3.RELEASE</version>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 -->
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-pool2</artifactId>
                <version>2.6.0</version>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-dbcp2 -->
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-dbcp2</artifactId>
                <version>2.1.1</version>
            </dependency>
    
        </dependencies>
    
        <build>
            <finalName>aq-test</finalName>
            <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
                <plugins>
                    <plugin>
                        <artifactId>maven-clean-plugin</artifactId>
                        <version>3.1.0</version>
                    </plugin>
                    <!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_war_packaging -->
                    <plugin>
                        <artifactId>maven-resources-plugin</artifactId>
                        <version>3.0.2</version>
                    </plugin>
                    <plugin>
                        <artifactId>maven-compiler-plugin</artifactId>
                        <version>3.8.0</version>
                    </plugin>
                    <plugin>
                        <artifactId>maven-surefire-plugin</artifactId>
                        <version>2.22.1</version>
                    </plugin>
                    <plugin>
                        <artifactId>maven-war-plugin</artifactId>
                        <version>3.2.2</version>
                    </plugin>
                    <plugin>
                        <artifactId>maven-install-plugin</artifactId>
                        <version>2.5.2</version>
                    </plugin>
                    <plugin>
                        <artifactId>maven-deploy-plugin</artifactId>
                        <version>2.8.2</version>
                    </plugin>
                </plugins>
            </pluginManagement>
        </build>
    </project>
    
    ActiviteMq.class:(发送端)
    package com.demo;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.junit.Test;
    
    import javax.jms.*;
    
    public class ActiviteMq {
    
        @Test
        public void testQueueProducer() throws JMSException {
            //1.创建connectinfactory对象,需要指定服务的IP以及端口号
            //brokerURL服务器的ip以及端口号
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    "tcp://192.168.1.20:61616"
            );
    
            //2.使用ConnectionFactory创建
            Connection connection = connectionFactory.createConnection();
    
            //3.开启链接,调用connection对象的start的方法
            connection.start();
    
            //4.使用connection对创建一个session对象
            //[4.1] 第一参数:是否开启事务
            //[4.2] 第二参数:当第一个参数为false的时候 才有意义 消息的应答模式
            //1.自动应答2.手动应答 一般为自动
    
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
            //第五步:使用session对象创建一个destination对象(topic,queue) 此处创建一个queue对象
            //参数:队列名称
    
            Queue queue = session.createQueue("test-queue2");
    
            //第六步 使用session创建一个producer对象
            MessageProducer producer = session.createProducer(queue);
    
            //第七步 创建一个message对象 创建一个textmessage对象
            TextMessage textMessage = session.createTextMessage("风风光光");
    
            //第八步 使用producer对象发送消息
            producer.send(textMessage);
    
            //第九步 关闭资源
             producer.close();
             session.close();
             connection.close();
    
        }
    }
    
    ReceiveMsf.class:(接收端)
    package com.demo;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.junit.Test;
    
    import javax.jms.*;
    import java.io.IOException;
    
    public class ReceiveMsf {
    
        @Test
        public void testQueueConsumer() throws JMSException, IOException {
    
            //1.创建connectinfactory对象,需要指定服务的IP以及端口号
            //brokerURL服务器的ip以及端口号
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    "tcp://192.168.1.20:61616"
            );
    
            //2.使用ConnectionFactory创建
            Connection connection = connectionFactory.createConnection();
    
            //3.开启链接,调用connection对象的start的方法
            connection.start();
    
            //4.使用connection对创建一个session对象
            //[4.1] 第一参数:是否开启事务
            //[4.2] 第二参数:当第一个参数为false的时候 才有意义 消息的应答模式
            //1.自动应答2.手动应答 一般为自动
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //第五步:使用session对象创建一个destination对象(topic,queue) 此处创建一个queue对象
            //参数:队列名称
    
            Queue queue = session.createQueue("test-queue2");
            // 第六步:使用Session对象创建一个Consumer对象。
            MessageConsumer consumer = session.createConsumer(queue);
    
            consumer.setMessageListener(
                    new MessageListener() {
                        @Override
                        public void onMessage(Message message) {
                            try {
                                TextMessage textMessage = (TextMessage) message;
                                String text = null;
                                //取消的内容
                                text = textMessage.getText();
                                //第八步 打印消息
                                System.out.println(text);
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                    }
            );
    
            //等待键盘输入 阻塞
            System.in.read();
    
            //第九步 关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    }
    

    二.activmq的发布订阅模型

    image.png

    TopicProducer.class
    package com.demo.dingyue;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.junit.Test;
    
    import javax.jms.*;
    
    public class TopicProducer {
    
        @Test
        public void testTopicProducer() throws JMSException {
    
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    "tcp://192.168.1.20:61616"
            );
    
            Connection connection = connectionFactory.createConnection();
    
            connection.start();
    
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
            Topic topic = session.createTopic("huaYuanBaoBao");
    
            MessageProducer producer = session.createProducer(topic);
    
            TextMessage textMessage = session.createTextMessage("这个是发布订阅的");
    
            producer.send(textMessage);
    
            producer.close();
            session.close();
            connection.close();
        }
    
    }
    
    TopicCustomer.class:
    package com.demo.dingyue;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.junit.Test;
    
    import javax.jms.*;
    import java.io.IOException;
    
    public class TopicCustomer {
    
        @Test
        public void testTopicCustomer() throws JMSException, IOException {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    "tcp://192.168.1.20:61616"
            );
    
    
            Connection connection = connectionFactory.createConnection();
    
            connection.start();
    
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
            Topic topic = session.createTopic("huaYuanBaoBao");
    
    
            MessageConsumer consumer = session.createConsumer(topic);
    
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try{
    
                   TextMessage textMessage = (TextMessage) message;
                   String text = null;
                   //取出消息的内容
                 text= textMessage.getText();
    
                        System.out.println(text);
    
    
                    }catch (Exception e){
    
                        e.printStackTrace();
                    }
                }
            });
    
            System.out.println("消费端03");
            System.in.read();
    
            //关闭资源
            connection.close();
            consumer.close();
            session.close();
    
        }
    }
    
    

    和Spring整合:

    spring-amq.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:aop="http://www.springframework.org/schema/aop"
           xmlns:tx="http://www.springframework.org/schema/tx"
           xsi:schemaLocation="
            http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
            http://www.springframework.org/schema/context
            http://www.springframework.org/schema/context/spring-context-3.0.xsd
            http://www.springframework.org/schema/aop
            http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
            http://www.springframework.org/schema/tx
            http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">
    
        <context:component-scan base-package="com.demo.spring"/>
    
        <bean id="amqSenderService" class="com.demo.spring.AMQSenderServiceImpl">
        <!--<bean id="user" class="com.demo.spring.User">-->
        </bean>
    
        <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
              destroy-method="stop">
            <property name="connectionFactory">
                <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                    <property name="trustAllPackages" value="true"/>
                    <property name="brokerURL">
                        <value>tcp://192.168.1.20:61616</value>
                    </property>
                </bean>
            </property>
            <property name="maxConnections" value="100"></property>
        </bean>
    
        <!--使用缓存可以提升效率-->
        <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
            <property name="targetConnectionFactory" ref="jmsFactory"/>
            <property name="sessionCacheSize" value="1"/>
        </bean>
    
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="cachingConnectionFactory"/>
            <property name="messageConverter">
                <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
            </property>
        </bean>
    
        <!--测试Queue,队列的名字是spring-queue-->
        <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <!--<constructor-arg index="0" value="spring-queue"/>-->
            <constructor-arg name="name" value="spring-queue"/>
        </bean>
    
        <!--测试Topic-->
        <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg index="0" value="spring-topic"/>
        </bean>
    
    </beans>
    
    AMQSenderServiceImpl:
    package com.demo.spring;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.Resource;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.Session;
    
    @Service
    public class AMQSenderServiceImpl  {
    
        private static final Logger logger = LoggerFactory.getLogger(AMQSenderServiceImpl.class);
    
        @Resource(name = "jmsTemplate")
        private JmsTemplate jmsTemplate;
    
        //目的地队列的明证,我们要向这个队列发送消息
        @Resource(name = "destinationQueue")
        private Destination destination;
    
        //向特定的队列发送消息
        public void sendMsg(final User user) {
    //        final String msg = JSON.toJSONString(mqParamDto);
            user.setEmail("javaceshi@aa.com");
            user.setPassword("123456");
            user.setPhone("123456");
            user.setSex("M");
            user.setUsername("javaceshi");
    
            try {
                logger.info("将要向队列{}发送的消息msg:{}", destination, user);
                jmsTemplate.send(destination, new MessageCreator() {
                    @Override
                    public Message createMessage(Session session) throws JMSException {
    //                    return session.createObjectMessage(user);
                        return  session.createTextMessage("2019/1/18message");
                    }
                });
    
            } catch (Exception ex) {
                logger.error("向队列{}发送消息失败,消息为:{}", destination, user);
            }
    
        }
    }
    
    AMQReceiverServiceImpl:
    package com.demo.spring;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.Resource;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.Session;
    
    @Service
    public class AMQReceiverServiceImpl {
        private static final Logger logger = LoggerFactory.getLogger(AMQSenderServiceImpl.class);
    
        @Resource(name = "jmsTemplate")
        private JmsTemplate jmsTemplate;
    
        //目的地队列的明证,我们要向这个队列接收消息
        @Resource(name = "destinationQueue")
        private Destination destination;
    
        //向特定的队列接收消息
        public void receiverMsg(final User user) {
    //
    
            try {
                Object object = jmsTemplate.receive(destination);
                User msg = (User) object;
                System.out.println(msg);
    
            } catch (Exception ex) {
                ex.printStackTrace();
            }
    
        }
    }
    
    

    测试类:App

    package com.demo.spring;
    
    import com.demo.spring.User;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    /**
     * 主发送类
     *
     */
    public class App
    {
        public static void main( String[] args )
        {
            final  User user = new User();
            user.setEmail("javaceshi@aa.com");
            user.setPassword("123456");
            user.setPhone("123456");
            user.setSex("M");
            user.setUsername("javaceshi");
    
            ApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring-amq.xml");
            AMQSenderServiceImpl sendService = (AMQSenderServiceImpl)context.getBean("amqSenderService");
            sendService.sendMsg(user);
    //        sendService.send(user);
            System.out.println("send successfully, please visit http://192.168.1.20:8161/admin to see it");
        }
    }
    
    
  • 相关阅读:
    [转] 献给所有正在找路的人
    在同一表单内,多个提交按钮的处理方式
    javascript高级选择器querySelector和querySelectorAll
    一位年轻女董事长的37条忠告很受启发吧?
    函数的延迟加载
    WCF的CommunicationObjectFaultedException异常问题
    WCF Test Client对象数组输入问题
    [转载]C#开发Winform记录用户登录状态的方法
    using(C#)
    使用 SCTP 优化网络
  • 原文地址:https://www.cnblogs.com/charlypage/p/10306801.html
Copyright © 2011-2022 走看看