ActiveMQ简介
概要
- 开源
- JMS-compliant
- 消息中间件message-oriented middleware(MOM)
- 松耦合,相对于RPC的紧耦合
- 发送消息fire-and-forget
- EDA(event-driven architecture)
- SOA(serive-oriented architecture)
- CEP(complex event processing)
特性
- JMS的实现
- 同步、异步消息delivery
- once-and-only-once消息delivery
- 为订阅者提供的消息durability
- Connectivity
- 支持广泛的协议,包括HTTP/S,IP multicast,SSL,STOMP,TCP,UDP,XMPP,MQTT等等
- transport connectors和network connectors
- Pluggable persistent
- ActiveMQ提供了快速消息持久化KahaDB
- ActiveMQ也支持JDBC数据库持久化
- Pluggable security
- security包含authentication和authorization
- ActiveMQ提供简单的基于properties files认证和授权,以及JAAS登录模块
- JAVA开发(用JMS spec APIs)
- 与应用服务器集成
- 将ActiveMQ与诸如Apache Tomcat、Jetty、Apache Geronimo、JBoss等服务器整合
- 广泛的client APIs
- ActiveMQ的客户端实现可以是C/C++、.NET、Perl、PHP、Python、Ruby等语言
- Broker集群
- 高级broker特性与client
- ActiveMQ支持在broker的xml配置文件里使用Apache Camel
- 非常简单的管理
- 通过基于JMX的JConsole或ActiveMQ web console
- 处理ActiveMQ advisory messages
- 执行命令行脚本
- 监控不同的日志
搭建ActiveMQ
Ant
ActiveMQ
- 官网
- 启动activemq.bat start
- 停止activemq.bat stop
- 控制台网址http://0.0.0.0:8161/admin
- 控制台用户conf/jetty-real.properties
JMS规范
JMS clients
- MessageProducer用于发送消息
Session.createProducer()
指定destinationsend()
可指定destinationclose()
关闭
- MessageConsumer用于消费消息
Session.createConsumer()
只能在此指定destinationreceive()
同步接收sageListener.onMessage()
异步接收
JMS provider
- MOM
JMS message
JMS message分为headers与payload两部分
headers如下表格所列
参数名 | 描述 |
---|---|
JMSDestination | send时自动设置 |
JMSDeliveryMode | 默认persistent deliver,只能once and only once nonpersistent deliver只能at most once delivery mode能够针对消息独立设置 |
JMSExpiration | 可通过MessageProducer.setTimeToLive() 或全局设置或 MessageProducer.send() 独立设置值为0永不过期 |
JMSMessageID | 用于区别provider分发的消息 可通过 MessageProducer.setDisableMessageID() 建议Provider置JMSMessageID为null |
JMSPriority | 消息的重要性,从0到9逐级增高 0~4属于normal 5~9属于expedited JMS Provider通常会优先分发高priority的消息 |
JMSTimestamp | provider收到消息的时间 可通过 MessageProducer.setDisableMessageTimestamp() 建议Provider置JMSTimestamp为0 |
JMSCorrelationID | 关联当前消息与上一消息,通常用于关联请求与接收 前缀标准为ID: |
JMSReplyTo | 指定回复送达的destination |
JMSType | 指定消息类型,非payload类型,鲜用 |
JMSRedelivered | 表示当前消息是重发之前未被确认的消息 |
properties:属于headers,存储JAVA原生数据类型。含有如下两种:
custom properties:自定义属性
jms-defined properties:以JMSX为前缀的可选属性
- JMSXAppID:标志发送消息的应用
- JMSXConsumerTXID:被消费消息的事务标识
- JMSXDeliveryCount:消息分发尝试次数
- JMSXGroupID:标识消息组
- JMSXGroupSeq:消息在组中的序列号
- JMSXProducerTXID:被生产的消息的事务标识
- JMSXRcvTimestamp:provider分发消息给consumer的时间
- JMSXState:与provider相关的时间
- JMSXUserID:标识发送消息的用户
- provider-specific properties:JMS_<vendor-name>的provider特定属性,只应使用于non-JMS clients
Message Selectors
- 根据headers与properties筛选consumer预期接收的消息。
- 基于SQL92子集的条件表达式,表达式为真表示符合预期
- 不能关联payload
- 示例
String selector = "SYMBOL IN ('AAPL', 'CSCO') AND PRICE > " + getPreviousPrice() + " AND PE_RATIO < " + getCurrentAcceptedPriceToEarningsRatioThreshold();
MessageConsumer consumer = session.createConsumer(destination, selector);
- payload类型包含
- Message:用于不含payload的消息
- TextMessage:含String的消息
- MapMessage:键值对消息,键是String,值是Java基本类型
- BytesMessage:含byte数组的消息
- StreamMessage:Java基本类型的流,支持串行化读写
- ObjectMessage:含序列号的Java对象,包括集合
JMS domains
- point-to-point
- destination是queue
- 可同步可异步
- once-and-only-once消息delivery
- 单个consumer接收,多个consumer注册也仅有单个consumer接收并确认
- publish/subscribe
- destination是topic
- 可同步可异步
- topic默认不存储消息
- 使用durable subscription可以让provider在consumer离线时保存所有未能发送的消息,在consumer上线发送所有存储的消息
- 多个consumer接收
- durability
- durability仅用于pub/sub domain
- durable subscription是infinite,nondurable subscription是finite
- durable subscription只有在consumer显式离线或消息过期时,provider才会扔掉存储的消息
- persistence
- 与domain无关
- 受message producer的
setDeliveryMode
方法控制
- request/reply
- JMSReplyTo指定reply发送的destination
- JMSCorrelationID指定了request的JMSMessageID
- QueueRequestor与TopicRequestor提供了单请求-单回复的实现,单请求-多回复需要自行实现
Administered objects
- ConnectionFactory
- 一个connection通常代表一个TCP socket
- connection被client用于创建
javax.jms.Session
- Destination
Session
创建,生命周期等同于Connection
Temporary destinations
对于Connection
是独一无二的