zoukankan      html  css  js  c++  java
  • JMS 之 ActiveMQ

    ActiveMQ 开发包下载及运行环境搭建

    <<activemq-parent-5.11.1-source-release.zip>>

    <<apache-activemq-5.11.1-bin.zip>>

       

    网页地址

    主页:http://activemq.apache.org/

    目前最新版本:5.11.1

    开发包及源码下载地址:http://activemq.apache.org/activemq-5111-release.html

    ActiveMQ 服务启动地址:http://127.0.0.1:8161/admin/ 用户名/密码 admin/admin

    进入bin目录启动对应操作系统的程序

    进入后台

    http://127.0.0.1:8161/admin/

    用户名和密码都是 admin

    ActiveMQ 点对点消息实现

    直接 Receive 方式

    Session.AUTO_ACKNOWLEDGE。当客户成功的从receive方法返回的时候,或者从MessageListener.onMessage

    方法成功返回的时候,会话自动确认客户收到的消息。

    Session.CLIENT_ACKNOWLEDGE。 客户通过消息的 acknowledge 方法确认消息。需要注意的是,在这种模

    式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消 费的消息。例如,如果一

    个消息消费者消费了 10 个消息,然后确认第 5 个消息,那么所有 10 个消息都被确认。

    Session.DUPS_ACKNOWLEDGE。 该选择只是会话迟钝第确认消息的提交。如果 JMS provider 失败,那么可

    能会导致一些重复的消息。如果是重复的消息,那么 JMS provider 必须把消息头的 JMSRedelivered 字段设置

    true

       

    消息的发送者 JMSProducer

    默认发送地址:failover://tcp://localhost:61616

    public class JMSProducer {

       

    //定义配置文件

    private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名

    private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码

    private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址

    private static final int SENDNUM=10; // 发送的消息数量

       

    public static void main(String[] args) {

       

    ConnectionFactory connectionFactory; // 定义连接工厂,可以生产connection

    Connection connection = null; // 定义连接

    Session session; // 定义会话 接受或者发送消息的线程

    Destination destination; // 定义消息的目的地

    MessageProducer messageProducer; // 定义消息发送者

       

    // 实例化连接工厂

    connectionFactory=new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);

       

    try {

    //从消息工厂中获取连接

    connection=connectionFactory.createConnection(); // 通过连接工厂获取连接

    connection.start(); // 启动连接

    // 创建Session,createSession(是否使用事务连接,消息确认的方式)

    session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

    destination=session.createQueue("FirstQueue1"); // 创建消息队列,返回一个目的地

    messageProducer=session.createProducer(destination); // 创建消息发送者,传入目的地

    //调用方法,使用消息发送者,发送消息到session

    sendMessage(session, messageProducer); // 发送消息

    //session事务提交

    session.commit();

    } catch (Exception e) {

    // TODO Auto-generated catch block

    e.printStackTrace();

    } finally{

    //关闭Connection

    if(connection!=null){

    try {

    connection.close();

    } catch (JMSException e) {

    // TODO Auto-generated catch block

    e.printStackTrace();

    }

    }

    }

    }

       

    /**

    * 发送消息

    * @param session

    * @param messageProducer

    * @throws Exception

    */

    public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{

    //JMSProducer.SENDNUM 消息生产者的数量

    for(int i=0;i<JMSProducer.SENDNUM;i++){

    //创建要发送的消息

    TextMessage message=session.createTextMessage("ActiveMQ 发送的消息"+i);

    //控制台打印

    System.out.println("发送消息:"+"ActiveMQ 发送的消息"+i);

    //使用messageProducer发送消息

    messageProducer.send(message);

    }

    }

    }

    消息的消费者 JMSConsumer(一直在监听,搜索消息)

    private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名

    private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码

    private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址

       

    public static void main(String[] args) {

    ConnectionFactory connectionFactory; // 连接工厂

    Connection connection = null; // 连接

    Session session; // 会话 接受或者发送消息的线程

    Destination destination; // 消息的目的地

    MessageConsumer messageConsumer; // 消息的消费者

       

    // 实例化连接工厂

    connectionFactory=new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);

       

    try {

    //消费不需要加事务

    connection=connectionFactory.createConnection(); // 通过连接工厂获取连接

    connection.start(); // 启动连接

    session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session

    destination=session.createQueue("FirstQueue1"); // 创建连接的消息队列

    messageConsumer=session.createConsumer(destination); // 创建消息消费者

    //获取消息

    while(true){

    //receive(100000)中为毫秒值,每100000毫秒接收一次消息

    TextMessage textMessage=(TextMessage)messageConsumer.receive(100000);

    if(textMessage!=null){

    //打印接收到的消息

    System.out.println("收到的消息:"+textMessage.getText());

    }else{

    break;

    }

    }

    } catch (JMSException e) {

    // TODO Auto-generated catch block

    e.printStackTrace();

    }

    }

    接收消息的第二种方式,使用监听(常用的

    使用 Listener 监听方式

    创建一个Listener类

    /**

    * 消息监听

    * @author Administrator

    *

    */

    public class Listener implements MessageListener{

    @Override

    public void onMessage(Message message) {

    // TODO Auto-generated method stub

    try {

    System.out.println("收到的消息:"+((TextMessage)message).getText());

    } catch (JMSException e) {

    // TODO Auto-generated catch block

    e.printStackTrace();

    }

    }

    }

    每次接收到消息都会被监听到

    消息消费者 JMSConsumer

    ActiveMQ 发布-订阅消息模式实现(一对多实现)

    发布-订阅消息模式实现

    一个发布者多个订阅者

    消息发布者 JMSProducer

    消息订阅者1 JMSConsumer

    订阅者1的监听

    消息订阅者1 JMSConsumer 1(同样方法创建消息订阅者)

    订阅者2的监听

       

    运行必须先订阅再发布(只有订阅之后才可以收到消息)

    先运行订阅者12,再运行发布者

       

  • 相关阅读:
    Java实现 蓝桥杯VIP 算法提高 P0404
    Java实现 蓝桥杯VIP 算法提高 P0404
    Java实现 蓝桥杯VIP 算法提高 P0404
    Java实现 蓝桥杯VIP 算法提高 P0404
    Java实现 蓝桥杯VIP 算法提高 P0404
    Java实现 蓝桥杯VIP 算法训练 排列问题
    Java实现 蓝桥杯VIP 算法训练 排列问题
    Java实现 蓝桥杯VIP 算法训练 排列问题
    Java实现 蓝桥杯VIP 算法训练 排列问题
    关于模态/非模态对话框不响应菜单的UPDATE_COMMAND_UI消息(对对WM_INITMENUPOPUP消息的处理)
  • 原文地址:https://www.cnblogs.com/ChengR/p/13066400.html
Copyright © 2011-2022 走看看