消息队列(Message Queue,简称 MQ)是阿里巴巴集团中间件技术部自主研发的专业消息中间件。产品基于高可用分布式集群技术,提供消息发布订阅、消息轨迹查询、定时(延时)消息、资源统计、监控报警等一系列消息云服务,是企业级互联网架构的核心产品。MQ 历史超过9年,为分布式应用系统提供异步解耦、削峰填谷的能力,同时具备海量消息堆积、高吞吐、可靠重试等互联网应用所需的特性,是阿里巴巴双11使用的核心产品。
MQ 是阿里云正式商用的产品,目前在阿里云多个地域(Region)提供了高可用消息云服务,单个域内采用多机房部署,可用性极高,即使整个机房都不可用,仍然可以为应用提供消息发布服务,产品稳定性及可用性完全按照阿里巴巴内部标准来实施,无单点。
MQ 目前提供 TCP、HTTP、MQTT 三种协议层面的接入方式,支持 Java、C++ 以及 .NET 不同语言,方便不同编程语言开发的应用快速接入 MQ 消息云服务。用户可以将应用部署在阿里云 ECS、企业自建云,或者嵌入到移动端、物联网设备中与 MQ 建立连接进行消息收发,同时本地开发者也可以通过公网接入 MQ 服务进行消息收发。
参考阿里云消息队列:
https://help.aliyun.com/document_detail/29537.html?spm=5176.product29530.6.548.BaK6km
https://ons.console.aliyun.com/?spm=5176.6660585.774526198.1.3avQ5c#/home/pub
1,Maven 方式引入依赖
<dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>ons-client</artifactId> <version>1.2.1</version> </dependency>
2,发送消息
主动触发消息发送;先进行参数初始化,然后再调用消息发送
public class ProducerTest { public static void main(String[] args) { Properties properties = new Properties(); properties.put(PropertyKeyConst.ProducerId, "XXX");// 您在MQ控制台创建的Producer ID properties.put(PropertyKeyConst.AccessKey,"XXX");// 鉴权用AccessKey,在阿里云服务器管理控制台创建 properties.put(PropertyKeyConst.SecretKey, "XXX");// 鉴权用SecretKey,在阿里云服务器管理控制台创建 Producer producer = ONSFactory.createProducer(properties); // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可 producer.start(); //循环发送消息 while(true){ Message msg = new Message( // // Message Topic "TopicTestMQ", // Message Tag, // 可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在MQ服务器过滤 "TagA", // Message Body // 任何二进制形式的数据, MQ不做任何干预, // 需要Producer与Consumer协商好一致的序列化和反序列化方式 "Hello MQ".getBytes()); // 设置代表消息的业务关键属性,请尽可能全局唯一,以方便您在无法正常收到消息情况下,可通过MQ控制台查询消息并补发 // 注意:不设置也不会影响消息正常收发 msg.setKey("ORDERID_100"); // 发送消息,只要不抛异常就是成功 // 打印Message ID,以便用于消息发送状态查询 SendResult sendResult = producer.send(msg); System.out.println("Send Message success. Message ID is: " + sendResult.getMessageId()); } // 在应用退出前,可以销毁Producer对象 // 注意:如果不销毁也没有问题 producer.shutdown(); } }
3,接受消息
该部分有名订阅消息; 将订阅消息Service使用Spring上下文进行初始化后,订阅消息功能将处于监听状态,消息发送后将自动被接收,并触发其他业务功能
public class ConsumerTest { public static void main(String[] args) { Properties properties = new Properties(); properties.put(PropertyKeyConst.ConsumerId, "XXX");// 您在MQ控制台创建的Consumer ID properties.put(PropertyKeyConst.AccessKey, "XXX");// 鉴权用AccessKey,在阿里云服务器管理控制台创建 properties.put(PropertyKeyConst.SecretKey, "XXX");// 鉴权用SecretKey,在阿里云服务器管理控制台创建 Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("TopicTestMQ", "*", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println("Receive: " + message); return Action.CommitMessage; } }); consumer.start(); System.out.println("Consumer Started"); } }