zoukankan      html  css  js  c++  java
  • ActiveMQ

    首先需要下载ActiveMQ,下面的链接给我们列出了所有版本:
    http://activemq.apache.org/download-archives.html
    每个版本为不同的OS提供了链接:

    公司电脑是windows的,用目录下的activemq.bat启动: 

    端口号默认是61616,可以在conf/activemq.xml中看到:

    <transportConnectors>
        <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
        <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    </transportConnectors>
    


    相关的Maven dependency:

    <dependency>
        <groupId>javax.jms</groupId>
        <artifactId>javax.jms-api</artifactId>
        <version>2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-core</artifactId>
        <version>5.7.0</version>
    </dependency>
    


    使用javax.jms.Session跟JMS Provider通信,好像说了句废话...:

    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
            ActiveMQConnection.DEFAULT_USER,
            ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
    
    Connection connection = connectionFactory.createConnection();
    connection.start();
    
    Session session = connection.createSession(Boolean.TRUE,
            Session.AUTO_ACKNOWLEDGE);
    


    然后一些目的地、发送者、发送内容什么的都是由session来弄的:

    Destination destination = session.createQueue("this is sparta!!");
    
    MessageProducer producer = session.createProducer(destination);
    
    TextMessage message0 = session.createTextMessage("这是斯巴达!!!");
    TextMessage message1 = session.createTextMessage("这也是斯巴达!!!");
    TextMessage message2 = session.createTextMessage("这些都是斯巴达!!!");
    
    producer.send(message0);
    producer.send(message1);
    producer.send(message2);
    
    session.commit();
    


    有了producer,相应地也有consumer,接收消息方法如下:

    MessageConsumer consumer = session.createConsumer(destination);
    System.out.println(((TextMessage) consumer.receive(10000)).getText());
    


    结果还是consumer去一个个receive了,就像是接收人亲自去确认那样。
    或许我们可以让Listener代劳:

    consumer.setMessageListener(new MessageListener() {
        public void onMessage(Message message) {
            try {
                System.out.println("listener catched:::"+((TextMessage)message).getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    });
    

    当这个consumer设置了Listener的时候就不能再以receive()的方式接收了,
    不然会出现javax.jms.IllegalStateException:Cannot synchronously receive a message when a MessageListener is set...

    如果想使用publish/subscribe,直接将createQueue改为createTopic即可,但需要理解Topic是无状态的。


    完整code如下,发送者:

    {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
    
        Session session = connection.createSession(Boolean.TRUE,
                Session.AUTO_ACKNOWLEDGE);
    
        Destination destination = session.createQueue("this is sparta!!");
    
        MessageProducer producer = session.createProducer(destination);
        TextMessage message0 = session.createTextMessage("这是斯巴达!!!");
        TextMessage message1 = session.createTextMessage("这也是斯巴达!!!");
        TextMessage message2 = session.createTextMessage("这些都是斯巴达!!!");
        producer.send(message0);
        producer.send(message1);
        producer.send(message2);
    
        session.commit();
    }
    


    接收者:

    {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");;
        Connection connection = connectionFactory.createConnection();
        connection.start();
    
        Session session = connection.createSession(Boolean.FALSE,
                Session.AUTO_ACKNOWLEDGE);
    
        Destination destination = session.createQueue("this is sparta!!");
        MessageConsumer consumer = session.createConsumer(destination);
        System.out.println(((TextMessage) consumer.receive(10000)).getText());
        System.out.println(((TextMessage) consumer.receive(10000)).getText());
        System.out.println(((TextMessage) consumer.receive(10000)).getText());
    }

    这次试试集成到Spring。
    下面的连接是ActiveMQ官网提供的文档。
    http://activemq.apache.org/spring-support.html

    下面是我添加的一些dependency,基本的spring依赖我就不列举了:

    <!-- jms activemq -->
    <dependency>
        <groupId>javax.jms</groupId>
        <artifactId>javax.jms-api</artifactId>
        <version>2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-core</artifactId>
        <version>${activemq.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-pool</artifactId>
        <version>${activemq.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-spring</artifactId>
        <version>${activemq.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.xbean</groupId>
        <artifactId>xbean-spring</artifactId>
        <version>3.16</version>
    </dependency>
    <dependency>
    
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>${spring.version}</version>
    </dependency>
    

    maven中添加时要注意还有个xbean-spring;
    之前并没有注意,运行发现异常提示 ClassNotFound:org.apache.xbean.spring.context.v2.XBeanNamespaceHandler;

    后来我添加了xbean-v2,结果提示v2c,于是我添加v2c,后来感觉太傻就加了xbean-spring。

    配置方面可以使用jms和activeMq的标签:

    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    

    相应的xsi:schemaLocation:

    http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
    http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
    


    关于connectionFactory的配置可以使用amq标签:

    <amq:connectionFactory id="jmsFactory" brokerURL="tcp://localhost:61616" />
    


    但是在这里我打算试试PooledConnectionFactory;
    关于org.apache.activemq.pool.PooledConnectionFactory官网有以下解释(简单到位,都不用翻译了):

    If you are not using a JCA container to manage your JMS connections, we recommend you use our pooling JMS connection provider, (org.apache.activemq.pool.PooledConnectionFactory) from the activemq-pool library, which will pool the JMS resources to work efficiently with Spring's JmsTemplate or with EJBs.


    对于其属性,下面根据javaDoc给出一些解释:

    • MaximumActiveSessionPerConnection:每个Connection的最大Session数
    • BlockIfSessionPoolIsFull:默认为session池满时请求获得session会阻塞;设置false则会抛出JMSException
    • MaxConnections:最大连接数
    • IdleTimeout:空闲时间,默认为30秒
    • CreateConnectionOnStartup:是否在启动时创建connection


    在这里我先用默认参数声明,不知道为什么总是报MalformPrameterizedType...

    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" />
    


    上次用的队列,这次换用Topic试试...

    <bean id="destination" class="org.apache.activemq.command.ActiveMQTopic">
      <constructor-arg index="0" value="spartaTopic"></constructor-arg>
    </bean>
    


    当然也可以使用amq标签:

    <amq:topic physicalName="sparta" />
    


    如果是使用queue的话:

    <amq:queue physicalName="sparta" />
    


    难道我将这些放到spring里就是为了用用标签方便DI?
    用里的话来说就是jmsTemplate是<spring对jms支持核心的部分>
    (另有jmsTemplate102为适应jms1.0.2的);
    和jdbcTemplate那样 jmsTemplate也有提供相似的优势。
    比如,像jdbcTemplate处理失控的jdbc代码那样,用jmsTemplate处理失控的jms代码。
    或者,如果在使用JmsTemplate是捕捉到了JMSException,JmsTemplate将捕获该异常,然后抛出一个Spring自带的JmsException的子类异常(个人感觉spring提供的不是更详细的异常信息,只是侧重点不同...)。

    比如:

    • ListenerExecutionFailedException:监听器执行失败
    • MessageConversionException:消息转换失败
    • SynchedLocalTransactionFailedException:同步本地事务未完成
    • UncategorizedJmsException:没有适合异常的其他情况


    如果我们catch了JMSException,我们依然可以把他转为JmsException:

    catch (JMSException e) {
        e.printStackTrace();
        JmsException je = JmsUtils.convertJmsAccessException(e);
    }
    


    现在试着配置jmsTemplate:

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" >
        <property name="connectionFactory" >
            <bean class="org.apache.activemq.pool.PooledConnectionFactory" />
        </property>
        <property name="defaultDestination" >
            <amq:topic physicalName="sparta" />
        </property>
    </bean>
    


    这样编写代码时就变得简单多了,之前那些connectionFactory,connection,session,consumer,producer统统不见了;
    我只需要(匿名内部类可能有些碍眼):

    ApplicationContext context = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
    JmsTemplate template = (JmsTemplate)context.getBean("jmsTemplate");
    template.send(new MessageCreator() {
        public Message createMessage(Session session) throws JMSException {
            ActiveMQMapMessage msg = (ActiveMQMapMessage)session.createMapMessage();
            msg.setString("msg", "This is sparta!!");
            return msg;
        }
    });
    

    接收时只需要:

    template.receive();
    


    但需要注意!这个receive是同步接收消息的,他会一直阻塞到有消息个接收。
    可能会想到MessageListener,比如我们可以给一个MessageConsumer对象setMessageListener:

    MessageConsumer consumer = session.createConsumer(destination);
    consumer.setMessageListener(new MessageListener() {
        public void onMessage(Message message) {
            try {
                System.out.println("listener catched:::"+((TextMessage)message).getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    });
    


    上面代码中的MessageListener实例,如果新建一个专门用来监听的类,实现MessageListener接口并加上MessageDriven标注就会出现一个问题——他不够pojo。他有侵入性,我不想要任何实现接口的语法出现在代码中。

    于是我可以用listener-container;
    现在我创建一个类去监听,比如:

    public class CustomedListener {
        void processHandle(HashMap<String,String> map){
            System.out.println("msg:::"+map.get("msg"));
        }
    }
    


    但需要注意的是方法的参数列表,他并不是随便定义的。
    上面的publisher发送的message是ActiveMQMapMessage,这就需要我把参数定义为上面那种形式。
    然后看一下spring中如何配置这个Listener:

    <bean id="myListener" class="pac.testcase.jms.CustomedListener"/>
    <jms:listener-container connection-factory="connectionFactory">
        <jms:listener destination="sparta" ref="myListener" method="processHandle"/>
    </jms:listener-container>
    

    这样我就不需要去调用receive了,有消息就接收。

    现在试试通过JMS,在应用程序之间发送消息。 先看看spring提供的RPC方案(其实还有其他方案,只是没见过谁用)。 需要使用到这两个类:

    • org.springframework.jms.remoting.JmsInvokerServiceExporter将bean导出为基于消息的服务
    • org.springframework.jms.remoting.JmsInvokerProxyFactoryBean让客户端调用服务

    比较一下JmsInvokerServiceExporter和RmiServiceExporter:


    我创建一个接口和实现类如下:

    package pac.testcase.jms;
    public interface JmsRmiService {
        String doServe(String requestedNum);
    }
    


    实现:

    package pac.testcase.jms;
    import org.springframework.stereotype.Service;
    @Service
    public class JmsRmiServiceImpl implements JmsRmiService {
    
        public String doServe(String content) {
            System.out.println(content.concat(" has been requested!!"));
            return "your message::".concat(content).concat(":::length:")+content.length();
        }
    }
    


    将这个pojo声明为服务,在spring配置文件中配置:

    <bean id="serverService" class="org.springframework.jms.remoting.JmsInvokerServiceExporter"
        p:serviceInterface="pac.testcase.jms.JmsRmiService"
        p:service-ref="JmsRmiServiceImpl">
    </bean>
    


    需将他设置为jms监听器,配置方法和一般的jmsMessageListener的配置相同:

    <amq:connectionFactory id="jmsFactory" />
    <jms:listener-container
        destination-type="queue"
        connection-factory="jmsFactory"
        concurrency="3"
        container-type="simple">
        <jms:listener  destination="sparta" ref="serverService"  />
    </jms:listener-container>
    


    container-type有simple和default,根据不同的type也可以使用task-Executor,这里先简单记录一下。
    先启动jms broker再启动:

    new ClassPathXmlApplicationContext("classpath:applicationContext-*.xml").getBean(JmsRmiService.class);
    


    client这边我需要一个调用代理帮我去调用接口,也就是JmsInvokerProxyFactoryBean;
    配置如下:

    <amq:connectionFactory id="connectionFactory" />
    <bean id="clientService" class="org.springframework.jms.remoting.JmsInvokerProxyFactoryBean"
        p:serviceInterface="pac.test.jms.SenderRmiService"
        p:connectionFactory-ref="connectionFactory"
        p:queueName="sparta"/>
    


    配置中的serviceInterface是client端中根据要调用的方法创建的一个接口。
    main方法试着调用看看:

    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
        SenderRmiService service = (SenderRmiService)context.getBean("clientService");
        System.out.println(service.doServe("这才是斯巴达!!"));
    }
    

    server端输出: 

    client端输出: 

  • 相关阅读:
    XAF 有条件的对象访问权限
    XAF 顯示 UnInplace Report(設置自定義條件顯示報表,不是根據選擇ListView記錄條件顯示報表)
    XAF 如何自定义PivotGrid单元格显示文本?
    XAF 如何布局详细视图上的按钮
    XAF How to set size of a popup detail view
    XAF Delta Replication Module for Devexpress eXpressApp Framework
    XAF 帮助文档翻译 EasyTest Basics(基础)
    XAF 用户双击ListView记录时禁止显示DetailView
    XAF How to enable LayoutView mode in the GridControl in List Views
    XAF 如何实现ListView单元格批量更改?
  • 原文地址:https://www.cnblogs.com/kavlez/p/4160358.html
Copyright © 2011-2022 走看看