zoukankan      html  css  js  c++  java
  • 阿里云rocketmq用法

    package com.sunxing.service.merchants.config;

    import com.aliyun.openservices.ons.api.Consumer;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import com.sunxing.service.merchants.listener.RocketConsumerListener;
    import lombok.Getter;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import javax.annotation.PostConstruct;
    import java.util.Properties;

    /**
    * rocketMQ消费者配置
    * @author vincen
    */
    @Configuration
    @Getter
    @Slf4j
    public class ConsumerConfig {

    @Value("${workorder.rocketmq.name-server}")
    private String nameServer;

    @Value("${workorder.rocketmq.consumer.access-key}")
    private String accessKey;

    @Value("${workorder.rocketmq.consumer.secret-key}")
    private String secretKey;

    @Value("${workorder.rocketmq.consumer.topic}")
    private String topic;

    @Value("${workorder.rocketmq.consumer.tag}")
    private String tag;

    @Value("${workorder.rocketmq.consumer.group-id}")
    private String groupId;

    @Value("${workorder.rocketmq.consumer.timeout-mills}")
    private String timeoutMills;


    /**
    *
    * @return
    */
    @Bean
    public RocketConsumerListener rocketConsumerListener() {
    return new RocketConsumerListener();
    }

    @PostConstruct
    public void init(){
    Properties props = new Properties();

    props.put(PropertyKeyConst.NAMESRV_ADDR, nameServer);
    props.put(PropertyKeyConst.GROUP_ID, groupId);
    props.put(PropertyKeyConst.AccessKey, accessKey);
    props.put(PropertyKeyConst.SecretKey, secretKey);
    props.put(PropertyKeyConst.MaxReconsumeTimes, 3);
    props.put(PropertyKeyConst.SendMsgTimeoutMillis, timeoutMills);

    Consumer consumer = ONSFactory.createConsumer(props);
    consumer.subscribe(topic, tag, rocketConsumerListener());

    consumer.start();

    log.info("--- RocketMq Consumer Runner Started OK.");
    }
    }


    package com.sunxing.service.merchants.listener;
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.TypeReference;
    import com.aliyun.openservices.ons.api.Action;
    import com.aliyun.openservices.ons.api.ConsumeContext;
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.MessageListener;
    import com.sunxing.service.merchants.constant.SceneType;
    import com.sunxing.service.merchants.service.MerchantsPlatMerchantService;
    import lombok.extern.slf4j.Slf4j;
    import net.sunxing.workorder.data.dto.response.mq.MsgDto;
    import net.sunxing.workorder.data.dto.response.mq.NotifyMsgRepsDto;
    import net.sunxing.workorder.data.dto.response.operator.OperatorDto;
    import net.sunxing.workorder.data.dto.response.task.TaskDto;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;

    import static com.sunxing.service.merchants.constant.CommonConstants.OPEN_WORK_ORDER;

    /**
    * rocketMq消费监听器
    *
    * @author vincen
    */
    @Slf4j
    public class RocketConsumerListener implements MessageListener {

    /**
    *
    */
    private static final Logger logger = LoggerFactory.getLogger(RocketConsumerListener.class);




    /**
    * 消费消息
    * @param message
    * @param consumeContext
    * @return
    */
    @Override
    public Action consume(Message message, ConsumeContext consumeContext) {
    try {

    messagePrinter(message);
    //MsgDto<NotifyMsgRepsDto> result = JsonUtils.json2Object(new String(message.getBody()), MsgDto.class);
    MsgDto<NotifyMsgRepsDto> result = JSON.parseObject(new String(message.getBody()),
    new TypeReference<MsgDto<NotifyMsgRepsDto>>(){});

    if (result!=null){


    }else
    {

    }
    } catch (Exception e) {

    // if (message.getReconsumeTimes() >= 3) {
    // return Action.CommitMessage;
    // }
    return Action.ReconsumeLater;
    }

    return Action.CommitMessage;
    }

    /**
    * 打印消息
    *
    * @param message
    */
    private void messagePrinter(Message message) {
    String msg = String.format("messageID = %s, value = %s", message.getMsgID(), new String(message.getBody()));
    log.info("招商rocketMq消费监听器收到topic:Tag为[ {}:{} ]的rocketMQ消息: {}", message.getTopic(), message.getTag(), msg);
    }

    /**
    * 当消费遇到异常时,打印error消息
    *
    * @param message
    * @param e
    * @return
    */
    private String errorMessagePrinter(Message message, Exception e) {
    String ERROR_MSG_TEMPLATE = "rocketMQ[ topic=%s, tag=%s, msgId=%s, message=%s ]消费遇到异常:%s";
    return String.format(ERROR_MSG_TEMPLATE, message.getTopic(), message.getTag(), message.getMsgID(),
    new String(message.getBody()), e.getMessage());
    }
    }

    阿里云可以和本地进行联调 message_id




    //MsgDto<NotifyMsgRepsDto> result = JsonUtils.json2Object(new String(message.getBody()), MsgDto.class);
    MsgDto<NotifyMsgRepsDto> result = JSON.parseObject(new String(message.getBody()),
    new TypeReference<MsgDto<NotifyMsgRepsDto>>(){});




  • 相关阅读:
    【CF1528D】It's a bird! No, it's a plane! No, it's AaParsa!
    【CF1528C】Trees of Tranquillity
    【CF1528B】Kavi on Pairing Duty
    【洛谷P5443】桥梁
    【CF gym102759I】Query On A Tree 17
    ansible-playbook批量修改密码
    kubernetes集群简单实例搭建
    UiPath屏幕抓取Screen Scraping的介绍和使用
    学习廖雪峰的Git教程3--从远程库克隆以及分支管理
    学习廖雪峰的Git教程2--远程仓库
  • 原文地址:https://www.cnblogs.com/ywsheng/p/13181772.html
Copyright © 2011-2022 走看看