引言
最近参与了一个智慧城市综合管理平台的项目,核心业务是基于视频监控对城市车辆的相关信息进行管理和统计分析。项目涉及多个平台,以及与其他公司合作开发业务的场景,需要对车辆数据进行同步和共享,因此用到了消息中间件RabbitMQ,用于车辆大数据的发送、监控以及不同系统间的信息同步。下面简单梳理讲解一下使用RabbitMQ发送消息的过程和相关业务。RabbitMQ服务端和客户端的具体配置在此就不讲解了。
1:BMS向车辆大数据平台同步消息的实体类CLS_VO_Ignite_Message
package com.tiandy.easy7.core.vo;

import java.util.List;

/**
 * Message structure used to synchronise data towards the vehicle big-data platform.
 *
 * <p>A plain mutable bean: every field has a getter and a setter, and the
 * constructors only assign the fields they receive (all others stay {@code null}).
 */
public class CLS_VO_Ignite_Message {

    /** Marks the message as originating from the BMS system. */
    private String source;
    /** Message type, e.g. "user". */
    private String type;
    /** Operation: "add", "del" or "update". */
    private String operate;
    /** User information. */
    private CLS_VO_Ignite_User user;
    /** Permission (role) information of the user. */
    private List<CLS_VO_Ignite_Role> role_list;
    /** Tollgate-to-role list. */
    private List<CLS_VO_Ignite_Tollgate> tollgate_list;
    /** Camera-to-role list. */
    private List<CLS_VO_Ignite_Camera> camera_list;
    private List<CLS_VO_AreaParam> area_list;
    /** Collection of longitude/latitude coordinates. */
    private List<CLS_VO_GisCoordinates> gis_list;
    private CLS_VO_DomainInfoEx domainInfo;

    /** Creates an empty message; populate it through the setters. */
    public CLS_VO_Ignite_Message() {
    }

    /**
     * Creates a message carrying user, role, tollgate, camera and area data.
     * {@code gis_list} and {@code domainInfo} are left {@code null}.
     */
    public CLS_VO_Ignite_Message(String source, String type, String operate,
                                 CLS_VO_Ignite_User user,
                                 List<CLS_VO_Ignite_Role> role_list,
                                 List<CLS_VO_Ignite_Tollgate> tollgate_list,
                                 List<CLS_VO_Ignite_Camera> camera_list,
                                 List<CLS_VO_AreaParam> area_list) {
        this.source = source;
        this.type = type;
        this.operate = operate;
        this.user = user;
        this.role_list = role_list;
        this.tollgate_list = tollgate_list;
        this.camera_list = camera_list;
        this.area_list = area_list;
    }

    /** Creates a message that carries only a list of GIS coordinates. */
    public CLS_VO_Ignite_Message(String source, String type, String operate,
                                 List<CLS_VO_GisCoordinates> gis_list) {
        this(source, type, operate, null, null, null, null, null);
        this.gis_list = gis_list;
    }

    /** Creates a message that carries only domain information. */
    public CLS_VO_Ignite_Message(String source, String type, String operate,
                                 CLS_VO_DomainInfoEx domainInfo) {
        this(source, type, operate, null, null, null, null, null);
        this.domainInfo = domainInfo;
    }

    public String getSource() {
        return source;
    }

