zoukankan      html  css  js  c++  java
  • 阿里云 消息队列mq

    使用阿里云消息队列

    控制台地址:http://ons.console.aliyun.com/#/home/topic

    Demo:

    支付消息mq工厂类:

    public class DfacePayConsumerFactory {

    public static String CID = "CID-";
      
      //监听执行实例
    @Autowired
    private DfacePayConsumerListener dfacePayConsumerListener;

    private String topic;
    private String pTag;
    private String accessKey;
    private String secretKey;
    private String tag;

    private Consumer consumer;

    /**
    * @return the topic
    */
    public String getTopic() {
    return topic;
    }

    /**
    * @param topic the topic to set
    */
    public void setTopic(String topic) {
    this.topic = topic;
    }

    /**
    * @return the tag
    */
    public String getTag() {
    return tag;
    }

    /**
    * @param tag the tag to set
    */
    public void setTag(String tag) {
    this.tag = tag;
    }

    /**
    * @return the pTag
    */
    public String getpTag() {
    return pTag;
    }

    /**
    * @param pTag the pTag to set
    */
    public void setpTag(String pTag) {
    this.pTag = pTag;
    }

    /**
    * @return the accessKey
    */
    public String getAccessKey() {
    return accessKey;
    }

    /**
    * @param accessKey the accessKey to set
    */
    public void setAccessKey(String accessKey) {
    this.accessKey = accessKey;
    }

    /**
    * @return the secretKey
    */
    public String getSecretKey() {
    return secretKey;
    }

    /**
    * @param secretKey the secretKey to set
    */
    public void setSecretKey(String secretKey) {
    this.secretKey = secretKey;
    }

    public void initConsumer() {

    Properties properties = new Properties();
    String consumerId = CID + this.topic
    + (StringUtils.hasText(this.pTag) ? "-" + this.pTag : "");
    properties.put(PropertyKeyConst.ConsumerId, consumerId);
    properties.put(PropertyKeyConst.AccessKey, this.accessKey);
    properties.put(PropertyKeyConst.SecretKey, this.secretKey);

         /**
         ( 
         //相关属性介绍:
         //Properties properties = new Properties();
         //properties.put(PropertyKeyConst.ConsumerId, consumerLocal.getConsumerId());
         // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
         //properties.put(PropertyKeyConst.AccessKey, consumerLocal.getAccessKey());
         // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
         //properties.put(PropertyKeyConst.SecretKey, consumerLocal.getSecreKey());
         //消息处理失败后多久重新发送消息
         properties.put(PropertyKeyConst.SuspendTimeMillis, consumerLocal.getSuspendTimeMillis());
         //重发的次数
         //properties.put(PropertyKeyConst.MaxReconsumeTimes, consumerLocal.getMaxReconsumeTimes());
         //消费者的线程数
         //properties.put(PropertyKeyConst.ConsumeThreadNums,"1");
         //消费者的介入地址
         //properties.put(PropertyKeyConst.ONSAddr, consumerLocal.getOnsAddress());
         )

         **/
     
    consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe(topic, tag, this.dfacePayConsumerListener);
    new Thread(new Runnable() {

    @Override
    public void run() {
    try {
    Thread.sleep(90000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    consumer.start();
    }
    }).start();

    Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
    shutDown();
    }
    });
    }

    /**
    * 停止监听
    *
    * @return
    */
    public boolean shutDown() {
    if (null != this.consumer) {
    this.consumer.shutdown();
    return true;
    }
    return false;
    }
    }

    //dPay支付监听  执行mq consume消息接收(通过topic订阅)
    @Component("dfacePayConsumerListener")
    public class DfacePayConsumerListener implements MessageListener {
    private static Logger logger = LoggerFactory.getLogger(DfacePayConsumerListener.class);

    @Autowired
    private ...;

    @Override
    public Action consume(Message message, ConsumeContext context) {
    String msg = new String(message.getBody());
    String tag = message.getTag();
    logger.info(LogUtils.builder().append("mq", "接受mq").append("mqTag", tag)
    .append("mqMsg", msg).toString());
    return tagHandle(tag, msg, message);
    }

    public Action tagHandle(String tag, String msg, Message message) {
    if (MqTagEnum.PAY.name().equals(tag)) {
    try {
    PayBackBo payBackBo = JSON.parse(msg, PayBackBo.class);
    //检查订单号
    if (!payBackBo.getOrderNo().startsWith(ApplicationConstant.APP_NO)) {
    logger.info(LogUtils.format("订单支付失败: orderNo 前缀 ", payBackBo.getOrderNo()));
    return Action.CommitMessage;
    }
    return giftOrderHandle(payBackBo);
    } catch (Exception e) {
    e.printStackTrace();
    logger.info(LogUtils.format("paid_error", e.getMessage()));
    return Action.ReconsumeLater;
    }
    } else {
    logger.info(LogUtils.builder().append("tag error", tag).append("msg", msg)
    .append("message", message).toString());
    }
    return Action.CommitMessage;
    }

    /**
    * 支付订单处理
    *
    * @param payBackBo
    * @return
    */
    private Action giftOrderHandle(PayBackBo payBackBo) {
    //处理支付业务逻辑
    }


    }
  • 相关阅读:
    ISCC 2018——write up
    图的存储结构(十字链表、邻接多重表、边集数组)
    图的存储结构
    树梅派(Raspberry Pi 3b)安装kali linux 2.0
    树梅派3B kali2.0 启用SSH进行远程登录
    VS+VAssistX自动添加注释
    libtiff库使用
    word采用尾注进行参考文献排版的一些问题
    vs2008安装opencv2.4.6
    Altera CYCLONE III FPGA BGA布线
  • 原文地址:https://www.cnblogs.com/yzf666/p/9681271.html
Copyright © 2011-2022 走看看