zoukankan      html  css  js  c++  java
  • rocketMq消息的发送和消息消费

    rocketMq消息的发送和消息消费

    ###一.消息推送 ```java public void pushMessage() { String message = "推送消息内容!"; try { DefaultMQProducer producer = new DefaultMQProducer(producerGroup); // 设置NameServer地址 producer.setNamesrvAddr("服务器地址+端口号"); producer.setInstanceName("producer"); // 只需要在发送前初始化一次 producer.start(); // 构建消息实体
            Message msg = new Message(topic,// topic
                    tag,// tag
                    message.getBytes()// body
            );
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
            producer.shutdown();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    

    }

    
    ###二.消息消费
    ```java
        @Autowired
        private MessageReceiveService messageReceiveService;
    	//====好差评的服务器地址和端口=====
        @Value("${app.message.address}")
        private String address;
    	//====好差评的topic=====
        @Value("${app.message.topic}")
        private String topic;
    	//====好差评的组名=====
        @Value("${app.message.groupName}")
        private String consumerGroup;
    
        /**
         * 开始消费rocketMQ消息
         */
        @PostConstruct
        public void init() {
            try {
                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
                consumer.setNamesrvAddr(address);
                consumer.subscribe(topic, "*");
                consumer.registerMessageListener(messageReceiveService);
                consumer.start();
                logger.info("rocketMQ consumer start");
            } catch (Exception e) {
                logger.error("reocketMQ consumer start error!", e);
                e.printStackTrace();
            }
        }
    
     @Service
    public class MessageReceiveService implements MessageListenerConcurrently {
    
        private static Logger logger = LoggerFactory.getLogger(MessageReceiveService.class);
    
        @Value("${accept_system_interface}")
        private String acceptSystemInterface;
    
        /**
         * 消费rocketMQ上的消息
         *
         * @param msgs    rocketMQ消息
         * @param context 消息消费上下文
         * @return 消息处理状态
         */
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            // 判断消息类型
            return handleHcpMessage(msgs, context);
        }
    
        /**
         * <p>好差评消息消费</p>
         *
         * @param msgs    当前消息(组)
         * @param context 消息消费上下文
         */
        @Transactional(rollbackFor = {RuntimeException.class})
        private ConsumeConcurrentlyStatus handleHcpMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
                // 消息校验与序列化
                String message = null;
                try {
    			//获得消息的内容,转utf-8防止出现乱码
                    message = new String(msg.getBody(),"utf-8");
                }catch (Exception e){
                    e.printStackTrace();
                    errorLogSave(message,"当前消息转化utf-8出现异常信息");
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
    			//对消息进行对应的操作
    			...
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    
  • 相关阅读:
    ASP.NET MVC案例——————拦截器
    Windows Azure Virtual Network (10) 使用Azure Access Control List(ACL)设置客户端访问权限
    Windows Azure Storage (20) 使用Azure File实现共享文件夹
    Windows Azure HandBook (5) Azure混合云解决方案
    Windows Azure Service Bus (6) 中继(Relay On) 使用VS2013开发Service Bus Relay On
    Azure PowerShell (9) 使用PowerShell导出订阅下所有的Azure VM的Public IP和Private IP
    Windows Azure Service Bus (5) 主题(Topic) 使用VS2013开发Service Bus Topic
    Azure China (9) 在Azure China配置CDN服务
    Windows Azure Storage (19) 再谈Azure Block Blob和Page Blob
    Windows Azure HandBook (4) 分析Windows Azure如何处理Session
  • 原文地址:https://www.cnblogs.com/xiaoBlog2016/p/11880848.html
Copyright © 2011-2022 走看看