    public void setSource(String source) {
        this.source = source;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getOperate() {
        return operate;
    }

    public void setOperate(String operate) {
        this.operate = operate;
    }

    public CLS_VO_Ignite_User getUser() {
        return user;
    }

    public void setUser(CLS_VO_Ignite_User user) {
        this.user = user;
    }

    public List<CLS_VO_Ignite_Role> getRole_list() {
        return role_list;
    }

    public void setRole_list(List<CLS_VO_Ignite_Role> role_list) {
        this.role_list = role_list;
    }

    public List<CLS_VO_Ignite_Tollgate> getTollgate_list() {
        return tollgate_list;
    }

    public void setTollgate_list(List<CLS_VO_Ignite_Tollgate> tollgate_list) {
        this.tollgate_list = tollgate_list;
    }

    public List<CLS_VO_Ignite_Camera> getCamera_list() {
        return camera_list;
    }

    public void setCamera_list(List<CLS_VO_Ignite_Camera> camera_list) {
        this.camera_list = camera_list;
    }

    public List<CLS_VO_AreaParam> getArea_list() {
        return area_list;
    }

    public void setArea_list(List<CLS_VO_AreaParam> area_list) {
        this.area_list = area_list;
    }

    public List<CLS_VO_GisCoordinates> getGis_list() {
        return gis_list;
    }

    public void setGis_list(List<CLS_VO_GisCoordinates> gis_list) {
        this.gis_list = gis_list;
    }

    public CLS_VO_DomainInfoEx getDomainInfo() {
        return domainInfo;
    }

    public void setDomainInfo(CLS_VO_DomainInfoEx domainInfo) {
        this.domainInfo = domainInfo;
    }
}
2:发送RabbitMQ消息的帮助类RabbitMQClientSend
package com.tiandy.easy7.its.rabbitmq;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import cn.jpush.api.utils.StringUtils;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.tiandy.easy7.core.bo.CLS_BO_User;
import easy7.datatype.CLS_Easy7_Error;
import easy7.datatype.CLS_Easy7_Types;
import org.apache.log4j.Logger;

/**
 * Static helper that publishes messages to the {@code bms_sync_user_auth}
 * fanout exchange.
 *
 * <p>The connection and channel are held in static fields and are not
 * synchronised; {@link #initialize()} must have succeeded before
 * {@link #sendMsg(String)} can publish.
 */
public class CLS_RabbitMQClientSend {

    private static final Logger log = Logger.getLogger(CLS_RabbitMQClientSend.class);

    /** Name of the durable fanout exchange the messages are published to. */
    private static final String EXCHANGE_NAME = "bms_sync_user_auth";

    /** RabbitMQ connection. */
    private static Connection connection = null;
    /** RabbitMQ channel. */
    private static Channel channel = null;
    /** Connection status flag; {@code true} once the exchange has been declared. */
    public static boolean connectStatus = false;

    public Connection getConnection() {
        return connection;
    }

    public Channel getChannel() {
        return channel;
    }

    /**
     * (Re)creates the connection and channel and declares the durable fanout
     * exchange. Any previously held connection/channel is closed first.
     * On any failure {@link #connectStatus} is set to {@code false} and the
     * error is logged with its cause; nothing is thrown.
     */
    public static void initialize() {
        try {
            ConnectionFactory factory = CLS_RabbitMQUtil.getRabbitMQConnectionFactory();
            // Drop whatever was open before creating the new connection.
            closeConnection();
            connection = factory.newConnection();
            channel = connection.createChannel();
            // Durable exchange in FANOUT (broadcast) mode.
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);
            connectStatus = true;
        } catch (Exception e) {
            connectStatus = false;
            log.error("CLS_RabbitMQClientSend method initialize error!", e);
        }
    }

    /**
     * Publishes a message to the exchange (sync from BMS to the vehicle
     * big-data platform). The payload is encoded as UTF-8.
     *
     * @param info the message text
     * @return {@code CLS_Easy7_Error.ERROR_PARAM} when {@code info} is empty
     *         according to {@code StringUtils.isEmpty};
     *         {@code CLS_Easy7_Error.ERROR_OK} when the publish call returned normally;
     *         {@code CLS_Easy7_Error.ERROR_REQUEST_FAILED} when no channel is
     *         available or publishing threw
     */
    public static int sendMsg(String info) {
        if (StringUtils.isEmpty(info)) {
            return CLS_Easy7_Error.ERROR_PARAM;
        }
        // Read the static once so a concurrent re-initialise cannot null it mid-call.
        Channel current = channel;
        if (current == null) {
            log.error("CLS_RabbitMQClientSend method sendMsg error! channel is not initialized");
            return CLS_Easy7_Error.ERROR_REQUEST_FAILED;
        }
        try {
            // Log the text itself; concatenating a byte[] would only print its identity.
            log.debug("CLS_RabbitMQClientSend sendMsg:" + info);
            // Explicit charset: the platform default may differ from the consumer's.
            current.basicPublish(EXCHANGE_NAME, "", null, info.getBytes(StandardCharsets.UTF_8));
            return CLS_Easy7_Error.ERROR_OK;
        } catch (Exception e) {
            log.error("CLS_RabbitMQClientSend method sendMsg error!", e);
            return CLS_Easy7_Error.ERROR_REQUEST_FAILED;
        }
    }

