zoukankan      html  css  js  c++  java
  • ActiveMQ入门实例

    1.下载ActiveMQ

    去官方网站下载:http://activemq.apache.org/

    2.运行ActiveMQ

    解压缩apache-activemq-5.5.1-bin.zip,然后双击apache-activemq-5.5.1inactivemq.bat运行ActiveMQ程序。

    启动ActiveMQ以后,登陆:http://localhost:8161/admin/,创建一个Queue,命名为FirstQueue。

    3.创建Eclipse项目并运行

    创建project:ActiveMQ-5.5,并导入apache-activemq-5.5.1lib目录下需要用到的jar文件,项目结构如下图所示:

    3.1.Sender.java

    复制代码
    package com.xuwei.activemq;

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;

    public class Sender {
    private static final int SEND_NUMBER = 5;

    public static void main(String[] args) {
    // ConnectionFactory :连接工厂,JMS 用它创建连接
    ConnectionFactory connectionFactory;
    // Connection :JMS 客户端到JMS Provider 的连接
    Connection connection = null;
    // Session: 一个发送或接收消息的线程
    Session session;
    // Destination :消息的目的地;消息发送给谁.
    Destination destination;
    // MessageProducer:消息发送者
    MessageProducer producer;
    // TextMessage message;
    // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
    connectionFactory = new ActiveMQConnectionFactory(
    ActiveMQConnection.DEFAULT_USER,
    ActiveMQConnection.DEFAULT_PASSWORD,
    "tcp://localhost:61616");
    try {
    // 构造从工厂得到连接对象
    connection = connectionFactory.createConnection();
    // 启动
    connection.start();
    // 获取操作连接
    session = connection.createSession(Boolean.TRUE,
    Session.AUTO_ACKNOWLEDGE);
    // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
    destination = session.createQueue("FirstQueue");
    // 得到消息生成者【发送者】
    producer = session.createProducer(destination);
    // 设置不持久化,此处学习,实际根据项目决定
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    // 构造消息,此处写死,项目就是参数,或者方法获取
    sendMessage(session, producer);
    session.commit();
    } catch (Exception e) {
    e.printStackTrace();
    } finally {
    try {
    if (null != connection)
    connection.close();
    } catch (Throwable ignore) {
    }
    }
    }

    public static void sendMessage(Session session, MessageProducer producer)
    throws Exception {
    for (int i = 1; i <= SEND_NUMBER; i++) {
    TextMessage message = session
    .createTextMessage("ActiveMq 发送的消息" + i);
    // 发送消息到目的地方
    System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
    producer.send(message);
    }
    }
    }
    复制代码

    3.2.Receiver.java

    复制代码
    package com.xuwei.activemq;

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;

    public class Receiver {
    public static void main(String[] args) {
    // ConnectionFactory :连接工厂,JMS 用它创建连接
    ConnectionFactory connectionFactory;
    // Connection :JMS 客户端到JMS Provider 的连接
    Connection connection = null;
    // Session: 一个发送或接收消息的线程
    Session session;
    // Destination :消息的目的地;消息发送给谁.
    Destination destination;
    // 消费者,消息接收者
    MessageConsumer consumer;
    connectionFactory = new ActiveMQConnectionFactory(
    ActiveMQConnection.DEFAULT_USER,
    ActiveMQConnection.DEFAULT_PASSWORD,
    "tcp://localhost:61616");
    try {
    // 构造从工厂得到连接对象
    connection = connectionFactory.createConnection();
    // 启动
    connection.start();
    // 获取操作连接
    session = connection.createSession(Boolean.FALSE,
    Session.AUTO_ACKNOWLEDGE);
    // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
    destination = session.createQueue("FirstQueue");
    consumer = session.createConsumer(destination);
    while (true) {
    //设置接收者接收消息的时间,为了便于测试,这里谁定为100s
    TextMessage message = (TextMessage) consumer.receive(100000);
    if (null != message) {
    System.out.println("收到消息" + message.getText());
    } else {
    break;
    }
    }
    } catch (Exception e) {
    e.printStackTrace();
    } finally {
    try {
    if (null != connection)
    connection.close();
    } catch (Throwable ignore) {
    }
    }
    }
    }
    复制代码

    4.注意事项

    1. 最后接收者跟发送者在不同的机器上测试
    2. 项目所引用的jar最后在ActiveMQ下的lib中找,这样不会出现版本冲突。

    5.测试过程

    因为是在单机上测试,所以需要开启两个eclipse,每一个eclipse都有自身的workspace。我们在eclipse1中运行Receiver,在eclipse2中运行Sender。

    刚开始eclipse1中运行Receiver以后console介面没有任何信息,在eclipse2中运行Sender以后,eclipse2中的console显示如下信息:

    发送消息:ActiveMq 发送的消息1
    发送消息:ActiveMq 发送的消息2
    发送消息:ActiveMq 发送的消息3
    发送消息:ActiveMq 发送的消息4
    发送消息:ActiveMq 发送的消息5

    而回到eclipse1中发现console界面出现如下信息:

    收到消息ActiveMq 发送的消息1
    收到消息ActiveMq 发送的消息2
    收到消息ActiveMq 发送的消息3
    收到消息ActiveMq 发送的消息4
    收到消息ActiveMq 发送的消息5

     PS:2012-2-27

    今天发现测试并不需要开启两个eclipse,在一个eclipse下页可以启动多个程序,并且有多个console,在上面的Receiver.java中,设置一个较大的时间,比如receive(500000),如下代码所示:

    TextMessage message = (TextMessage) consumer.receive(500000);

    这个时候运行Receiver.java的话,会使得这个Receiver.java一直运行500秒,在eclipse中可以发现:

    点击那个红色方块可以手动停止运行程序。

    运行玩receiver以后我们在运行sender,在运行完sender以后,我们要切换到receiver的console,如下图所示:

  • 相关阅读:
    idea设置全局ignore
    win 2012 安装mysql 5.7.20 及报错 This application requires Visual Studio 2013 Redistributable. Please ins
    win 2012 安装mysql 5.7.20 及报错 This application requires Visual Studio 2013 Redistr
    kafka 删除 topic
    java编译中出现了Exception in thread “main" java.lang.UnsupportedClassVersionError
    Centos中使用yum安装java时,没有jps的问题的解决
    Spring 整合Junit
    Spring纯注解配置
    Spring 基于注解的 IOC 配置
    打印java系统的信息
  • 原文地址:https://www.cnblogs.com/iteakey/p/4136907.html
Copyright © 2011-2022 走看看