zoukankan      html  css  js  c++  java
  • ActiveMQ的使用

    ActiveMQ使用分为两大块:生产者和消费者
    一、准备
    项目导入jar包:activemq-all-5.15.3.jar
    并将其添加到项目的 Build Path 中
     
    二、生产者
    1. 创建连接工厂
    ActiveMQConnectionFactory mqf = new ActiveMQConnectionFactory(userName, password, brokerURL);
    注:
    userName是ActiveMQ的用户名,默认值可以通过 ActiveMQConnection.DEFAULT_USER 获取
    password是ActiveMQ的密码,默认值可以通过 ActiveMQConnection.DEFAULT_PASSWORD 获取
    brokerURL是ActiveMQ的连接,指定格式为:tcp://主机名:61616
     
    1. 获取连接
    Connection connection = mqf.createConnection();
     
    1. 生成会话
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
     
    1. 生成对应的topic
    Destination destination = session.createTopic("mytopic");
     
    1. 创建生产者
    MessageProducer producer = session.createProducer(destination);
     
    1. 设置发送消息使用的模式
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    默认是:DeliveryMode.PERSISTENT
     
    1. 生成消息
    TextMessage msg = session.createTextMessage("message");
     
    1. 启动连接
    connection.start();
     
    1. 发送消息
    producer.send(msg);
     
    1. 关闭生产者
    producer.close();
     
    1. 关闭会话
    session.close();
     
    1. 关闭连接
    connection.close();
     
    三、消费者
    1. 实现接口
    MessageListener
    ExceptionListener
    并实现onException(JMSException exception)和onMessage(Message message)方法
     
    1. 创建连接工厂
    ActiveMQConnectionFactory mqf = new ActiveMQConnectionFactory(userName, password, brokerURL);
    具体参数同上
     
    1. 获取连接
    Connection connection = mqf.createConnection();
     
    1. 生成会话
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
     
    1. 生成对应的topic
    Destination destination = session.createTopic("mytopic");
     
    1. 创建消费者
    MessageConsumer consumer = session.createConsumer(destination);
     
    1. 启动连接
    connection.start();
     
    1. 设置消息监听
    consumer.setMessageListener(this);
     
    1. 设置异常监听
    connection.setExceptionListener(this);
     
    1. 实现onMessage方法
    该方法有一个参数Message message,这个参数是从ActiveMQ上拿到的消息,可以通过如下方法解析出来:
    TextMessage tm = (TextMessage)message;
    String result = tm.getText();
     
    1. 关闭消费者
    consumer.close();
     
    1. 关闭会话
    session.close();
     
    1. 关闭连接
    connection.close();
     
    四、例程
    1. 生产者实现程序
     1 package activemq_test;
     2  
     3 import javax.jms.Connection;
     4 import javax.jms.DeliveryMode;
     5 import javax.jms.Destination;
     6 import javax.jms.JMSException;
     7 import javax.jms.MessageProducer;
     8 import javax.jms.Session;
     9 import javax.jms.TextMessage;
    10  
    11 import org.apache.activemq.ActiveMQConnection;
    12 import org.apache.activemq.ActiveMQConnectionFactory;
    13  
    14 public class Producer_tool {
    15  
    16     private final static String userName = ActiveMQConnection.DEFAULT_USER;
    17     private final static String password = ActiveMQConnection.DEFAULT_PASSWORD;
    18     private final static String brokerURL = "tcp://192.168.0.5:61616";
    19     private MessageProducer producer = null;
    20     private Connection connection = null;
    21     private Session session = null;
    22     
    23     public void initialize() throws JMSException {
    24         ActiveMQConnectionFactory mqf = new ActiveMQConnectionFactory(userName, password, brokerURL);
    25         connection = mqf.createConnection();
    26         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    27         Destination destination = session.createTopic("mytopic");
    28         producer = session.createProducer(destination);
    29         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    30     }
    31  
    32     public void send(String message) throws JMSException {
    33         initialize();
    34         TextMessage msg = session.createTextMessage(message);
    35         System.out.println("sending message: " + message);
    36         connection.start();
    37         producer.send(msg);
    38     }
    39     
    40     public void close() throws JMSException {
    41         if(producer != null) {
    42             producer.close();
    43         }
    44         if(session != null) {
    45             session.close();
    46         }
    47         if(connection != null) {
    48             connection.close();
    49         }
    50         System.out.println("closed");
    51     }
    52     
    53 }
    1. 生产者主程序
     1 package activemq_test;
     2 import javax.jms.JMSException;
     3 public class Producer_test {
     4     public static void main(String[] args) throws JMSException {
     5         Producer_tool producer = null;
     6         for(int i = 0; i < 10; i++) {
     7             producer = new Producer_tool();
     8             producer.send("message" + i);
     9             producer.close();
    10         }
    11     }
    12 }
     
    1. 消费者实现程序
     1 package activemq_test;
     2  
     3 import javax.jms.Connection;
     4 import javax.jms.Destination;
     5 import javax.jms.ExceptionListener;
     6 import javax.jms.JMSException;
     7 import javax.jms.Message;
     8 import javax.jms.MessageConsumer;
     9 import javax.jms.MessageListener;
    10 import javax.jms.Session;
    11 import javax.jms.TextMessage;
    12  
    13 import org.apache.activemq.ActiveMQConnection;
    14 import org.apache.activemq.ActiveMQConnectionFactory;
    15  
    16 public class Consumer_tool implements MessageListener,ExceptionListener{
    17     
    18     private final static String userName = ActiveMQConnection.DEFAULT_USER;
    19     private final static String password = ActiveMQConnection.DEFAULT_PASSWORD;
    20     private final static String brokerURL = "tcp://192.168.0.5:61616";
    21     private Connection connection = null;
    22     private Session session = null;
    23     private MessageConsumer consumer = null;
    24     static boolean isConnection = false;
    25     
    26     public void initialize() throws JMSException {
    27         ActiveMQConnectionFactory mqf = new ActiveMQConnectionFactory(userName, password, brokerURL);
    28         connection = mqf.createConnection();
    29         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    30         Destination destination = session.createTopic("mytopic");
    31         consumer = session.createConsumer(destination);
    32     }
    33     
    34     public void consumeMessage() throws JMSException {
    35         initialize();
    36         connection.start();
    37         consumer.setMessageListener(this);
    38         connection.setExceptionListener(this);
    39         isConnection = true;
    40         System.out.println("consumer is listening");
    41         
    42     }
    43  
    44     @Override
    45     public void onException(JMSException exception) {
    46         isConnection = false;
    47     }
    48  
    49     @Override
    50     public void onMessage(Message message) {
    51         if(message instanceof TextMessage) {
    52             TextMessage tm = (TextMessage)message;
    53             try {
    54                 System.out.println("consumer received " + tm.getText());
    55             } catch (JMSException e) {
    56                 e.printStackTrace();
    57             }
    58         }
    59         else {
    60             System.out.println(message);
    61         }
    62     }
    63     
    64     public void close() throws JMSException {
    65         if(consumer != null) {
    66             consumer.close();
    67         }
    68         if(session != null) {
    69             session.close();
    70         }
    71         if(connection != null) {
    72             connection.close();
    73         }
    74         System.out.println("consumer has closed");
    75     }
    76 }
     
    1. 消费者主程序
     1 package activemq_test;
     2 import javax.jms.JMSException;
     3 public class Consumer_test {
     4     public static void main(String[] args) throws JMSException {
     5         Consumer_tool consumer = new Consumer_tool();
     6         consumer.consumeMessage();
     7         while(Consumer_tool.isConnection) {
     8  
     9         }
    10         consumer.close();
    11     }
    12 }
  • 相关阅读:
    SQL中distinct的用法
    JMeter压测“java.net.SocketException: Socket closed”解决方法
    Jmeter里http接口的执行顺序是顺序执行
    【Web安全】越权操作——横向越权与纵向越权
    【WPF】使用CefSharp嵌入HTML网页
    【Visual Studio】项目的引用显示黄色叹号
    未能加载文件或程序集”xxxx”或它的某一个依赖项,试图加载格式不正确的程序。
    IDEA无法启动debugger,报错Address localhost:1099 is already in use
    Windows系统32位、64位DLL文件的存放位置
    Charles做代理的Map Remote路径配置
  • 原文地址:https://www.cnblogs.com/xiatianyu/p/9055647.html
Copyright © 2011-2022 走看看