zoukankan      html  css  js  c++  java
  • activemq-5.13 在windows下部署应用

    一、下载windows压缩包

        解压并双击F:\server\activemq-5.13.2\bin\win64\activemq.bat 启动,32位的系统为F:\server\activemq-5.13.2\bin\win32\activemq.bat;

    二、配置访问权限

        编辑F:\server\activemq-5.13.2\conf\activemq.xml 文件,

        添加plugins节点:

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
    
         <plugins>  
         <!--
              Broker plugins. The simpleAuthenticationPlugin below declares three
              accounts inline (system, user, guest), each with a password and a
              comma-separated list of groups.
              NOTE(review): the passwords are stored in plain text in this file;
              use non-trivial values outside of a local demo.
              NOTE(review): no authorization plugin is visible in this snippet, so
              the group names presumably do not restrict access to any destination
              on their own. Confirm against the full configuration.
         -->
         <simpleAuthenticationPlugin>         
          <users>             
            <authenticationUser username="system" password="123" groups="users,admins"/>             
            <authenticationUser username="user" password="321" groups="users"/>
            <authenticationUser username="guest" password="111" groups="guests"/>
          </users>
         </simpleAuthenticationPlugin>
        </plugins>
    
            <destinationPolicy>
                <policyMap>
                  <policyEntries>
                    <policyEntry topic=">" >
                        <!-- The constantPendingMessageLimitStrategy is used to prevent
                             slow topic consumers to block producers and affect other consumers
                             by limiting the number of messages that are retained
                             For more information, see:
    
                             http://activemq.apache.org/slow-consumer-handling.html
    
                        -->
                      <pendingMessageLimitStrategy>
                        <constantPendingMessageLimitStrategy limit="1000"/>
                      </pendingMessageLimitStrategy>
                    </policyEntry>
                  </policyEntries>
                </policyMap>
            </destinationPolicy>
    
    
            <!--
                The managementContext is used to configure how ActiveMQ is exposed in
                JMX. By default, ActiveMQ uses the MBean server that is started by
                the JVM. For more information, see:
    
                http://activemq.apache.org/jmx.html
            -->
            <managementContext>
                <managementContext createConnector="false"/>
            </managementContext>
    
            <!--
                Configure message persistence for the broker. The default persistence
                mechanism is the KahaDB store (identified by the kahaDB tag).
                For more information, see:
    
                http://activemq.apache.org/persistence.html
            -->
            <persistenceAdapter>
                <kahaDB directory="${activemq.data}/kahadb"/>
            </persistenceAdapter>
    
    
              <!--
                The systemUsage controls the maximum amount of space the broker will
                use before disabling caching and/or slowing down producers. For more information, see:
                http://activemq.apache.org/producer-flow-control.html
              -->
              <systemUsage>
                <systemUsage>
                    <memoryUsage>
                        <memoryUsage percentOfJvmHeap="70" />
                    </memoryUsage>
                    <storeUsage>
                        <storeUsage limit="100 gb"/>
                    </storeUsage>
                    <tempUsage>
                        <tempUsage limit="50 gb"/>
                    </tempUsage>
                </systemUsage>
            </systemUsage>
    
            <!--
                The transport connectors expose ActiveMQ over a given protocol to
                clients and other brokers. For more information, see:
    
                http://activemq.apache.org/configuring-transports.html
            -->
            <transportConnectors>
                <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
                <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            </transportConnectors>
    
            <!-- destroy the spring context on shutdown to stop jetty -->
            <shutdownHooks>
                <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
            </shutdownHooks>
    
        </broker>

     三、访问http://localhost:8161/控制台;

     四、示例代码:

    package activemq;
    
    import java.io.Serializable;
    
    /**
     * Simple serializable value holder for a user's id, name and pass.
     *
     * <p>All three properties are mutable and exposed through plain
     * getter/setter pairs.
     */
    public class User implements Serializable {

        private static final long serialVersionUID = 1L;

        /** Numeric identifier of the user. */
        private int id;

        /** Name of the user. */
        private String name;

        /** Pass (password) of the user. */
        private String pass;

        /**
         * Creates a user with all three properties set.
         *
         * @param id   numeric identifier
         * @param name user name
         * @param pass user pass
         */
        public User(int id, String name, String pass) {
            this.id = id;
            this.name = name;
            this.pass = pass;
        }

        // ---- read accessors ----

        /** @return the numeric identifier */
        public int getId() {
            return this.id;
        }

        /** @return the user name */
        public String getName() {
            return this.name;
        }

        /** @return the user pass */
        public String getPass() {
            return this.pass;
        }

        // ---- write accessors ----

        /** @param id the new numeric identifier */
        public void setId(int id) {
            this.id = id;
        }

        /** @param name the new user name */
        public void setName(String name) {
            this.name = name;
        }

        /** @param pass the new user pass */
        public void setPass(String pass) {
            this.pass = pass;
        }
    }
    package activemq;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.ObjectMessage;
    import javax.jms.Session;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
     
    /**
     * Example producer: sends a batch of {@link User} object messages to the
     * queue {@code Test4.foo} on the broker at {@code tcp://127.0.0.1:61616}.
     */
    public class JmsSender {

        /**
         * Connects as {@code user}, sends 100 persistent object messages inside a
         * single transaction and commits it.
         *
         * <p>JMS failures are printed to standard error and not rethrown. The
         * connection is always closed, including when a send fails part-way; in
         * that case the transaction is never committed.
         */
        public void send() {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    "tcp://127.0.0.1:61616");

            Connection connection = null;
            try {
                connection = connectionFactory.createConnection("user", "321");
                connection.start();

                // The session is transacted, so the acknowledge-mode argument is
                // ignored by JMS; SESSION_TRANSACTED states that explicitly.
                Session session = connection.createSession(true,
                        Session.SESSION_TRANSACTED);
                Destination destination = session.createQueue("Test4.foo");

                MessageProducer producer = session.createProducer(destination);
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                for (int i = 0; i < 100; i++) {
                    int id = i + 1;
                    ObjectMessage message = session.createObjectMessage();
                    message.setObject(new User(id, "李四" + id, "123456"));

                    /*
                     * Other message types that could be sent instead (kept as a
                     * reference, not executed):
                     *
                     * // text message
                     * TextMessage textMessage = session.createTextMessage("文本消息");
                     *
                     * // map (key/value) message
                     * MapMessage mapMessage = session.createMapMessage();
                     * mapMessage.setLong("age", new Long(32));
                     * mapMessage.setDouble("sarray", new Double(5867.15));
                     * mapMessage.setString("username", "键值对消息");
                     *
                     * // stream message
                     * StreamMessage streamMessage = session.createStreamMessage();
                     * streamMessage.writeString("streamMessage流消息");
                     * streamMessage.writeLong(55);
                     *
                     * // bytes message
                     * String s = "BytesMessage字节消息";
                     * BytesMessage bytesMessage = session.createBytesMessage();
                     * bytesMessage.writeBytes(s.getBytes());
                     */

                    producer.send(message);
                }
                // Nothing is delivered to consumers until the transaction commits.
                session.commit();
            } catch (JMSException e) {
                e.printStackTrace();
            } finally {
                // Closing the connection also closes its sessions and producers.
                closeQuietly(connection);
            }
        }

        /** Closes the connection if it was opened, reporting (not throwing) close failures. */
        private static void closeQuietly(Connection connection) {
            if (connection == null) {
                return;
            }
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args){
            new JmsSender().send();
        }
    }
    package activemq;
    
    import java.util.concurrent.TimeUnit;
    
    import javax.jms.BytesMessage;
    import javax.jms.Connection;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MapMessage;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.ObjectMessage;
    import javax.jms.Session;
    import javax.jms.StreamMessage;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class JmsReceiver {
    
        public static void main(String[] args) {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    "tcp://127.0.0.1:61616");
            connectionFactory.setTrustAllPackages(true);
    
            try {
                Connection connection = connectionFactory.createConnection("guest",
                        "111");
                connection.start();
    
                final Session session = connection.createSession(Boolean.TRUE,
                        Session.AUTO_ACKNOWLEDGE);
                Destination destination = session.createQueue("Test4.foo");
    
                MessageConsumer consumer = session.createConsumer(destination);
                // listener 方式
                consumer.setMessageListener(new MessageListener() {
    
                    public void onMessage(Message msg) {
                        ObjectMessage message = (ObjectMessage) msg;
                        // TODO something....
                        try {
                            User user = (User) message.getObject();
                            System.out.println("收到消息:" + user.getName());
                        } catch (JMSException e1) {
                            // TODO Auto-generated catch block
                            e1.printStackTrace();
                        }
                        try {
                            session.commit();
                        } catch (JMSException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
    
                });
                TimeUnit.MINUTES.sleep(1);
    
                session.close();
                connection.close();
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
        public void onMessage(Message m) {
            try {
                if (m instanceof TextMessage) { // 接收文本消息
                    TextMessage message = (TextMessage) m;
                    System.out.println(message.getText());
                } else if (m instanceof MapMessage) { // 接收键值对消息
                    MapMessage message = (MapMessage) m;
                    System.out.println(message.getLong("age"));
                    System.out.println(message.getDouble("sarray"));
                    System.out.println(message.getString("username"));
                } else if (m instanceof StreamMessage) { // 接收流消息
                    StreamMessage message = (StreamMessage) m;
                    System.out.println(message.readString());
                    System.out.println(message.readLong());
                } else if (m instanceof BytesMessage) { // 接收字节消息
                    byte[] b = new byte[1024];
                    int len = -1;
                    BytesMessage message = (BytesMessage) m;
                    while ((len = message.readBytes(b)) != -1) {
                        System.out.println(new String(b, 0, len));
                    }
                } else if (m instanceof ObjectMessage) { // 接收对象消息
                    ObjectMessage message = (ObjectMessage) m;
                    User user = (User) message.getObject();
                } else {
                    System.out.println(m);
                }
     
            } catch (JMSException e) {
     
                e.printStackTrace();
            }
        }
    
    }

        

  • 相关阅读:
    关于Shipping
    怎么Debug Background Job [转载sdn]
    如何显示IDoc的每个segment/field的具体说明
    underscore.js
    Javascript Style Guide
    [转] Ajax_XMLHttpRequest对象详解
    ExecutingMethodsFromLinkButtonParameters
    使用C#操作MSExcel表格COM
    [转]HttpContext, HttpModules 和 HttpHandlers
    几个实用的Servlet应用例子入门、cookie、session及上传文件
  • 原文地址:https://www.cnblogs.com/101key/p/5282939.html
Copyright © 2011-2022 走看看