zoukankan      html  css  js  c++  java
  • 阿里云rocketmq用法

    package com.sunxing.service.merchants.config;

    import com.aliyun.openservices.ons.api.Consumer;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import com.sunxing.service.merchants.listener.RocketConsumerListener;
    import lombok.Getter;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import javax.annotation.PostConstruct;
    import java.util.Properties;

    /**
    * rocketMQ消费者配置
    * @author vincen
    */
    @Configuration
    @Getter
    @Slf4j
    public class ConsumerConfig {

    @Value("${workorder.rocketmq.name-server}")
    private String nameServer;

    @Value("${workorder.rocketmq.consumer.access-key}")
    private String accessKey;

    @Value("${workorder.rocketmq.consumer.secret-key}")
    private String secretKey;

    @Value("${workorder.rocketmq.consumer.topic}")
    private String topic;

    @Value("${workorder.rocketmq.consumer.tag}")
    private String tag;

    @Value("${workorder.rocketmq.consumer.group-id}")
    private String groupId;

    @Value("${workorder.rocketmq.consumer.timeout-mills}")
    private String timeoutMills;


    /**
    *
    * @return
    */
    @Bean
    public RocketConsumerListener rocketConsumerListener() {
    return new RocketConsumerListener();
    }

    @PostConstruct
    public void init(){
    Properties props = new Properties();

    props.put(PropertyKeyConst.NAMESRV_ADDR, nameServer);
    props.put(PropertyKeyConst.GROUP_ID, groupId);
    props.put(PropertyKeyConst.AccessKey, accessKey);
    props.put(PropertyKeyConst.SecretKey, secretKey);
    props.put(PropertyKeyConst.MaxReconsumeTimes, 3);
    props.put(PropertyKeyConst.SendMsgTimeoutMillis, timeoutMills);

    Consumer consumer = ONSFactory.createConsumer(props);
    consumer.subscribe(topic, tag, rocketConsumerListener());

    consumer.start();

    log.info("--- RocketMq Consumer Runner Started OK.");
    }
    }


    package com.sunxing.service.merchants.listener;
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.TypeReference;
    import com.aliyun.openservices.ons.api.Action;
    import com.aliyun.openservices.ons.api.ConsumeContext;
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.MessageListener;
    import com.sunxing.service.merchants.constant.SceneType;
    import com.sunxing.service.merchants.service.MerchantsPlatMerchantService;
    import lombok.extern.slf4j.Slf4j;
    import net.sunxing.workorder.data.dto.response.mq.MsgDto;
    import net.sunxing.workorder.data.dto.response.mq.NotifyMsgRepsDto;
    import net.sunxing.workorder.data.dto.response.operator.OperatorDto;
    import net.sunxing.workorder.data.dto.response.task.TaskDto;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;

    import static com.sunxing.service.merchants.constant.CommonConstants.OPEN_WORK_ORDER;

    /**
    * rocketMq消费监听器
    *
    * @author vincen
    */
    @Slf4j
    public class RocketConsumerListener implements MessageListener {

    /**
    *
    */
    private static final Logger logger = LoggerFactory.getLogger(RocketConsumerListener.class);




    /**
    * 消费消息
    * @param message
    * @param consumeContext
    * @return
    */
    @Override
    public Action consume(Message message, ConsumeContext consumeContext) {
    try {

    messagePrinter(message);
    //MsgDto<NotifyMsgRepsDto> result = JsonUtils.json2Object(new String(message.getBody()), MsgDto.class);
    MsgDto<NotifyMsgRepsDto> result = JSON.parseObject(new String(message.getBody()),
    new TypeReference<MsgDto<NotifyMsgRepsDto>>(){});

    if (result!=null){


    }else
    {

    }
    } catch (Exception e) {

    // if (message.getReconsumeTimes() >= 3) {
    // return Action.CommitMessage;
    // }
    return Action.ReconsumeLater;
    }

    return Action.CommitMessage;
    }

    /**
    * 打印消息
    *
    * @param message
    */
    private void messagePrinter(Message message) {
    String msg = String.format("messageID = %s, value = %s", message.getMsgID(), new String(message.getBody()));
    log.info("招商rocketMq消费监听器收到topic:Tag为[ {}:{} ]的rocketMQ消息: {}", message.getTopic(), message.getTag(), msg);
    }

    /**
    * 当消费遇到异常时,打印error消息
    *
    * @param message
    * @param e
    * @return
    */
    private String errorMessagePrinter(Message message, Exception e) {
    String ERROR_MSG_TEMPLATE = "rocketMQ[ topic=%s, tag=%s, msgId=%s, message=%s ]消费遇到异常:%s";
    return String.format(ERROR_MSG_TEMPLATE, message.getTopic(), message.getTag(), message.getMsgID(),
    new String(message.getBody()), e.getMessage());
    }
    }

    阿里云可以和本地进行联调 message_id




    //MsgDto<NotifyMsgRepsDto> result = JsonUtils.json2Object(new String(message.getBody()), MsgDto.class);
    MsgDto<NotifyMsgRepsDto> result = JSON.parseObject(new String(message.getBody()),
    new TypeReference<MsgDto<NotifyMsgRepsDto>>(){});




  • 相关阅读:
    spark系列-6、对Application,Driver,Job,Task,Stage的理解
    spark系列-5、RDD、DataFrame、Dataset的区别和各自的优势
    spark系列-4、spark序列化方案、GC对spark性能的影响
    spark系列-2、Spark 核心数据结构:弹性分布式数据集 RDD
    nginx学习(九):跨域配置和防盗链配置
    nginx学习(八):nginx配置gzip
    nginx学习(七):nginx提供静态资源服务
    nginx学习(六):日志切割
    nginx学习(五):nginx.conf 核心配置文件详解
    nginx学习(四):nginx处理web请求机制
  • 原文地址:https://www.cnblogs.com/ywsheng/p/13181772.html
Copyright © 2011-2022 走看看