    /**
     * Closes the channel and then the connection if they are open, and clears
     * both static references. Failures are logged and never propagated.
     */
    public static void closeConnection() {
        try {
            if (channel != null && channel.isOpen()) {
                channel.close();
            }
        } catch (Exception e) {
            log.error("CLS_RabbitMQClientSend closeChannel error", e);
        } finally {
            channel = null;
        }
        try {
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
        } catch (Exception e) {
            log.error("CLS_RabbitMQClientSend closeConnection error", e);
        } finally {
            connection = null;
        }
    }
}
3:RabbitMQ的核心帮助类RabbitMQUtil
package com.tiandy.easy7.its.rabbitmq;

import javax.annotation.Resource;
import com.tiandy.easy7.core.util.Tools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import net.sf.json.JSONObject;
import com.rabbitmq.client.ConnectionFactory;
import com.tiandy.easy7.core.bo.CLS_BO_SystemInfo;
import com.tiandy.easy7.core.po.TabSystemInfo;
import com.tiandy.easy7.core.util.MyApplicationContextUtil;
import com.tiandy.easy7.core.vo.CLS_VO_Result;
import com.tiandy.easy7.face.rabbitmq.CLS_RabbitMQExchangeRecive;
import easy7.datatype.CLS_Easy7_Types;
import java.net.InetAddress;

/**
 * RabbitMQ utility class: resolves the broker host and hands out the shared
 * {@link ConnectionFactory}.
 */
public class CLS_RabbitMQUtil {

    private static final Logger log = LoggerFactory.getLogger(CLS_RabbitMQUtil.class);

    /** Host used when no broker address has been configured yet. */
    private static final String DEFAULT_HOST = "127.0.0.1";

    /** Not instantiable; all members are static. */
    private CLS_RabbitMQUtil() {
    }

    /**
     * Returns the shared connection factory with its host set to the
     * currently configured broker address.
     *
     * <p>Each call re-reads the address via {@link #getRabbitMQIP()}; when
     * {@code CLS_Easy7_Types.rabbitmq_host_for_bj} is still null or empty
     * afterwards it falls back to {@code 127.0.0.1}.
     *
     * @return the shared {@link ConnectionFactory} instance
     */
    public static final ConnectionFactory getRabbitMQConnectionFactory() {
        // After a platform restart the broker IP is loaded from the database.
        getRabbitMQIP();
        // On first use of the platform, default to the loopback address.
        if (null == CLS_Easy7_Types.rabbitmq_host_for_bj
                || "".equals(CLS_Easy7_Types.rabbitmq_host_for_bj)) {
            CLS_Easy7_Types.rabbitmq_host_for_bj = DEFAULT_HOST;
        }
        RabbitMQConnectionFactory.factory.setHost(CLS_Easy7_Types.rabbitmq_host_for_bj);
        return RabbitMQConnectionFactory.factory;
    }

