zoukankan      html  css  js  c++  java
  • 消息队列开发记录笔记-ActiveMQ

    1.下载ActiveMQ

    去官方网站下载:http://activemq.apache.org/

    2.运行ActiveMQ

    解压缩apache-activemq-5.5.1-bin.zip,然后双击apache-activemq-5.5.1inactivemq.bat运行ActiveMQ程序。

    3.代码: 

      需要参数:消息队列IP、端口默认61616,用户名,密码,queue。写在配置文件中。

      消费消息代码,项目启动开启一条线程,线程执行以下代码。一直监听消息队列,有数据就消费。

     1  //连接消息队列
     2      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");   //消息队列的地址
     3      Connection connection = null;
     4      Session session = null;
     5      try {
     6           //根据用户名和密码创建连接消息队列服务器的连接
     7           connection = factory.createConnection("1014", "1014");
     8           connection.start();
     9            //创建客户端确认的会话
    10           session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    11           //确定目标队列
    12       Destination dest = new ActiveMQQueue("skyform.queue.1014");
    13           MessageConsumer consumer = session.createConsumer(dest);
    14           //接受处理消息
    15            while(true){
    16               Message msg = consumer.receive();
    17               if (msg instanceof TextMessage) {
    18                    String body = null;
    19                       try {
    20                         body = ((TextMessage) msg).getText();
    21                             //TODO:接受到消息以后的业务处理,业务需求,接收消息后,放到自己的队列里调用下面的sender.供别人消费。
    22                             msg.acknowledge();//事物处理
    23                       } catch (JMSException e) {
    24                             e.printStackTrace();
    25                       }
    26                }else{
    27                         Throw new RuntimeException(“非法的消息”);
    28                  }
    29                }
    30       } catch (JMSException e) {
    31           e.printStackTrace();
    32       } finally {
    33          if (session != null) {
    34              Try{session.close();}catch(Exception e){e. printStackTrace()}
    35          } 
    36         if (connection != null) {
    37            Try{ connection.close();}catch(Exception e){e. printStackTrace()}
    38         }
    39      }

    发送代码:把消息放到自己的消息队列,供别人消费

     

     1   // ConnectionFactory :连接工厂,JMS 用它创建连接
     2             ConnectionFactory connectionFactory;
     3             // Connection :JMS 客户端到JMS Provider 的连接
     4             Connection connection = null;
     5             // Session: 一个发送或接收消息的线程
     6             Session session;
     7             // Destination :消息的目的地;消息发送给谁.
     8             Destination destination;
     9             // MessageProducer:消息发送者
    10             MessageProducer producer;
    11             // TextMessage message;
    12             // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
    13           
    14             try {
    15                 
    16                 
    17                   connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    18                 
    19                 // 构造从工厂得到连接对象
    20                 connection = connectionFactory.createConnection("1072","1072");
    21                 // 启动
    22                 connection.start();
    23                 // 获取操作连接
    24                 session = connection.createSession(Boolean.TRUE,
    25                         Session.CLIENT_ACKNOWLEDGE);
    26                 // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
    27                 destination = session.createQueue("skyform.queue.1072");
    28                 // 得到消息生成者【发送者】
    29                 producer = session.createProducer(destination);
    30                 // 构造消息,此处写死,项目就是参数,或者方法获取
    31                   sendMessage(session, producer);
    32                 
    33                 
    34                 session.commit();
    35             } catch (Exception e) {
    36                 e.printStackTrace();
    37             } finally {
    38                 try {
    39                     if (null != connection)
    40                         connection.close();
    41                 } catch (Throwable ignore) {
    42                 }
    43             }
    44         }

    4.队列情况查看

    发送队列通过 http://localhost:8161/admin 可监听队列情况以及发送测试数据,如下图:

    5.常见异常

    开发中常见的2个异常以及解决方法:

    javax.jms.IllegalStateException: The Session is closed

    网络异常时客户端会报出这个错误

    javax.jms.JMSException: Channel was inactive for too long

    服务器消息较长时间没有消息发送时,客户端会报这个错误

    可以把连接mq的url修改成下面的样子

    failover:(tcp://127.0.0.1:61616?wireFormat.maxInactivityDuration=10000)&maxReconnectDelay=10000

    failover 失效备援

    maxInactivityDuration 允许最大静止(消息服务器无消息)时间

    maxReconnectDelay 最大重连间隔

    解决方法参考:http://hi.baidu.com/hontlong/blog/item/d475a916ffc8e65df3de32c8.html

     

    6.安全配置

    1、控制台安全配置,打开conf/jetty.xml文件,找到

        <bean id="securityConstraint" class="org.eclipse.jetty.http.security.Constraint">

            <property name="name" value="BASIC" />

            <property name="roles" value="admin" />

            <property name="authenticate" value="false" />

        </bean>

       将“false”改为“true”即可。用户名和密码存放在conf/jetty-realm.properties文件中。

      

     2.连接和消费队列的用户名密码配置

    apache-activemq-5.5.1-binapache-activemq-5.5.1conf

    credentials.properties文件配置用户名、密码

    activemq.username=1091
    activemq.password=aL^PbQInrzUUY^m9

    activemq.xml文件配置:

    <simpleAuthenticationPlugin>

    <users>

    <authenticationUser username="${activemq.username}" password="${activemq.password}" groups="users,admins"/>

    </users>

    </simpleAuthenticationPlugin>

     安全配置参考网站:http://www.cnblogs.com/CopyPaster/archive/2012/04/27/2473179.html

    7.ActiveMQ通过JAAS实现的安全机制

    参考博客《转载-ActiveMQ通过JAAS实现的安全机制》

  • 相关阅读:
    10分钟学会React Context API
    Soft skill
    前端-页面性能调试:Hiper
    js对secure的支持是没问题的,httponly是为限制js而产生的,当然httponly的cookie也不会被js创建
    关于go的不爽
    Windows上mxnet实战深度学习:Neural Net
    wget获取https资源
    使用windows上 mxnet 预编译版本
    NVIDA 提到的 深度框架库
    Windows下编译mxnet
  • 原文地址:https://www.cnblogs.com/e206842/p/5585676.html
Copyright © 2011-2022 走看看