zoukankan      html  css  js  c++  java
  • rabbitMq创建和获取消息

    import net.sf.json.JSONObject;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.ApplicationListener;
    import org.springframework.context.event.ContextRefreshedEvent;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.QueueingConsumer;/**
     * 启动预加载信息类
     *@author Administrator
     */
    public class ContextLoaderSpringListener implements ApplicationListener<ContextRefreshedEvent>{
        
        private static Log logger = LogFactory.getLog(ContextLoaderSpringListener.class);
        @Autowired
        private ShipmentCheckService shipmentCheckService;
    
        //当spring容器初始化完成后就会执行该方法。
        public void onApplicationEvent(ContextRefreshedEvent event) {
            logger.debug("ConfigLoadListener init......");
            try {
                //创建一个频道
                Channel channel = QueueUtil.getConnection().createChannel();
                boolean durable = true;
                //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
                channel.queueDeclare(QueueUtil.getQueueName(), durable, false, false, null);
    
                //创建队列消费者
                QueueingConsumer consumer = new QueueingConsumer(channel);
                //指定消费队列
                //TODO:并发测试MQ,ack?
                channel.basicConsume(QueueUtil.getQueueName(), false/*打开应答机制*/, consumer);
                while (true) {
                    //nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
                    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                    byte[] body = delivery.getBody();
                    try {                    
                        String str=new String(body,"UTF-8");
                        JSONObject j = JSONObject.fromObject(str);
                        String shipmentId = j.getString("shipmentId");
                        String vehicleId = j.getString("vehicleId");
                        int planLineType = j.getInt("planLineType");
                    
                        shipmentCheckService.check(shipmentId,vehicleId,planLineType);
                    } catch (RuntimeException e) {
                        logger.error("货运单数据校验出现异常:", e);
                        logger.error("Source package:"+ CommUtil.getEncodeData(body));
                    }
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            } catch (Exception e) {
                logger.error("货运单存储器出现异常:", e);
            }
        }
        
    
    }
        private void storeInQueue(byte[] dst) throws IOException, TimeoutException {
            Channel channel = QueueUtil.getConnection().createChannel();
            channel.queueDeclare(QueueUtil.getQueueName(), /*持久存储*/false, false, false, null);
            channel.basicPublish("", QueueUtil.getQueueName(), null, dst);
            channel.close();
        }

  • 相关阅读:
    SpringBoot/SpringMVC Restful接口全局异常处理
    spring/springboot/springmvc启用GZIP压缩
    centos7启动SonarQube 8.6报错
    类型初始值设定项引发异常
    OCI is not properly installed on this machine (NOE1/INIT)
    动态调用webservice 此 XML 文档中禁用 DTD。
    系统缺少插件 系统插件已过期
    几种常见的函数
    MQTT 协议基本介绍
    etcd:从应用场景到实现原理的全方位解读【修订版】
  • 原文地址:https://www.cnblogs.com/tonggc1668/p/6541839.html
Copyright © 2011-2022 走看看