    /**
     * Loads the RabbitMQ IP address from the system-info record and stores it
     * in {@code CLS_Easy7_Types.rabbitmq_host_for_bj}; afterwards initialises
     * {@code CLS_Easy7_Types.LOCALHOST_IP} if it is still unset.
     *
     * <p>When the record cannot be read or contains no IP, an error is logged
     * and the method returns without touching either value.
     */
    public static void getRabbitMQIP() {
        ApplicationContext ctx = MyApplicationContextUtil.getContext();
        CLS_BO_SystemInfo boSystemInfo = ctx.getBean(CLS_BO_SystemInfo.class);

        String rabbitmqIp;
        try {
            rabbitmqIp = readConfiguredRabbitMqIp(boSystemInfo);
        } catch (Exception e) {
            log.error("rabbitmq id is not exist!", e);
            return;
        }
        if (rabbitmqIp == null || "".equals(rabbitmqIp)) {
            log.error("rabbitmq ip is not exist!");
            return;
        }
        CLS_Easy7_Types.rabbitmq_host_for_bj = rabbitmqIp;

        // Local server IP, used as the routing key for receiving permission
        // requests from subordinate platforms.
        try {
            if (null == CLS_Easy7_Types.LOCALHOST_IP || "".equals(CLS_Easy7_Types.LOCALHOST_IP)) {
                CLS_Easy7_Types.LOCALHOST_IP = Tools.getLinuxLocalIp();
            }
        } catch (Exception e) {
            log.error("get LinuxIp error", e);
        }
    }

    /**
     * Extracts {@code rabbitmqIp} from the system-info record: the result's
     * {@code content} holds JSON whose {@code sParam} holds JSON with the IP.
     *
     * @return the configured IP, or {@code null} when {@code ret} is non-zero
     *         or {@code content}/{@code sParam} is empty
     */
    private static String readConfiguredRabbitMqIp(CLS_BO_SystemInfo boSystemInfo) {
        CLS_VO_Result result = boSystemInfo.getSystemInfo(CLS_Easy7_Types.EASY7_ACTIVEMQ_SID);
        JSONObject json = JSONObject.fromObject(result);
        if (json.getInt("ret") != 0) {
            return null;
        }
        String content = json.getString("content");
        if (content == null || "".equals(content)) {
            return null;
        }
        String sParam = JSONObject.fromObject(content).getString("sParam");
        if (sParam == null || "".equals(sParam)) {
            return null;
        }
        return JSONObject.fromObject(sParam).getString("rabbitmqIp");
    }

    /** Holder for the single shared connection factory. */
    private static class RabbitMQConnectionFactory {

        private static final ConnectionFactory factory = new ConnectionFactory();

