  • JMS

    First, download ActiveMQ. The following page lists every released version:
    http://activemq.apache.org/download-archives.html
    Each version provides download links for different operating systems.

    My work machine runs Windows, so I start the broker with the activemq.bat script in the installation directory.

    The default port is 61616, which you can see on the openwire connector in conf/activemq.xml:

    <transportConnectors>
        <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
        <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    </transportConnectors>
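
    Each connector URI above packs a protocol, a bind address, a port, and a few options into one string. The sketch below takes one apart using only the JDK, which may help when reading the file. ConnectorUri is a hypothetical helper written for this post, not an ActiveMQ class.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of ActiveMQ): splits a transportConnector URI into its parts.
class ConnectorUri {
    final String scheme;
    final String host;
    final int port;
    final Map<String, String> options;

    ConnectorUri(String scheme, String host, int port, Map<String, String> options) {
        this.scheme = scheme;
        this.host = host;
        this.port = port;
        this.options = options;
    }

    static ConnectorUri parse(String uri) {
        // Inside activemq.xml the '&' separator is escaped as '&amp;' because it sits in an XML attribute.
        URI parsed = URI.create(uri.replace("&amp;", "&"));
        Map<String, String> options = new LinkedHashMap<>();
        String query = parsed.getQuery();
        if (query != null) {
            for (String pair : query.split("&")) {
                int eq = pair.indexOf('=');
                if (eq > 0) {
                    options.put(pair.substring(0, eq), pair.substring(eq + 1));
                }
            }
        }
        return new ConnectorUri(parsed.getScheme(), parsed.getHost(), parsed.getPort(), options);
    }
}
```

    Parsing the openwire line this way gives the scheme tcp, the host 0.0.0.0 (listen on all interfaces), the port 61616, and a maxFrameSize of 104857600 bytes, which is the 100MB mentioned in the XML comment.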

    The relevant Maven dependencies:

    <dependency>
        <groupId>javax.jms</groupId>
        <artifactId>javax.jms-api</artifactId>
        <version>2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-core</artifactId>
        <version>5.7.0</version>
    </dependency>

    Communication with the JMS provider goes through javax.jms.Session:

    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
            ActiveMQConnection.DEFAULT_USER,
            ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");

    Connection connection = connectionFactory.createConnection();
    connection.start();

    // Transacted session: the acknowledge mode argument is ignored,
    // and sends take effect only when session.commit() is called.
    Session session = connection.createSession(true,
            Session.SESSION_TRANSACTED);

    The destination, the producer, and the message bodies are then all created through the session:

    Destination destination = session.createQueue("this is sparta!!");

    MessageProducer producer = session.createProducer(destination);

    TextMessage message0 = session.createTextMessage("This is Sparta!!!");
    TextMessage message1 = session.createTextMessage("This is Sparta too!!!");
    TextMessage message2 = session.createTextMessage("These are all Sparta!!!");

    producer.send(message0);
    producer.send(message1);
    producer.send(message2);

    // The session is transacted, so the three sends only take effect here.
    session.commit();
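
    Why the final commit() matters is easier to see in a toy model. The sketch below is not the JMS API: ToyTransactedSession is a made-up class that buffers sends in memory and releases them to a list standing in for the broker only on commit, which is the behaviour a transacted session gives you.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a transacted session (hypothetical, JDK only, not the JMS API).
class ToyTransactedSession {
    private final List<String> broker;                      // what consumers could see
    private final List<String> pending = new ArrayList<>(); // sent but not yet committed

    ToyTransactedSession(List<String> broker) {
        this.broker = broker;
    }

    // A send inside a transaction is only buffered.
    void send(String text) {
        pending.add(text);
    }

    // Commit makes every buffered message visible at once.
    void commit() {
        broker.addAll(pending);
        pending.clear();
    }

    // Rollback discards everything sent since the last commit.
    void rollback() {
        pending.clear();
    }
}
```

    In this model, three send() calls followed by no commit() leave the broker list empty, which mirrors what happens if session.commit() is forgotten in the real code.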

    Where there is a producer there is also a consumer. A message is received like this:

    MessageConsumer consumer = session.createConsumer(destination);
    System.out.println(((TextMessage) consumer.receive(10000)).getText());
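
    One caveat with the line above: receive(10000) returns null when no message arrives within the timeout, so the cast-and-getText() chain throws a NullPointerException on an empty queue. The sketch below shows the same pattern, with a null check, on a JDK BlockingQueue, whose poll(timeout, unit) also returns null on timeout. ToyConsumer is a hypothetical stand-in, not the JMS MessageConsumer.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy stand-in for a consumer (hypothetical, JDK only, not the JMS API).
class ToyConsumer {
    private final BlockingQueue<String> inbox = new LinkedBlockingQueue<>();

    void deliver(String text) {
        inbox.add(text);
    }

    // Waits up to timeoutMillis for a message and returns null if none arrives in time.
    String receive(long timeoutMillis) {
        try {
            return inbox.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    // Null-safe variant: falls back to a default instead of letting the caller hit an NPE.
    String receiveTextOrDefault(long timeoutMillis, String fallback) {
        String text = receive(timeoutMillis);
        return text != null ? text : fallback;
    }
}
```

    With a real MessageConsumer the same idea applies: store the result of receive(timeout) in a variable and check it for null before casting to TextMessage.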

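    In the end the consumer still has to call receive() for every single message, like a recipient signing for each delivery in person.
    Perhaps we can let a listener do the work instead: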

    consumer.setMessageListener(new MessageListener() {
        public void onMessage(Message message) {
            try {
                System.out.println("listener caught:::"+((TextMessage)message).getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    });

    Once a listener has been set on a consumer, that consumer can no longer receive messages by calling receive().
    Doing so fails with javax.jms.IllegalStateException: Cannot synchronously receive a message when a MessageListener is set

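    To use publish/subscribe instead, simply replace createQueue with createTopic. Keep in mind that a topic is stateless: a message goes only to the subscribers that are active when it is published, and it is not kept for consumers that subscribe later.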
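
    The difference between the two destination types can be sketched with two toy classes. This is not the JMS API: ToyQueue and ToyTopic are made-up names that model only the delivery rules, a queue that stores messages until someone takes them and a topic that hands each message to whoever is subscribed at that moment.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy point-to-point queue (hypothetical): messages wait until a consumer takes them.
class ToyQueue {
    private final Deque<String> stored = new ArrayDeque<>();

    void send(String text) {
        stored.addLast(text);
    }

    // Each message is handed to exactly one caller; returns null when the queue is empty.
    String receive() {
        return stored.pollFirst();
    }
}

// Toy publish/subscribe topic (hypothetical): nothing is stored for future subscribers.
class ToyTopic {
    private final List<List<String>> subscribers = new ArrayList<>();

    // Returns the inbox that will collect messages published from now on.
    List<String> subscribe() {
        List<String> inbox = new ArrayList<>();
        subscribers.add(inbox);
        return inbox;
    }

    // Every current subscriber gets its own copy; with no subscribers the message is lost.
    void publish(String text) {
        for (List<String> inbox : subscribers) {
            inbox.add(text);
        }
    }
}
```

    In practice this means a topic consumer has to be running before the sender publishes, whereas the queue examples in this post work in either order.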

    The complete code follows. Sender:

    {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();

        // Transacted session: nothing reaches the queue until commit().
        Session session = connection.createSession(true,
                Session.SESSION_TRANSACTED);

        Destination destination = session.createQueue("this is sparta!!");

        MessageProducer producer = session.createProducer(destination);
        TextMessage message0 = session.createTextMessage("This is Sparta!!!");
        TextMessage message1 = session.createTextMessage("This is Sparta too!!!");
        TextMessage message2 = session.createTextMessage("These are all Sparta!!!");
        producer.send(message0);
        producer.send(message1);
        producer.send(message2);

        session.commit();
        // Closing the connection also closes its sessions and producers.
        connection.close();
    }

    Receiver:

    {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();

        // Non-transacted session: each message is acknowledged automatically on receipt.
        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);

        Destination destination = session.createQueue("this is sparta!!");
        MessageConsumer consumer = session.createConsumer(destination);
        // receive(10000) waits up to 10 seconds per message and returns null on timeout.
        System.out.println(((TextMessage) consumer.receive(10000)).getText());
        System.out.println(((TextMessage) consumer.receive(10000)).getText());
        System.out.println(((TextMessage) consumer.receive(10000)).getText());

        connection.close();
    }
  • Original article: https://www.cnblogs.com/kavlez/p/4071905.html