zoukankan      html  css  js  c++  java
  • RabbitMQ队列数据的传输与获取

    public class Utils {
        public static Connection getRabbitConnection(String path) throws IOException {
            ConnectionFactory factory = new ConnectionFactory();
            Properties properties = new Properties();
            //设置RabbitMQ所在主机ip或者主机名
            properties.load(new FileInputStream(path + "\WEB-INF\classes\queue.properties"));
            String host = properties.getProperty("host");
            String username = properties.getProperty("username");
            String password = properties.getProperty("password");
            factory.setHost(host);
            factory.setUsername(username);
            factory.setPassword(password);
            //创建一个连接
            return factory.newConnection();
        }
    
    
        public static void sendMsg(String msg, String queueName, String exName, String luName,String path) throws IOException {
            /**
             * 创建连接连接到MabbitMQ
             */
            Connection connection = getRabbitConnection(path);
            //创建一个频道
            Channel channel = connection.createChannel();
    
            //声明一个队列 不持久化 不单独  不自动删除
            channel.queueDeclare(queueName, true, false, false, null);
            //创建一个EX1:交换器名称  direct:交换器类型  true:是否持久化
            channel.exchangeDeclare(exName, "direct", true);
            //创建绑定 queueName:队列名称 EX1:交换器 LU1:路由键
            channel.queueBind(queueName, exName, luName);
            //往队列中发出一条消息 EX1:交换器名称  LU1:路由键
            channel.basicPublish(exName, luName, MessageProperties.MINIMAL_PERSISTENT_BASIC, msg.getBytes());
            System.out.println("Sent '" + msg + "'");
            //关闭频道和连接
            channel.close();
            connection.close();
        }
    }
    // Example call site: resolve the web application's root directory on disk and publish one message.
    // request, json, queueName, exName and luName come from the surrounding code, which is not shown here.
    String path = request.getSession().getServletContext().getRealPath("");
    Utils.sendMsg(json,queueName,exName,luName,path);

    获取数据

    /**
     * BaseQueueListener.java
     * Created at 2018-2-5
     * Created by swsm
     * Copyright (C) 2018 BROADTEXT SOFTWARE, All rights reserved.
     */
    package com.broadtext.collect.receive;
    
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    
    import com.broadtext.collect.util.DomParseQueueConfig;
    import com.broadtext.collect.util.QueueColumn;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
    import org.springframework.beans.factory.annotation.Autowired;
    
    import com.alibaba.fastjson.JSON;
    import com.broadtext.collect.process.exception.UnSupportedMsgException;
    import com.broadtext.collect.process.service.CommonService;
    import com.broadtext.collect.util.ExceptionHandle;
    import com.broadtext.collect.util.StringUtils;
    import com.broadtext.common.utils.DateUtil;
    import com.rabbitmq.client.Channel;
    
    /**
     * <p>ClassName: BaseQueueListener</p>
     * <p>Description: 实现队列消费者监听的基类</p>
     * <p>Author: swsm</p>
     * <p>Date: 2018-2-5</p>
     */
    public abstract class BaseQueueListener implements ChannelAwareMessageListener {
        
        private String dateFormat;
        
        @Autowired
        private CommonService commonService;
        
        /**
         * 日志
         */
        private Logger logger = LoggerFactory.getLogger(BaseQueueListener.class);
    
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            String msg = null;
            String id = null;
            try {
                //1. 获取信息 并插入历史信息表
                msg = new String(message.getBody(), "UTF-8");
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                id = this.insertHistory(msg, channel);
                //2. 验证并处理信息
                @SuppressWarnings("unchecked")
                Map<String, Object> map = JSON.parseObject(msg, Map.class);
                if (checkNotNull(map, getNotNullCol())) {
                    strToDateCol(map, getChangeStrToDateCol());
                } else {
                    throw new UnSupportedMsgException(Arrays.toString(getNotNullCol()) + " 参数值不存在");
                }
                Map<String, Map<String, List<QueueColumn>>> collectNodeMap = DomParseQueueConfig.getSingleton(
                        this.getClass().getClassLoader().getResource("/").getPath() + "queue-config.xml").getCollectNodeArrayList();
                //3. 实际的处理信息
                this.handleMsg(map, collectNodeMap);
            } catch (Exception e) {
                String ex = ExceptionHandle.getExceptionSampleInfo(e);
                logger.error("处理" + this.getQueueName() + "信息" + msg + " 出现异常: " + ex);
                this.commonService.updateCollectHistory("1", id);
                this.commonService.insertCollectException(ex, id);
                e.printStackTrace();
            }
        }
        
        public abstract void handleMsg(Map<String, Object> map, Map<String, Map<String, List<QueueColumn>>> collectNodeMap);
    
        public void strToDateCol(Map<String, Object> map, String[] changeStrToDateCol) {
            //默认是yyyy-MM-dd HH:mm:ss
            String df = this.getDateFormat() == null ? "yyyy-MM-dd HH:mm:ss" : this.getDateFormat();
            for (String str : changeStrToDateCol) {
                map.put(str, DateUtil.getDate(String.valueOf(map.get(str)), df));
            }
        }
    
        /**
         * <p>Description: 验证不能为null</p>
         * @param map map对象
         * @param notNullCol 不能为null的字段
         * @return 都不为null true 否则返回 false
         */
        public boolean checkNotNull(Map<String, Object> map, String[] notNullCol) {
            for (String str : notNullCol) {
                if (map.get(str) == null) {
                    logger.error(str + "不存在!" + JSON.toJSONString(map) + ",不能为null的字段:" + Arrays.toString(notNullCol));
                    return false;
                }
            }
            return true;
        }
    
        /**
         * <p>Description: 获取信息 并插入历史信息表</p>
         * @param msg mq信息
         * @param channel 通道
         * @return 历史信息表id
         * @throws IOException 异常
         */
        public String insertHistory(String msg, Channel channel) throws IOException {
            String id = StringUtils.getUuid();
            this.commonService.insertHistory(msg, id, this.getQueueName());
            return id;
        }
    
        public abstract String getQueueName();
    
        public abstract String[] getNotNullCol();
    
        public abstract String[] getChangeStrToDateCol();
    
        public String getDateFormat() {
            return dateFormat;
        }
    
    }
    package com.broadtext.collect.receive;
    
    import com.broadtext.collect.process.service.BmjService;
    import com.broadtext.collect.process.variable.QueueName;
    import com.broadtext.collect.util.QueueColumn;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    
    import java.util.List;
    import java.util.Map;
    
    /**
     * Listener for the queue named by {@link QueueName#BMJ_QUEUE}; passes each validated
     * message on to {@link BmjService}.
     */
    public class BmjQueueListener extends BaseQueueListener {

        /** Columns that are both required and converted from string to date. */
        private static final String[] TIME_COLUMNS = {"Time", "TestStartTime", "TestEndTime"};

        @Autowired
        @Qualifier("bmjService")
        private BmjService bmjService;

        /**
         * @return the name of the queue this listener serves
         */
        @Override
        public String getQueueName() {
            return QueueName.BMJ_QUEUE;
        }

        /**
         * @return a fresh array of the keys that must not be null
         */
        @Override
        public String[] getNotNullCol() {
            // clone() so every call hands out its own array, as before
            return TIME_COLUMNS.clone();
        }

        /**
         * @return a fresh array of the keys whose values are converted to dates
         */
        @Override
        public String[] getChangeStrToDateCol() {
            return TIME_COLUMNS.clone();
        }

        /**
         * Inserts or updates the cell data carried by the message.
         *
         * @param map            parsed message payload
         * @param collectNodeMap queue column configuration
         */
        @Override
        public void handleMsg(Map<String, Object> map, Map<String, Map<String, List<QueueColumn>>> collectNodeMap) {
            this.bmjService.insertOrUpdateCell(map, collectNodeMap);
        }

    }
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
         http://www.springframework.org/schema/rabbit
         http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">


        <!-- Broker connection used by the admin and the listener container below.
             NOTE(review): host and credentials are hard-coded; consider externalising them. -->
        <rabbit:connection-factory id="connectionFactory"
                                   host="127.0.0.1" port="5672" username="guest" password="guest"/>


        <!-- Alternative broker settings, kept for reference:
        <rabbit:connection-factory id="connectionFactory"
                                    host="192.168.129.203" port="5672" username="root" password="root" />
        -->

        <!-- Admin bean bound to the connection factory above. -->
        <rabbit:admin connection-factory="connectionFactory"/>

        <!-- Equipment status: queue declaration (durable, not auto-deleted, not exclusive). -->
        <rabbit:queue id="equipStatusQueue" durable="true" auto-delete="false" exclusive="false" name="equipStatusQueue"/>
        <bean id="equipStatusQueueListener" class="com.broadtext.collect.receive.EquipStatusQueueListener"/>
        <!-- One consumer; acknowledge="manual" means the listener itself must ack each message. -->
        <rabbit:listener-container connection-factory="connectionFactory" concurrency="1" acknowledge="manual">
            <rabbit:listener queues="equipStatusQueue" ref="equipStatusQueueListener"/>
        </rabbit:listener-container>
    </beans>
  • 相关阅读:
    [Linux/Apache Http]Apache Http(d)服务访问时报: 403 Forbidden You don't have permission to access /cdh/ on this server.
    [Linux]常用命令之【YUM】
    .Netcore HttpClient源码探究
    echarts使用多图的表达
    记录composer 安装 yii2项目
    IE浏览器js parseInt("08")返回值不是8
    将一个条件从else语句中拆分出来导致的bug
    我的第一次WebService接口开发教程
    Oracle以逗号分隔的字符串拆分为多行数据
    JS 小技巧
  • 原文地址:https://www.cnblogs.com/zhuwenxia/p/9719796.html
Copyright © 2011-2022 走看看