        static {
            try {
                // NOTE(review): port and credentials are hard-coded; they can be
                // moved to a properties file. Do not ship real secrets in source.
                factory.setPort(5673);                     // MQ port
                factory.setUsername("admin");              // MQ user name
                factory.setPassword("123456");             // MQ password
                factory.setRequestedHeartbeat(10);         // heartbeat, in seconds
                factory.setAutomaticRecoveryEnabled(true); // reconnect automatically
            } catch (Exception e) {
                log.error("RabbitMQ connection factory configuration error", e);
            }
        }
    }
}
4:获取需要发送MQ的消息
public CLS_VO_Ignite_Message deleteHostCamera(){
//定义一个空的list模拟数据,具体实际业务中里边肯定是有数据的(需要发现车辆的数据消息) List<CLS_VO_Ignite_Camera> cameralist = new ArrayList<CLS_VO_Ignite_Camera>(); //定义拼接向Rabbit发送的消息体 (json格式的数据) CLS_VO_Ignite_Message message = new CLS_VO_Ignite_Message(CLS_Easy7_Types.MESSAGE_SOURCE_BMS,CLS_Easy7_Types.MESSAGE_TYPE_CAMERA, CLS_Easy7_Types.MESSAGE_OPERATE_DEL,null,null,null,cameralist,null); return message; }
5:发送消息
public void sendRabbitMQMessage(){ //如发送的消息:
//{"id":"f6560157-24ae-475b-9736-34072684b008","reviewInfo":"","status":1,"ids":[],"currentUserId":"admin","reviewUser":"admin"}
CLS_VO_Ignite_Message message =deleteHostCamera(); //消息为空时不发送消息 if(null != message.getSource() && !"".equals(message.getSource())){ int sendResult = CLS_RabbitMQClientSend.sendMsg(JSONObject.fromObject(message).toString()); //同步消息失败,错误代码使用-40,数据库操作回滚 if(sendResult != 0){ TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); //事务回滚 result.setRet( -40);//请求失败 return result; } } }
6:接收MQ中发送的消息
package com.tiandy.vbs.ics.rabbitMQ;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.tiandy.vbs.adapter.rabbitmq.CLS_ConsumerThreadPool;
import com.tiandy.vbs.adapter.rabbitmq.CLS_RabbitMQQueueRecive;
import com.tiandy.vbs.adapter.rabbitmq.CLS_RabbitMQUtil;
import com.tiandy.vbs.common.util.CLS_VBS_Types;
import com.tiandy.vbs.common.util.InterfaceLogger;
import com.tiandy.vbs.ics.bo.CLS_BO_DispositionAdapter;
import com.tiandy.vbs.ics.vo.CLS_VO_MQMessage;

/**
 * Receives messages from the queue bound to the fanout exchange and hands
 * each non-empty message to a worker on the consumer thread pool.
 * Messages are acknowledged manually.
 */
@Component
public class CLS_QueueDispositionRecive extends CLS_RabbitMQQueueRecive {

    /** RabbitMQ connection. */
    private static Connection connection = null;
    /** RabbitMQ channel. */
    private static Channel channel = null;
    /** Connection status flag; {@code true} after a successful {@link #initialize()}. */
    public boolean connectStatus = false;

    /**
     * Consumer thread pool.
     * NOTE(review): this hides the field of the same name declared in
     * {@code CLS_RabbitMQQueueRecive}, so two pools exist — confirm whether
     * that is intended.
     */
    public static CLS_ConsumerThreadPool consumerThreadPool = new CLS_ConsumerThreadPool(
            CLS_VBS_Types.corePoolSize,
            CLS_VBS_Types.maximumPoolSize,
            CLS_VBS_Types.keepAliveTime,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(CLS_VBS_Types.workQueue),
            new ThreadPoolExecutor.CallerRunsPolicy());

    @Autowired
    private CLS_BO_DispositionAdapter boAdapter;

    /** Static handle to this bean, assigned in {@link #init()}. */
    public static CLS_QueueDispositionRecive queueRecive;

    /** Publishes this bean instance through the static {@link #queueRecive} handle. */
    @PostConstruct
    public void init() {
        queueRecive = this;
    }

    public Connection getConnection() {
        return connection;
    }

    public Channel getChannel() {
        return channel;
    }

    /**
     * (Re)creates the connection and channel and registers the consumer.
     * Any previously held connection/channel is closed first. On failure
     * {@link #connectStatus} is set to {@code false} and the error is logged;
     * nothing is thrown.
     */
    @Override
    public void initialize() {
        try {
            ConnectionFactory factory = CLS_RabbitMQUtil.getRabbitMQConnectionFactory();
            // Close the previous connection and channel before reconnecting.
            closeConnection();
            connection = factory.newConnection();
            channel = connection.createChannel();
            syncConsumer();
            connectStatus = true;
        } catch (Exception e) {
            connectStatus = false;
            InterfaceLogger.error("CLS_RabbitMQQueueRecive method initialize:" + e.getMessage(), e);
        }
    }

    /**
     * Core receive logic: declares the queue and the durable fanout exchange,
     * binds them, and starts a consumer with manual acknowledgement.
     *
     * @throws Exception if declaring, binding or subscribing fails
     */
    @Override
    public void syncConsumer() throws Exception {
        // Pin the channel this consumer is registered on. Delivery tags are
        // scoped to a channel, so acks must not go to whatever the mutable
        // static field points at after a later re-initialise.
        final Channel consumerChannel = channel;

        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-max-length", 100000);
        args.put("x-message-ttl", CLS_VBS_Types.x_message_ttl);

        consumerChannel.queueDeclare(CLS_VBS_Types.rabbitmq_queue_alarm_record, true, false, false, args);
        // Declare the exchange and bind the queue to it.
        consumerChannel.exchangeDeclare(CLS_VBS_Types.rabbitmq_exchange_name, BuiltinExchangeType.FANOUT, true);
        consumerChannel.queueBind(CLS_VBS_Types.rabbitmq_queue_alarm_record, CLS_VBS_Types.rabbitmq_exchange_name, "");

        Consumer consumer = new DefaultConsumer(consumerChannel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                InterfaceLogger.debug("recive message:" + message);
                if (!"".equals(message)) {
                    CLS_VO_MQMessage mqVo = new CLS_VO_MQMessage();
                    mqVo.setMessage(message);
                    mqVo.setEnvelope(envelope);
                    // The worker receives the channel as well as the message.
                    CLS_DispositionConsumer worker =
                            new CLS_DispositionConsumer(mqVo, queueRecive.boAdapter, consumerChannel);
                    consumerThreadPool.submit(worker);
                } else {
                    // Nothing to process: acknowledge the empty message right away.
                    consumerChannel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        // autoAck = false: messages are acknowledged manually.
        consumerChannel.basicConsume(CLS_VBS_Types.rabbitmq_queue_alarm_record, false, consumer);
    }

    /**
     * Closes the channel and then the connection if they are open, and clears
     * both static references. Failures are logged and never propagated.
     */
    @Override
    public void closeConnection() {
        try {
            if (channel != null && channel.isOpen()) {
                channel.close();
            }
        } catch (Exception e) {
            InterfaceLogger.error("CLS_RabbitMQExchangeRecive closeChannel error ", e);
        } finally {
            channel = null;
        }
        try {
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
        } catch (Exception e) {
            InterfaceLogger.error("CLS_RabbitMQExchangeRecive closeConnection error ", e);
        } finally {
            connection = null;
        }
    }
}
7:线程池处理消息类
package com.tiandy.vbs.adapter.rabbitmq;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.springframework.stereotype.Component;
import com.tiandy.vbs.common.util.CLS_VBS_Types;

/**
 * Base class for RabbitMQ queue receivers. It owns a shared consumer thread
 * pool and defines the lifecycle operations a concrete receiver must provide.
 */
@Component
public abstract class CLS_RabbitMQQueueRecive {

    /** Consumer thread pool, sized from the {@code CLS_VBS_Types} settings. */
    public static CLS_ConsumerThreadPool consumerThreadPool = createConsumerThreadPool();

    /**
     * Builds the pool with a bounded work queue and a caller-runs rejection
     * policy; the keep-alive time is given in seconds.
     */
    private static CLS_ConsumerThreadPool createConsumerThreadPool() {
        ArrayBlockingQueue<Runnable> workQueue =
                new ArrayBlockingQueue<Runnable>(CLS_VBS_Types.workQueue);
        return new CLS_ConsumerThreadPool(
                CLS_VBS_Types.corePoolSize,
                CLS_VBS_Types.maximumPoolSize,
                CLS_VBS_Types.keepAliveTime,
                TimeUnit.SECONDS,
                workQueue,
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /** Initialises the connection. */
    public abstract void initialize();

    /**
     * Sets up the consumer.
     *
     * @throws Exception if the consumer cannot be set up
     */
    public abstract void syncConsumer() throws Exception;

    /** Closes the connection. */
    public abstract void closeConnection();
}
其他应用或平台接收到MQ中的消息后,解析JSON数据,转换为对应的数据实体,再将数据添加到相应数据库的表中即可,这样就完成了消息的发送和数据的同步!
至此,发送消息的过程就完成了!