zoukankan      html  css  js  c++  java
  • rocketMq消息的发送和消息消费

    rocketMq消息的发送和消息消费

    ###一.消息推送 ```java public void pushMessage() { String message = "推送消息内容!"; try { DefaultMQProducer producer = new DefaultMQProducer(producerGroup); // 设置NameServer地址 producer.setNamesrvAddr("服务器地址+端口号"); producer.setInstanceName("producer"); // 只需要在发送前初始化一次 producer.start(); // 构建消息实体
            Message msg = new Message(topic,// topic
                    tag,// tag
                    message.getBytes()// body
            );
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
            producer.shutdown();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    

    }

    
    ###二.消息消费
    ```java
        @Autowired
        private MessageReceiveService messageReceiveService;
    	//====好差评的服务器地址和端口=====
        @Value("${app.message.address}")
        private String address;
    	//====好差评的topic=====
        @Value("${app.message.topic}")
        private String topic;
    	//====好差评的组名=====
        @Value("${app.message.groupName}")
        private String consumerGroup;
    
        /**
         * 开始消费rocketMQ消息
         */
        @PostConstruct
        public void init() {
            try {
                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
                consumer.setNamesrvAddr(address);
                consumer.subscribe(topic, "*");
                consumer.registerMessageListener(messageReceiveService);
                consumer.start();
                logger.info("rocketMQ consumer start");
            } catch (Exception e) {
                logger.error("reocketMQ consumer start error!", e);
                e.printStackTrace();
            }
        }
    
     @Service
    public class MessageReceiveService implements MessageListenerConcurrently {
    
        private static Logger logger = LoggerFactory.getLogger(MessageReceiveService.class);
    
        @Value("${accept_system_interface}")
        private String acceptSystemInterface;
    
        /**
         * 消费rocketMQ上的消息
         *
         * @param msgs    rocketMQ消息
         * @param context 消息消费上下文
         * @return 消息处理状态
         */
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            // 判断消息类型
            return handleHcpMessage(msgs, context);
        }
    
        /**
         * <p>好差评消息消费</p>
         *
         * @param msgs    当前消息(组)
         * @param context 消息消费上下文
         */
        @Transactional(rollbackFor = {RuntimeException.class})
        private ConsumeConcurrentlyStatus handleHcpMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
                // 消息校验与序列化
                String message = null;
                try {
    			//获得消息的内容,转utf-8防止出现乱码
                    message = new String(msg.getBody(),"utf-8");
                }catch (Exception e){
                    e.printStackTrace();
                    errorLogSave(message,"当前消息转化utf-8出现异常信息");
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
    			//对消息进行对应的操作
    			...
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    
  • 相关阅读:
    centos7查看启动的进程并杀死
    3.3 Zabbix容器安装
    windows下XAMPP集成环境中,MySQL数据库的使用
    pip淘宝镜像安装
    服务起不来,查看ps axj 看服务是否为守护进程(TPGID 为-1)
    dcloud_base连接失败(root:admin123!@#qwe@tcp(192.168.8.205:3306)/dcloud_base) Error 1129: Host '192.168.8.205' is blocked because of many connection errors; unblock with 'mysqladmin flush-hosts'
    服务部署_软加密之后要重新启动才能生效
    AWS Certified Solutions Architect
    Cloud Formation Mapping经常用于AMI ID的region映射
    CloudFormation StackSets
  • 原文地址:https://www.cnblogs.com/xiaoBlog2016/p/11880848.html
Copyright © 2011-2022 走看看