zoukankan      html  css  js  c++  java
  • activemq的使用方法

    activemq是Apache的一款开源消息总线,主要用来做消息的分发。

    首先需要下载MQ,进行启动。

    然后在控制台创建队列,初始用户名密码admin/admin。

    然后可以写生产者、消费者进行测试了。由于activemq支持spring,因此有两种不同的写法:

    方法一:创建factory, connection, session, destination, producer,consumer

    方法二:通过配置文件进行创建(未尝试)。

    最初在其作用的理解上有一些偏差,实际上是,在发送端引入MQ的jar包,向指定的MQ服务器发送信息,MQ会自动将其添加到消息队列中,用控制台可以比较清晰的看到队列情况:http://localhost:8161/admin/

    在接收端循环扫描要接收的队列,当读取到信息时进行接收处理。

    需要注意的是,mq支持持久化,可将消息持久化到本地文件、数据库。

    另一个需要注意的地方是,创建会话session时,第一个参数为true时,需要向服务器确认消息的接收。否则服务器认为没有成功接收,引用一下其他同学的话:

    createSession(paramA,paramB);

    paramA 取值有 : true or false 表示是否支持事务

    paramB 取值有:Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE,SESSION_TRANSACTED

    createSession(paramA,paramB);

    paramA是设置事务的,paramB设置acknowledgment mode

    paramA设置为false时:paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。

    paramA设置为true时:paramB的值忽略, acknowledgment mode被jms服务器设置为SESSION_TRANSACTED 。

    Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。

    Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会删除消息。

    DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。在需要考虑资源使用时,这种模式非常有效。

    附代码
    接收端:

    package com.receiver;
    
    
    import java.io.FileNotFoundException;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.PrintStream;
    import java.util.Properties;
    
    import javax.jms.*;
    
    import org.apache.activemq.*;
    
    public class MessageReceiver implements IMessageReceiver {
        
        public ActiveMQConnectionFactory connectionFactory = null;
        public Connection connection = null;
        public Session session = null;
        public Destination destination = null;
        public MessageConsumer getConsumer() {
            return consumer;
        }
    
    
        public void setConsumer(MessageConsumer consumer) {
            this.consumer = consumer;
        }
    
        public MessageConsumer consumer = null;
        
        //初始化,创建factory, connection, session, destination, producer
        public MessageReceiver(){                
            try {
                InputStream inProperties=MessageReceiver.class.getResourceAsStream("../config/connection.properties");
                Properties properties = new Properties();
                properties.load(inProperties);
                //创建factory
                connectionFactory = new ActiveMQConnectionFactory(properties.getProperty("name"),
                        properties.getProperty("password"),
                        properties.getProperty("brokerURL"));
                //创建connection
                connection = connectionFactory.createConnection();
                connection.start();
                //获取操作连接
                session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
                //获取消息目的地,需在控制台配置
                destination = session.createQueue(properties.getProperty("queueName"));
                //得到消息接收者
                consumer = session.createConsumer(destination);
                
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    
        public void ReceiveMessage(MessageConsumer consumer) {
            int i = 0;
            while(true){
                try {
                    TextMessage message = (TextMessage) consumer.receive(RECEIVE_TIME);
                    if(message != null){
                        System.out.println("queue1 "+message.getText()+"   "+i);
                        FileOutputStream out;
                        out = new FileOutputStream("D:/test.txt");
                        PrintStream p=new PrintStream(out);
                        p.println("queue1 "+message.getText()+"   "+i);
                        out.close();
                        
                    }
                     Thread.sleep(1000);
                     i++;
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                } catch (FileNotFoundException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
        
        public void CloseConnection(Connection connection) {
            if(connection != null){
                try {
                    connection.close();
                } catch (JMSException e) {                
                    e.printStackTrace();
                }
            }
        }
        
        public ConnectionFactory getConnectionFactory() {
            return connectionFactory;
        }
    
        public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
            this.connectionFactory = connectionFactory;
        }
    
        public Connection getConnection() {
            return connection;
        }
    
        public void setConnection(Connection connection) {
            this.connection = connection;
        }
    
        public Session getSession() {
            return session;
        }
    
        public void setSession(Session session) {
            this.session = session;
        }
    
        public Destination getDestination() {
            return destination;
        }
    
        public void setDestination(Destination destination) {
            this.destination = destination;
        }
    
    }
    package com.receiver;
    
    import javax.jms.JMSException;
    import javax.jms.TextMessage;
    
    public class ReceiveMain {
    
        /**
         * @param args
         * @throws JMSException 
         */
        public static void main(String[] args) throws JMSException {
            MessageReceiver messageReceiver = new MessageReceiver();
            messageReceiver.ReceiveMessage(messageReceiver.getConsumer());
            messageReceiver.CloseConnection(messageReceiver.getConnection());
        }
    
    }

    发送端:

    package com.sender;
    
    
    import java.io.InputStream;
    import java.util.Properties;
    
    import javax.jms.*;
    
    import org.apache.activemq.*;
    
    public class MessageSender implements IMessageSender {
        
        public ActiveMQConnectionFactory connectionFactory = null;
        public Connection connection = null;
        public Session session = null;
        public Destination destination = null;
        public MessageProducer producer = null;
        
        //初始化,创建factory, connection, session, destination, producer
        public MessageSender(){                
            try {
                InputStream inProperties=MessageSender.class.getResourceAsStream("../config/connection.properties");
                Properties properties = new Properties();
                properties.load(inProperties);
                //创建factory
                connectionFactory = new ActiveMQConnectionFactory(properties.getProperty("name"),
                        properties.getProperty("password"),
                        properties.getProperty("brokerURL"));
                //创建connection
                connection = connectionFactory.createConnection();
                connection.start();
                //获取操作连接
                session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
                //获取消息目的地,需在控制台配置
                destination = session.createQueue(properties.getProperty("queueName"));
                //得到消息发送者
                producer = session.createProducer(destination);
                //设置不持久化
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public TextMessage CreateMessage(Session session, int i) {
            String strMessage = "hello world!   "+i;
            TextMessage message = null;
            try {
                message = session.createTextMessage(strMessage);
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            return message;
        }
    
        public void SendMessage(TextMessage message, MessageProducer producer) {
            try {
                producer.send(message);
                
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
    
        }
        
        public void CloseConnection(Connection connection) {
            if(connection != null){
                try {
                    connection.close();
                } catch (JMSException e) {                
                    e.printStackTrace();
                }
            }
        }
        
        public ConnectionFactory getConnectionFactory() {
            return connectionFactory;
        }
    
        public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
            this.connectionFactory = connectionFactory;
        }
    
        public Connection getConnection() {
            return connection;
        }
    
        public void setConnection(Connection connection) {
            this.connection = connection;
        }
    
        public Session getSession() {
            return session;
        }
    
        public void setSession(Session session) {
            this.session = session;
        }
    
        public Destination getDestination() {
            return destination;
        }
    
        public void setDestination(Destination destination) {
            this.destination = destination;
        }
    
        public MessageProducer getProducer() {
            return producer;
        }
    
        public void setProducer(MessageProducer producer) {
            this.producer = producer;
        }
    }
    package com.sender;
    
    import java.io.FileNotFoundException;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.PrintStream;
    
    import javax.jms.JMSException;
    import javax.jms.TextMessage;
    
    public class SendMain {
    
        /**
         * @param args
         * @throws JMSException 
         */
        public static void main(String[] args) throws JMSException {
            MessageSender messageSender = new MessageSender();
            for(int i = 0;i < 10;i++){
                TextMessage textMessage = messageSender.CreateMessage(messageSender.getSession(),i);
                messageSender.SendMessage(textMessage, messageSender.getProducer());
                System.out.println("send message sucess!  :  " + i);
                FileOutputStream out;
                try {
                    out = new FileOutputStream("D:/test.txt");
                    PrintStream p=new PrintStream(out);
                    p.println("send message sucess!  :  " + i);
                    out.close();
                } catch (FileNotFoundException e1) {
                    // TODO Auto-generated catch block
                    e1.printStackTrace();
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                
                
                messageSender.getSession().commit();
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            
            messageSender.CloseConnection(messageSender.getConnection());
        }
    
    }
  • 相关阅读:
    pandas--对axis=0,axis=1的理解
    启动secondarynamenode时报错
    5月27日经历问题(在有框架的情况下从无到有增加一套功能)
    5.21工作记录(修改页面跳转,去掉多余的js;增加图片清除功能)
    工作记录520
    5月14日经历问题
    idea快捷键
    Linux下常用redis指令
    初识lunix
    Redis
  • 原文地址:https://www.cnblogs.com/wee616/p/4830542.html
Copyright © 2011-2022 走看看