public class Utils { public static Connection getRabbitConnection(String path) throws IOException { ConnectionFactory factory = new ConnectionFactory(); Properties properties = new Properties(); //设置RabbitMQ所在主机ip或者主机名 properties.load(new FileInputStream(path + "\WEB-INF\classes\queue.properties")); String host = properties.getProperty("host"); String username = properties.getProperty("username"); String password = properties.getProperty("password"); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); //创建一个连接 return factory.newConnection(); } public static void sendMsg(String msg, String queueName, String exName, String luName,String path) throws IOException { /** * 创建连接连接到MabbitMQ */ Connection connection = getRabbitConnection(path); //创建一个频道 Channel channel = connection.createChannel(); //声明一个队列 不持久化 不单独 不自动删除 channel.queueDeclare(queueName, true, false, false, null); //创建一个EX1:交换器名称 direct:交换器类型 true:是否持久化 channel.exchangeDeclare(exName, "direct", true); //创建绑定 queueName:队列名称 EX1:交换器 LU1:路由键 channel.queueBind(queueName, exName, luName); //往队列中发出一条消息 EX1:交换器名称 LU1:路由键 channel.basicPublish(exName, luName, MessageProperties.MINIMAL_PERSISTENT_BASIC, msg.getBytes()); System.out.println("Sent '" + msg + "'"); //关闭频道和连接 channel.close(); connection.close(); } }
String path = request.getSession().getServletContext().getRealPath("");
Utils.sendMsg(json,queueName,exName,luName,path);
获取数据
/** * BaseQueueListener.java * Created at 2018-2-5 * Created by swsm * Copyright (C) 2018 BROADTEXT SOFTWARE, All rights reserved. */ package com.broadtext.collect.receive; import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Map; import com.broadtext.collect.util.DomParseQueueConfig; import com.broadtext.collect.util.QueueColumn; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.beans.factory.annotation.Autowired; import com.alibaba.fastjson.JSON; import com.broadtext.collect.process.exception.UnSupportedMsgException; import com.broadtext.collect.process.service.CommonService; import com.broadtext.collect.util.ExceptionHandle; import com.broadtext.collect.util.StringUtils; import com.broadtext.common.utils.DateUtil; import com.rabbitmq.client.Channel; /** * <p>ClassName: BaseQueueListener</p> * <p>Description: 实现队列消费者监听的基类</p> * <p>Author: swsm</p> * <p>Date: 2018-2-5</p> */ public abstract class BaseQueueListener implements ChannelAwareMessageListener { private String dateFormat; @Autowired private CommonService commonService; /** * 日志 */ private Logger logger = LoggerFactory.getLogger(BaseQueueListener.class); @Override public void onMessage(Message message, Channel channel) throws Exception { String msg = null; String id = null; try { //1. 获取信息 并插入历史信息表 msg = new String(message.getBody(), "UTF-8"); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); id = this.insertHistory(msg, channel); //2. 验证并处理信息 @SuppressWarnings("unchecked") Map<String, Object> map = JSON.parseObject(msg, Map.class); if (checkNotNull(map, getNotNullCol())) { strToDateCol(map, getChangeStrToDateCol()); } else { throw new UnSupportedMsgException(Arrays.toString(getNotNullCol()) + " 参数值不存在"); } Map<String, Map<String, List<QueueColumn>>> collectNodeMap = DomParseQueueConfig.getSingleton( this.getClass().getClassLoader().getResource("/").getPath() + "queue-config.xml").getCollectNodeArrayList(); //3. 实际的处理信息 this.handleMsg(map, collectNodeMap); } catch (Exception e) { String ex = ExceptionHandle.getExceptionSampleInfo(e); logger.error("处理" + this.getQueueName() + "信息" + msg + " 出现异常: " + ex); this.commonService.updateCollectHistory("1", id); this.commonService.insertCollectException(ex, id); e.printStackTrace(); } } public abstract void handleMsg(Map<String, Object> map, Map<String, Map<String, List<QueueColumn>>> collectNodeMap); public void strToDateCol(Map<String, Object> map, String[] changeStrToDateCol) { //默认是yyyy-MM-dd HH:mm:ss String df = this.getDateFormat() == null ? "yyyy-MM-dd HH:mm:ss" : this.getDateFormat(); for (String str : changeStrToDateCol) { map.put(str, DateUtil.getDate(String.valueOf(map.get(str)), df)); } } /** * <p>Description: 验证不能为null</p> * @param map map对象 * @param notNullCol 不能为null的字段 * @return 都不为null true 否则返回 false */ public boolean checkNotNull(Map<String, Object> map, String[] notNullCol) { for (String str : notNullCol) { if (map.get(str) == null) { logger.error(str + "不存在!" + JSON.toJSONString(map) + ",不能为null的字段:" + Arrays.toString(notNullCol)); return false; } } return true; } /** * <p>Description: 获取信息 并插入历史信息表</p> * @param msg mq信息 * @param channel 通道 * @return 历史信息表id * @throws IOException 异常 */ public String insertHistory(String msg, Channel channel) throws IOException { String id = StringUtils.getUuid(); this.commonService.insertHistory(msg, id, this.getQueueName()); return id; } public abstract String getQueueName(); public abstract String[] getNotNullCol(); public abstract String[] getChangeStrToDateCol(); public String getDateFormat() { return dateFormat; } }
package com.broadtext.collect.receive; import com.broadtext.collect.process.service.BmjService; import com.broadtext.collect.process.variable.QueueName; import com.broadtext.collect.util.QueueColumn; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import java.util.List; import java.util.Map; public class BmjQueueListener extends BaseQueueListener { @Autowired @Qualifier("bmjService") private BmjService bmjService; @Override public void handleMsg(Map<String, Object> map, Map<String, Map<String, List<QueueColumn>>> collectNodeMap) { this.bmjService.insertOrUpdateCell(map, collectNodeMap); } @Override public String getQueueName() { return QueueName.BMJ_QUEUE; } @Override public String[] getNotNullCol() { return new String[]{"Time", "TestStartTime", "TestEndTime"}; } @Override public String[] getChangeStrToDateCol() { return new String[]{"Time", "TestStartTime", "TestEndTime"}; } }
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd"> <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="guest" password="guest"/> <!-- <rabbit:connection-factory id="connectionFactory" host="192.168.129.203" port="5672" username="root" password="root" /> --> <rabbit:admin connection-factory="connectionFactory"/> <!-- 设备状态 队列声明 --> <rabbit:queue id="equipStatusQueue" durable="true" auto-delete="false" exclusive="false" name="equipStatusQueue"/> <bean id="equipStatusQueueListener" class="com.broadtext.collect.receive.EquipStatusQueueListener"/> <rabbit:listener-container connection-factory="connectionFactory" concurrency="1" acknowledge="manual"> <rabbit:listener queues="equipStatusQueue" ref="equipStatusQueueListener"/> </rabbit:listener-container> </beans>