zoukankan      html  css  js  c++  java
  • RabbitMQ 延迟队列实现订单支付结果异步阶梯性通知

    在第三方支付中,例如支付宝、或者微信,对于订单请求,第三方支付系统采用的是消息同步返回、异步通知+主动补偿查询的补偿机制。

    由于互联网通信的不可靠性,例如双方网络、服务器、应用等因素的影响,不管是同步返回、异步通知、主动查询报文都可能出现超时无响应、报文丢失等情况,所以像支付业务,对结果的通知一般采用几种方案结合的补偿机制,不能完全依赖某一种机制。
    例如一个支付结果的通知,一方面会在支付页面跳转时候返回支付结果(一般只用作前端展示使用,非最终状态),同时会采用后台异步通知机制(有前台、后台通知的,以后台异步通知结果为准),但由于前台跳转、后台结果通知都可能失效,因此还以定时补单+请求方主动查询接口作为辅助手段。
    常见的补单操作,任务调度策略一般设定30秒、60秒、3分钟、6分钟、10分钟调度多次(以自己业务需要),如果调度接收到响应确认报文,补单成功,则中止对应订单的调度任务;如果超过补单上限次数,则停止补单,避免无谓的资源浪费。请求端随时可以发起请求报文查询对应订单的状态。
    在日常开发中,对于网站前端来说,支付计费中心对于订单请求信息的处理也是通过消息同步返回、异步通知+主动补偿查询相结合的机制,其中对于订单的异步通知,目前的通知策略为3s、30s、60s、120s、180、300s的阶梯性通知。返回成功情况下就不继续通知了,本来打算使用将失败的消息写到数据库等待发送,然后每秒查询数据库获取消息通知前端。但觉得这样的处理方式太粗暴。存在以下缺点:
    1 、每秒请求有点儿浪费资源; 2 、通知方式不稳定; 3 、无法承受大数据量等等
    所以最终打算使用rabbitmq的消息延迟+死信队列来实现。消息模型如下:

    producer发布消息,通过exchangeA的消息会被分发到QueueA,Consumer监听queueA,一旦有消息到来就被消费,这边的消费业务就是通知前端,如果通知失败,就创建一个延迟队列declareQueue,设置每个消息的ttl然后通过declare_exchange将消息分发到declare_queue,因为declare_queue没有consumer并且declare_queue中的消息设置了ttl,当ttl到期后,将通过DEX路由到queueA,被重新消费。
    代码如下:DeclareQueue.java
    1. package org.delayQueue;
    2. import com.rabbitmq.client.BuiltinExchangeType;
    3. import com.rabbitmq.client.Channel;
    4. import com.rabbitmq.client.Connection;
    5. import com.rabbitmq.client.ConnectionFactory;
    6. public class DeclareQueue {
    7. public static String EXCHANGE_NAME = "notifyExchange";
    8. public static void init() {
    9. ConnectionFactory factory = new ConnectionFactory();
    10. factory.setHost("localhost");
    11. factory.setPort(5672);
    12. Connection connection = null;
    13. try {
    14. connection = factory.newConnection();
    15. Channel channel = connection.createChannel();
    16. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
    17. String routingKey = "AliPaynotify";
    18. String message = "http://localhost:8080/BossCenter/payGateway/notifyRecv.jsp?is_success=T¬ify_id=4ab9bed148d043d0bf75460706f7774a¬ify_time=2014-08-29+16%3A22%3A02¬ify_type=trade_status_sync&out_trade_no=1421712120109862&total_fee=424.42&trade_no=14217121201098611&trade_status=TRADE_SUCCESS";
    19. channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
    20. System.out.println(" [x] Sent :" + message);
    21. } catch (Exception e) {
    22. // TODO Auto-generated catch block
    23. e.printStackTrace();
    24. } finally {
    25. if (connection != null) {
    26. try {
    27. connection.close();
    28. } catch (Exception ignore) {
    29. }
    30. }
    31. }
    32. }
    33. public static void main(String args[]) {
    34. init();
    35. }
    36. }

    DeclareConsumer.java
    1. package org.delayQueue;
    2. import java.io.BufferedReader;
    3. import java.io.IOException;
    4. import java.io.InputStreamReader;
    5. import java.util.ArrayList;
    6. import java.util.HashMap;
    7. import java.util.List;
    8. import java.util.Map;
    9. import java.util.Map.Entry;
    10. import org.apache.http.HttpResponse;
    11. import org.apache.http.client.ClientProtocolException;
    12. import org.apache.http.client.HttpClient;
    13. import org.apache.http.client.methods.HttpPost;
    14. import org.apache.http.impl.client.DefaultHttpClient;
    15. import com.rabbitmq.client.AMQP;
    16. import com.rabbitmq.client.Channel;
    17. import com.rabbitmq.client.Connection;
    18. import com.rabbitmq.client.ConnectionFactory;
    19. import com.rabbitmq.client.Consumer;
    20. import com.rabbitmq.client.DefaultConsumer;
    21. import com.rabbitmq.client.Envelope;
    22. public class DeclareConsumer {
    23. public static String EXCHANGE_NAME = "notifyExchange";
    24. public static String QU_declare_15S = "Qu_declare_15s";
    25. public static String EX_declare_15S = "EX_declare_15s";
    26. public static String ROUTINGKEY = "AliPaynotify";
    27. public static Connection connection = null;
    28. public static Channel channel = null;
    29. public static Channel DECLARE_15S_CHANNEL = null;
    30. public static String declare_queue = "init";
    31. public static String originalExpiration = "0";
    32. public static void init() throws Exception {
    33. ConnectionFactory factory = new ConnectionFactory();
    34. factory.setHost("localhost");
    35. factory.setPort(5672);
    36. connection = factory.newConnection();
    37. channel = connection.createChannel();
    38. DECLARE_15S_CHANNEL = connection.createChannel();
    39. }
    40. public static void consume() {
    41. try {
    42. channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    43. final String queueName = channel.queueDeclare().getQueue();
    44. channel.queueBind(queueName, EXCHANGE_NAME, ROUTINGKEY);
    45. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    46. final Consumer consumer = new DefaultConsumer(channel) {
    47. @Override
    48. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    49. String message = new String(body, "UTF-8");
    50. Map<String, Object> headers = properties.getHeaders();
    51. if (headers != null) {
    52. List<Map<String, Object>> xDeath = (List<Map<String, Object>>) headers.get("x-death");
    53. System.out.println("xDeath--- > " + xDeath);
    54. if (xDeath != null && !xDeath.isEmpty()) {
    55. Map<String, Object> entrys = xDeath.get(0);
    56. // for(Entry<String, Object>
    57. // entry:entrys.entrySet()){
    58. // System.out.println(entry.getKey()+":"+entry.getValue());
    59. // }
    60. originalExpiration = entrys.get("original-expiration").toString();
    61. }
    62. }
    63. System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'" + "time" + System.currentTimeMillis());
    64. HttpClient httpClient = new DefaultHttpClient();
    65. HttpPost post = new HttpPost(message);
    66. HttpResponse response = httpClient.execute(post);
    67. BufferedReader inreader = null;
    68. if (response.getStatusLine().getStatusCode() == 200) {
    69. inreader = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), "UTF-8"));
    70. StringBuffer responseBody = new StringBuffer();
    71. String line = null;
    72. while ((line = inreader.readLine()) != null) {
    73. responseBody.append(line);
    74. }
    75. if (!responseBody.equals("success")) {
    76. // putDeclre15s(message);
    77. if (originalExpiration.equals("0")) {
    78. putDeclreQueue(message, 3000, QU_declare_15S);
    79. }
    80. if (originalExpiration.equals("3000")) {
    81. putDeclreQueue(message, 30000, QU_declare_15S);
    82. }
    83. if (originalExpiration.equals("30000")) {
    84. putDeclreQueue(message, 60000, QU_declare_15S);
    85. }
    86. if (originalExpiration.equals("60000")) {
    87. putDeclreQueue(message, 120000, QU_declare_15S);
    88. }
    89. if (originalExpiration.equals("120000")) {
    90. putDeclreQueue(message, 180000, QU_declare_15S);
    91. }
    92. if (originalExpiration.equals("180000")) {
    93. putDeclreQueue(message, 300000, QU_declare_15S);
    94. }
    95. if (originalExpiration.equals("300000")) {
    96. // channel.basicConsume(QU_declare_300S,true, this);
    97. System.out.println("finish notify");
    98. }
    99. }
    100. } else {
    101. System.out.println(response.getStatusLine().getStatusCode());
    102. }
    103. }
    104. };
    105. channel.basicConsume(queueName, true, consumer);
    106. } catch (Exception e) {
    107. e.printStackTrace();
    108. } finally {
    109. }
    110. }
    111. static Map<String, Object> xdeathMap = new HashMap<String, Object>();
    112. static List<Map<String, Object>> xDeath = new ArrayList<Map<String, Object>>();
    113. static Map<String, Object> xdeathParam = new HashMap<String, Object>();
    114. public static void putDeclre15s(String message) throws IOException {
    115. channel.exchangeDeclare(EX_declare_15S, "topic");
    116. Map<String, Object> args = new HashMap<String, Object>();
    117. args.put("x-dead-letter-exchange", EXCHANGE_NAME);// 死信exchange
    118. AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
    119. builder.expiration("3000").deliveryMode(2);// 设置消息TTL
    120. AMQP.BasicProperties properties = builder.build();
    121. channel.queueDeclare(QU_declare_15S, false, false, false, args);
    122. channel.queueBind(QU_declare_15S, EX_declare_15S, ROUTINGKEY);
    123. channel.basicPublish(EX_declare_15S, ROUTINGKEY, properties, message.getBytes());
    124. System.out.println("send message in QA_DEFERRED_15S" + message + "time" + System.currentTimeMillis());
    125. }
    126. public static void putDeclreQueue(String message, int mis, String queue) throws IOException {
    127. channel.exchangeDeclare(EX_declare_15S, "topic");
    128. Map<String, Object> args = new HashMap<String, Object>();
    129. args.put("x-dead-letter-exchange", EXCHANGE_NAME);// 死信exchange
    130. AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
    131. builder.expiration(String.valueOf(mis)).deliveryMode(2);// 设置消息TTL
    132. AMQP.BasicProperties properties = builder.build();
    133. channel.queueDeclare(queue, false, false, false, args);
    134. channel.queueBind(queue, EX_declare_15S, ROUTINGKEY);
    135. channel.basicPublish(EX_declare_15S, ROUTINGKEY, properties, message.getBytes());
    136. System.out.println("send message in " + queue + message + "time============" + System.currentTimeMillis());
    137. }
    138. public static void main(String args[]) throws Exception {
    139. init();
    140. consume();
    141. }
    142. }
    消息通过dlx转发的情况下,header头部会带有x-death的一个数组,里面包含消息的各项属性,比如说消息成为死信的原因reason,original-expiration这个字段表示消息在原来队列中的过期时间,根据这个值来确定下一次通知的延迟时间应该是多少秒。
    运行结果如下:




  • 相关阅读:
    164 Maximum Gap 最大间距
    162 Find Peak Element 寻找峰值
    160 Intersection of Two Linked Lists 相交链表
    155 Min Stack 最小栈
    154 Find Minimum in Rotated Sorted Array II
    153 Find Minimum in Rotated Sorted Array 旋转数组的最小值
    152 Maximum Product Subarray 乘积最大子序列
    151 Reverse Words in a String 翻转字符串里的单词
    bzoj3994: [SDOI2015]约数个数和
    bzoj 4590: [Shoi2015]自动刷题机
  • 原文地址:https://www.cnblogs.com/jpfss/p/9908853.html
Copyright © 2011-2022 走看看