zoukankan      html  css  js  c++  java
  • ActiveMQ主从配置

    这种方式有个问题,activemq1有消息没消费完但是突然宕机,虽然程序会自动连到activemq2。但是activemq1的消息只有等机器恢复后才会被消费。

    1.启动:我这里使用的是apache-activemq-5.13.3,是在windows下使用的,发现根据文档说的双击activemq.bat启动不了,那就只好使用命令启动,CMD进入到apache-activemq-5.13.3in下,输入activemqbat start。这样就可以启动了。

    2.主从配置:第一个activemq解压到apache-activemq-5.13.3,第二个解压到apache-activemq-5.13.3-2

      第一个activemq直接输入命令启动

      第二个需要修改参数:a.打开apache-activemq-5.13.3-2confactivemq.xml,修改broker标签里面的brokerName,不要和第一个相同就行

                b.修改activemq.xml中的transportConnectors,删除其他,只留一个openwire就行,修改uri里面的端口号

                c.在transportConnectors上面添加(如果一会儿启动的时候这里报错,请手动敲打下面三行,不要复制)

                  <networkConnectors>

                    <networkConnector uri="static:(tcp://localhost:61616)" duplex="true"/>
                  </networkConnectors>


                d.修改confjetty.xml文件的115行,端口号随便写一个。(这里是jetty的访问端口)

    配置文件修改完成,启动第一个activemq,启动第二个activemq。

    接下来是代码中brokerURL需要改成使用failover。这样启动生产者和消费者后,程序就可以在主从直接自动切换(可以尝试轮流关闭主从)。

    生产者代码如下:

     1 import javax.jms.Connection;
     2 import javax.jms.ConnectionFactory;
     3 import javax.jms.DeliveryMode;
     4 import javax.jms.Destination;
     5 import javax.jms.MessageProducer;
     6 import javax.jms.Session;
     7 import javax.jms.TextMessage;
     8 
     9 import org.apache.activemq.ActiveMQConnection;
    10 import org.apache.activemq.ActiveMQConnectionFactory;
    11 
    12 public class Sender {
    13     public static void main(String[] args) {
    14         // ConnectionFactory :连接工厂,JMS 用它创建连接
    15         ConnectionFactory connectionFactory;
    16         // Connection :JMS 客户端到JMS Provider 的连接
    17         Connection connection = null;
    18         // Session: 一个发送或接收消息的线程
    19         Session session;
    20         // Destination :消息的目的地;消息发送给谁.
    21         Destination destination;
    22         // MessageProducer:消息发送者
    23         MessageProducer producer;
    24         // TextMessage message;
    25         // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
    26         String brokerURL = "failover://(tcp://localhost:61616,tcp://localhost:61617)";
    27         connectionFactory = new ActiveMQConnectionFactory(
    28                 ActiveMQConnection.DEFAULT_USER,
    29                 ActiveMQConnection.DEFAULT_PASSWORD, brokerURL);
    30         try {
    31             // 构造从工厂得到连接对象
    32             connection = connectionFactory.createConnection();
    33             // 启动
    34             connection.start();
    35             // 获取操作连接
    36             session = connection.createSession(Boolean.TRUE,
    37                     Session.AUTO_ACKNOWLEDGE);
    38             destination = session.createQueue("FirstQueue");
    39             // 得到消息生成者
    40             producer = session.createProducer(destination);
    41                         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    42             while (true) {
    43                 sendMessage(session, producer);
    44                 session.commit();// commit后消息才会发出去
    45                 Thread.sleep(1000);
    46             }
    47         } catch (Exception e) {
    48             e.printStackTrace();
    49         } finally {
    50             try {
    51                 if (null != connection)
    52                     connection.close();
    53             } catch (Throwable ignore) {
    54             }
    55         }
    56     }
    57 
    58     static int i = 1;
    59 
    60     public static void sendMessage(Session session, MessageProducer producer)
    61             throws Exception {
    62         TextMessage message = session.createTextMessage("ActiveMq 发送的消息" + i);
    63         // 发送消息到目的地方
    64         System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
    65         producer.send(message);
    66         i++;
    67     }
    68 }    
    producer

    消费者代码如下:

     1 import javax.jms.Connection;
     2 import javax.jms.ConnectionFactory;
     3 import javax.jms.Destination;
     4 import javax.jms.JMSException;
     5 import javax.jms.Message;
     6 import javax.jms.MessageConsumer;
     7 import javax.jms.MessageListener;
     8 import javax.jms.Session;
     9 import javax.jms.TextMessage;
    10 
    11 import org.apache.activemq.ActiveMQConnection;
    12 import org.apache.activemq.ActiveMQConnectionFactory;
    13 
    14 public class Receiver {
    15     public static void main(String[] args) {
    16         // ConnectionFactory :连接工厂,JMS 用它创建连接
    17         ConnectionFactory connectionFactory;
    18         // Connection :JMS 客户端到JMS Provider 的连接
    19         Connection connection = null;
    20         // Session: 一个发送或接收消息的线程
    21         Session session;
    22         // Destination :消息的目的地;消息发送给谁.
    23         Destination destination;
    24         // 消费者,消息接收者
    25         MessageConsumer consumer;
    26         String brokerURL = "failover://(tcp://localhost:61616,tcp://localhost:61617)";
    27 //        String brokerURL = "tcp://localhost:61616";
    28         connectionFactory = new ActiveMQConnectionFactory(
    29                 ActiveMQConnection.DEFAULT_USER,
    30                 ActiveMQConnection.DEFAULT_PASSWORD,
    31                 brokerURL);
    32         try {
    33             // 构造从工厂得到连接对象
    34             connection = connectionFactory.createConnection();
    35             // 启动
    36             connection.start();
    37             // 获取操作连接
    38             session = connection.createSession(Boolean.FALSE,
    39                     Session.AUTO_ACKNOWLEDGE);
    40             destination = session.createQueue("FirstQueue");
    41             consumer = session.createConsumer(destination);
    42             consumer.setMessageListener(new MyListener());
    43             System.out.println("started...");
    44             while(true){
    45             }
    46         } catch (Exception e) {
    47             e.printStackTrace();
    48         } finally {
    49             try {
    50                 if (null != connection)
    51                     connection.close();
    52             } catch (Throwable ignore) {
    53             }
    54         }
    55     }
    56 }
    57 class MyListener implements MessageListener{
    58     
    59     public void onMessage(Message message) {
    60         TextMessage textMessage = (TextMessage) message;
    61         try {
    62             System.out.println("收到消息:"+textMessage.getText());
    63         } catch (JMSException e) {
    64             e.printStackTrace();
    65         }
    66     }
    67 }
    Receiver

    以上代码部分摘自网络

    这是配置主从的一个方案,还有一种方案是使用文件系统。

  • 相关阅读:
    SpringMVC源码阅读-通过画图理解初始化(十一)
    通过Stratus 服务器在Flash Player中使用RTMFP 开发P2P应用
    Lucene 学习资料
    LIRe 源代码分析 3:基本接口(ImageSearcher)
    LIRe 源代码分析 2:基本接口(DocumentBuilder)
    LIRe 源代码分析 1:整体结构
    Media Player Classic
    ffdshow 源代码分析 5: 位图覆盖滤镜(总结)
    ITU-T Technical Paper: 测量QoS的基本网络模型
    ITU-T Technical Paper: QoS 测量 (目标,方法,协议)
  • 原文地址:https://www.cnblogs.com/qlong8807/p/5485550.html
Copyright © 2011-2022 走看看