zoukankan      html  css  js  c++  java
  • RabbitMQ实现消息的发送和数据同步

    引言

        最近参与了一个智慧城市综合管理平台的项目,主要核心业务就是针对视频监控管理统计分析城市车辆的相关信息,涉及到几个平台以及和其他公司合作开发业务的场景,需要对车辆数据进行同步和共享,中间就用到了RabbitMQ的消息中间件,对车辆的大数据进行发送监控和不同系统间的信息同步,下边就简单梳理讲解一下RabbitMQ发送消息的过程和业务。具体的RabbitMQ服务端和客户端的配置在此就不讲解了。

      1:BMS同步车辆大数据的消息实体CLS_VO_Ignite_Message

    package com.tiandy.easy7.core.vo;
    
    import java.util.List;
    
    /**
     * Message structure used to synchronize data with the vehicle big-data platform.
     *
     * <p>A plain mutable bean: a three-part header (source, type, operate) plus
     * optional payload sections. Any section that a constructor does not receive
     * is left {@code null}.
     */
    public class CLS_VO_Ignite_Message {

        /** Marks the message as originating from the BMS system. */
        private String source;
        /** Payload kind, e.g. "user". */
        private String type;
        /** Operation: add, del or update. */
        private String operate;
        /** User information. */
        private CLS_VO_Ignite_User user;
        /** Permission (role) information of the user. */
        private List<CLS_VO_Ignite_Role> role_list;
        /** Tollgate-to-role list. */
        private List<CLS_VO_Ignite_Tollgate> tollgate_list;
        /** Camera-to-role list. */
        private List<CLS_VO_Ignite_Camera> camera_list;
        /** Area parameters. */
        private List<CLS_VO_AreaParam> area_list;
        /** Collection of longitude/latitude coordinates. */
        private List<CLS_VO_GisCoordinates> gis_list;
        /** Domain information. */
        private CLS_VO_DomainInfoEx domainInfo;

        /** Creates an empty message; every field starts out {@code null}. */
        public CLS_VO_Ignite_Message() {
        }

        /** Fills only the header; shared by the public constructors below. */
        private CLS_VO_Ignite_Message(String source, String type, String operate) {
            this.source = source;
            this.type = type;
            this.operate = operate;
        }

        /**
         * Creates a message carrying user, role, tollgate, camera and area data.
         */
        public CLS_VO_Ignite_Message(
                String source,
                String type,
                String operate,
                CLS_VO_Ignite_User user,
                List<CLS_VO_Ignite_Role> role_list,
                List<CLS_VO_Ignite_Tollgate> tollgate_list,
                List<CLS_VO_Ignite_Camera> camera_list,
                List<CLS_VO_AreaParam> area_list) {
            this(source, type, operate);
            this.user = user;
            this.role_list = role_list;
            this.tollgate_list = tollgate_list;
            this.camera_list = camera_list;
            this.area_list = area_list;
        }

        /**
         * Creates a message carrying only a coordinate list.
         */
        public CLS_VO_Ignite_Message(
                String source, String type, String operate, List<CLS_VO_GisCoordinates> gis_list) {
            this(source, type, operate);
            this.gis_list = gis_list;
        }

        /**
         * Creates a message carrying only domain information.
         */
        public CLS_VO_Ignite_Message(
                String source, String type, String operate, CLS_VO_DomainInfoEx domainInfo) {
            this(source, type, operate);
            this.domainInfo = domainInfo;
        }

        /** @return the domain information, possibly {@code null} */
        public CLS_VO_DomainInfoEx getDomainInfo() {
            return this.domainInfo;
        }

        /** @param domainInfo the domain information to store */
        public void setDomainInfo(CLS_VO_DomainInfoEx domainInfo) {
            this.domainInfo = domainInfo;
        }

        /** @return the message source marker */
        public String getSource() {
            return this.source;
        }

        /** @param source the message source marker */
        public void setSource(String source) {
            this.source = source;
        }

        /** @return the payload kind */
        public String getType() {
            return this.type;
        }

        /** @param type the payload kind */
        public void setType(String type) {
            this.type = type;
        }

        /** @return the operation name */
        public String getOperate() {
            return this.operate;
        }

        /** @param operate the operation name */
        public void setOperate(String operate) {
            this.operate = operate;
        }

        /** @return the user information, possibly {@code null} */
        public CLS_VO_Ignite_User getUser() {
            return this.user;
        }

        /** @param user the user information */
        public void setUser(CLS_VO_Ignite_User user) {
            this.user = user;
        }

        /** @return the role list, possibly {@code null} */
        public List<CLS_VO_Ignite_Role> getRole_list() {
            return this.role_list;
        }

        /** @param role_list the role list */
        public void setRole_list(List<CLS_VO_Ignite_Role> role_list) {
            this.role_list = role_list;
        }

        /** @return the tollgate list, possibly {@code null} */
        public List<CLS_VO_Ignite_Tollgate> getTollgate_list() {
            return this.tollgate_list;
        }

        /** @param tollgate_list the tollgate list */
        public void setTollgate_list(List<CLS_VO_Ignite_Tollgate> tollgate_list) {
            this.tollgate_list = tollgate_list;
        }

        /** @return the camera list, possibly {@code null} */
        public List<CLS_VO_Ignite_Camera> getCamera_list() {
            return this.camera_list;
        }

        /** @param camera_list the camera list */
        public void setCamera_list(List<CLS_VO_Ignite_Camera> camera_list) {
            this.camera_list = camera_list;
        }

        /** @return the area list, possibly {@code null} */
        public List<CLS_VO_AreaParam> getArea_list() {
            return this.area_list;
        }

        /** @param area_list the area list */
        public void setArea_list(List<CLS_VO_AreaParam> area_list) {
            this.area_list = area_list;
        }

        /** @return the coordinate list, possibly {@code null} */
        public List<CLS_VO_GisCoordinates> getGis_list() {
            return this.gis_list;
        }

        /** @param gis_list the coordinate list */
        public void setGis_list(List<CLS_VO_GisCoordinates> gis_list) {
            this.gis_list = gis_list;
        }
    }

    2:发送RabbitMQ消息的帮助类CLS_RabbitMQClientSend

    package com.tiandy.easy7.its.rabbitmq;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import cn.jpush.api.utils.StringUtils;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.tiandy.easy7.core.bo.CLS_BO_User;
    import easy7.datatype.CLS_Easy7_Error;
    import easy7.datatype.CLS_Easy7_Types;
    import org.apache.log4j.Logger;
    
    /**
     * Static helper that publishes BMS synchronization messages to RabbitMQ.
     *
     * <p>One connection and one channel are shared through static fields.
     * {@link #initialize()} has to succeed before {@link #sendMsg(String)} can
     * publish. This class adds no synchronization of its own around that shared
     * state.
     */
    public class CLS_RabbitMQClientSend {

        private static final Logger log = Logger.getLogger(CLS_RabbitMQClientSend.class);

        /** Exchange that receives every synchronization message. */
        private static final String EXCHANGE_NAME = "bms_sync_user_auth";

        // RabbitMQ connection
        private static Connection connection = null;
        // RabbitMQ channel
        private static Channel channel = null;
        // Connection status flag: true once initialize() has completed successfully
        public static boolean connectStatus = false;

        /** @return the shared connection, or {@code null} when none is held */
        public Connection getConnection() {
            return connection;
        }

        /** @return the shared channel, or {@code null} when none is held */
        public Channel getChannel() {
            return channel;
        }

        /**
         * Opens the RabbitMQ connection and channel and declares the exchange.
         *
         * <p>Any previously held connection/channel is closed first. On success
         * {@link #connectStatus} becomes {@code true}; on any failure it becomes
         * {@code false}, the error is logged with its cause, and whatever was
         * opened so far is closed again so no half-initialized connection leaks.
         */
        public static void initialize() {
            try {
                // Connection factory
                ConnectionFactory factory = CLS_RabbitMQUtil.getRabbitMQConnectionFactory();
                // Close the previous connection and channel
                closeConnection();
                connection = factory.newConnection();
                channel = connection.createChannel();
                // Declare a durable exchange of type FANOUT (broadcast mode)
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);
                connectStatus = true;
            } catch (Exception e) {
                connectStatus = false;
                log.error("CLS_RabbitMQClientSend method initialize error!", e);
                closeConnection();
            }
        }

        /**
         * Sends a message to the message broker (sync from BMS to vehicle big data).
         *
         * <p>The text is encoded as UTF-8 so the bytes do not depend on the
         * platform default charset of the sending JVM.
         *
         * @param info the message text
         * @return {@code CLS_Easy7_Error.ERROR_PARAM} when {@code info} is empty,
         *         {@code CLS_Easy7_Error.ERROR_REQUEST_FAILED} when no open channel
         *         is available or publishing fails,
         *         {@code CLS_Easy7_Error.ERROR_OK} otherwise
         */
        public static int sendMsg(String info) {
            // Validation
            if (StringUtils.isEmpty(info)) {
                return CLS_Easy7_Error.ERROR_PARAM;
            }
            // Work on a local copy so a concurrent re-initialize cannot null it mid-call.
            Channel current = channel;
            if (current == null || !current.isOpen()) {
                log.error("CLS_RabbitMQClientSend method sendMsg error! channel is not open");
                return CLS_Easy7_Error.ERROR_REQUEST_FAILED;
            }
            try {
                // Log the text itself; concatenating a byte[] only prints its identity.
                log.debug("CLS_RabbitMQClientSend sendMsg:" + info);
                current.basicPublish(
                        EXCHANGE_NAME, "", null, info.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                return CLS_Easy7_Error.ERROR_OK;
            } catch (Exception e) {
                log.error("CLS_RabbitMQClientSend method sendMsg error!", e);
                return CLS_Easy7_Error.ERROR_REQUEST_FAILED;
            }
        }

        /**
         * Closes the channel and then the connection.
         *
         * <p>Both static references are always cleared, including when the
         * resource was already closed or closing throws, so no stale handle is
         * kept. Close failures are logged and otherwise ignored.
         */
        public static void closeConnection() {
            Channel oldChannel = channel;
            channel = null;
            try {
                if (oldChannel != null && oldChannel.isOpen()) {
                    oldChannel.close();
                }
            } catch (Exception e) {
                log.warn("CLS_RabbitMQClientSend close channel error!", e);
            }

            Connection oldConnection = connection;
            connection = null;
            try {
                if (oldConnection != null && oldConnection.isOpen()) {
                    oldConnection.close();
                }
            } catch (Exception e) {
                log.warn("CLS_RabbitMQClientSend close connection error!", e);
            }
        }

    }

    3:RabbitMQ的核心帮助类CLS_RabbitMQUtil

    package com.tiandy.easy7.its.rabbitmq;
    
    import javax.annotation.Resource;
    
    import com.tiandy.easy7.core.util.Tools;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.ApplicationContext;
    
    import net.sf.json.JSONObject;
    
    import com.rabbitmq.client.ConnectionFactory;
    import com.tiandy.easy7.core.bo.CLS_BO_SystemInfo;
    import com.tiandy.easy7.core.po.TabSystemInfo;
    import com.tiandy.easy7.core.util.MyApplicationContextUtil;
    import com.tiandy.easy7.core.vo.CLS_VO_Result;
    import com.tiandy.easy7.face.rabbitmq.CLS_RabbitMQExchangeRecive;
    
    import easy7.datatype.CLS_Easy7_Types;
    
    import java.net.InetAddress;
    
    /**
     * RabbitMQ utility class: supplies the shared {@link ConnectionFactory} and
     * resolves the broker host from the stored system configuration.
     */
    public class CLS_RabbitMQUtil {

        private static final Logger log = LoggerFactory.getLogger(CLS_RabbitMQUtil.class);

        /** Host used when no RabbitMQ address has been configured yet. */
        private static final String DEFAULT_HOST = "127.0.0.1";

        // Private constructor: static utility, never instantiated
        private CLS_RabbitMQUtil() {
        }

        /**
         * Returns the shared RabbitMQ connection factory with its host refreshed.
         *
         * <p>The host is re-read through {@link #getRabbitMQIP()} on every call;
         * when none is configured it falls back to {@code 127.0.0.1}.
         *
         * @return the shared connection factory
         */
        public static final ConnectionFactory getRabbitMQConnectionFactory() {
            // Re-read the RabbitMQ IP from the stored configuration
            getRabbitMQIP();
            // On first use of the platform the host defaults to 127.0.0.1
            if (null == CLS_Easy7_Types.rabbitmq_host_for_bj
                    || "".equals(CLS_Easy7_Types.rabbitmq_host_for_bj)) {
                CLS_Easy7_Types.rabbitmq_host_for_bj = DEFAULT_HOST;
            }
            RabbitMQConnectionFactory.factory.setHost(CLS_Easy7_Types.rabbitmq_host_for_bj);
            return RabbitMQConnectionFactory.factory;
        }

        /**
         * Reads the RabbitMQ IP address and stores it in
         * {@code CLS_Easy7_Types.rabbitmq_host_for_bj}.
         *
         * <p>When the lookup fails or yields no address, an error is logged and
         * the method returns without changing anything. After a successful lookup
         * it also fills {@code CLS_Easy7_Types.LOCALHOST_IP} if that is still empty.
         */
        public static void getRabbitMQIP() {
            ApplicationContext ctx = MyApplicationContextUtil.getContext();
            CLS_BO_SystemInfo boSystemInfo = ctx.getBean(CLS_BO_SystemInfo.class);

            String rabbitmqIp;
            try {
                rabbitmqIp = readConfiguredRabbitMqIp(boSystemInfo);
            } catch (Exception e) {
                log.error("rabbitmq id is not exist!", e);
                return;
            }
            if (rabbitmqIp == null || "".equals(rabbitmqIp)) {
                log.error("rabbitmq ip is not exist!");
                return;
            }
            // The original line assigned to a string literal, which does not compile;
            // this is the field getRabbitMQConnectionFactory() reads the host from.
            CLS_Easy7_Types.rabbitmq_host_for_bj = rabbitmqIp;

            // Local server IP, used as the routing key for receiving
            // permission requests from subordinate platforms
            try {
                if (null == CLS_Easy7_Types.LOCALHOST_IP || "".equals(CLS_Easy7_Types.LOCALHOST_IP)) {
                    CLS_Easy7_Types.LOCALHOST_IP = Tools.getLinuxLocalIp();
                }
            } catch (Exception e) {
                log.error("get LinuxIp error", e);
            }
        }

        /**
         * Extracts {@code rabbitmqIp} from the nested JSON of the system-info record.
         *
         * @return the configured address, or {@code null} when the record reports a
         *         non-zero {@code ret} or has empty {@code content} / {@code sParam}
         */
        private static String readConfiguredRabbitMqIp(CLS_BO_SystemInfo boSystemInfo) {
            CLS_VO_Result result = boSystemInfo.getSystemInfo(CLS_Easy7_Types.EASY7_ACTIVEMQ_SID);
            JSONObject json = JSONObject.fromObject(result);
            if (json.getInt("ret") != 0) {
                return null;
            }
            String content = json.getString("content");
            if (content == null || "".equals(content)) {
                return null;
            }
            String sParam = JSONObject.fromObject(content).getString("sParam");
            if (sParam == null || "".equals(sParam)) {
                return null;
            }
            return JSONObject.fromObject(sParam).getString("rabbitmqIp");
        }

        // Holder for the single shared RabbitMQ connection factory
        private static class RabbitMQConnectionFactory {
            private static final ConnectionFactory factory = new ConnectionFactory();
            static {
                // NOTE(review): port and credentials are hard-coded here; they could be
                // moved to a properties file, and the password should not live in source.
                factory.setPort(5673); // MQ port
                factory.setUsername("admin"); // MQ user name
                factory.setPassword("123456"); // MQ password
                factory.setRequestedHeartbeat(10); // heartbeat, in seconds
                factory.setAutomaticRecoveryEnabled(true); // recover the connection automatically
            }
        }
    }

    4:获取需要发送MQ的消息

    public CLS_VO_Ignite_Message deleteHostCamera(){
    //定义一个空的list模拟数据,具体实际业务中里边肯定是有数据的(需要发现车辆的数据消息) List
    <CLS_VO_Ignite_Camera> cameralist = new ArrayList<CLS_VO_Ignite_Camera>(); //定义拼接向Rabbit发送的消息体 (json格式的数据) CLS_VO_Ignite_Message message = new CLS_VO_Ignite_Message(CLS_Easy7_Types.MESSAGE_SOURCE_BMS,CLS_Easy7_Types.MESSAGE_TYPE_CAMERA, CLS_Easy7_Types.MESSAGE_OPERATE_DEL,null,null,null,cameralist,null); return message; }

    5:发送消息

    public void sendRabbitMQMessage(){
      //如发送的消息:
    //{"id":"f6560157-24ae-475b-9736-34072684b008","reviewInfo":"","status":1,"ids":[],"currentUserId":"admin","reviewUser":"admin"}
    CLS_VO_Ignite_Message message
    =deleteHostCamera(); //消息为空时不发送消息 if(null != message.getSource() && !"".equals(message.getSource())){ int sendResult = CLS_RabbitMQClientSend.sendMsg(JSONObject.fromObject(message).toString()); //同步消息失败,错误代码使用-40,数据库操作回滚 if(sendResult != 0){ TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); //事务回滚 result.setRet( -40);//请求失败 return result; } } }

    6:接收MQ中发送的消息

      1 package com.tiandy.vbs.ics.rabbitMQ;
      2 import java.io.IOException;
      3 import java.util.HashMap;
      4 import java.util.Map;
      5 import java.util.concurrent.ArrayBlockingQueue;
      6 import java.util.concurrent.Future;
      7 import java.util.concurrent.ThreadPoolExecutor;
      8 import java.util.concurrent.TimeUnit;
      9 
     10 import javax.annotation.PostConstruct;
     11 
     12 import org.springframework.beans.factory.annotation.Autowired;
     13 import org.springframework.stereotype.Component;
     14 
     15 import com.rabbitmq.client.AMQP;
     16 import com.rabbitmq.client.BuiltinExchangeType;
     17 import com.rabbitmq.client.Channel;
     18 import com.rabbitmq.client.Connection;
     19 import com.rabbitmq.client.ConnectionFactory;
     20 import com.rabbitmq.client.Consumer;
     21 import com.rabbitmq.client.DefaultConsumer;
     22 import com.rabbitmq.client.Envelope;
     23 import com.tiandy.vbs.adapter.rabbitmq.CLS_ConsumerThreadPool;
     24 import com.tiandy.vbs.adapter.rabbitmq.CLS_RabbitMQQueueRecive;
     25 import com.tiandy.vbs.adapter.rabbitmq.CLS_RabbitMQUtil;
     26 import com.tiandy.vbs.common.util.CLS_VBS_Types;
     27 import com.tiandy.vbs.common.util.InterfaceLogger;
     28 import com.tiandy.vbs.ics.bo.CLS_BO_DispositionAdapter;
     29 import com.tiandy.vbs.ics.vo.CLS_VO_MQMessage;
     30 
     31 @Component
     32 public class CLS_QueueDispositionRecive extends CLS_RabbitMQQueueRecive{
     33     //rabbitmq连接
     34     private static Connection connection = null;
     35     //rabbitmq通道
     36     private static Channel channel = null ;
     37     //连接状态标识
     38     public boolean connectStatus = false;
     39     //消费者线程池
     40     public static CLS_ConsumerThreadPool consumerThreadPool = new CLS_ConsumerThreadPool(CLS_VBS_Types.corePoolSize,CLS_VBS_Types.maximumPoolSize,
     41             CLS_VBS_Types.keepAliveTime, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(CLS_VBS_Types.workQueue), new ThreadPoolExecutor.CallerRunsPolicy());
     42     @Autowired
     43     private CLS_BO_DispositionAdapter boAdapter;
     44     public static CLS_QueueDispositionRecive queueRecive;    
     45     @PostConstruct
     46     public void init() {
     47         queueRecive = this;
     48     }
     49     public Connection getConnection() {
     50         return connection;
     51     }
     52     public Channel getChannel() {
     53         return channel;
     54     }
     55     public void initialize(){
     56         try {
     57             //连接工厂
     58             ConnectionFactory factory= CLS_RabbitMQUtil.getRabbitMQConnectionFactory();
     59             //关闭连接与通道
     60             closeConnection();
     61             connection = factory.newConnection();  
     62             channel = connection.createChannel();  
     63             syncConsumer();
     64             connectStatus = true ;
     65         } catch (Exception e) {
     66             connectStatus = false ;
     67             e.printStackTrace();
     68             InterfaceLogger.error("CLS_RabbitMQQueueRecive method initialize:"+e.getMessage(),e);
     69         }
     70     }
    //核心同步接收消息
    71 public void syncConsumer()throws Exception{ 72 Map<String, Object> args = new HashMap<String, Object>(); 73 args.put("x-max-length", 100000); 74 args.put("x-message-ttl",CLS_VBS_Types.x_message_ttl); 75 76 channel.queueDeclare(CLS_VBS_Types.rabbitmq_queue_alarm_record,true,false,false,args); 77 //声明交换器 78 channel.exchangeDeclare(CLS_VBS_Types.rabbitmq_exchange_name, BuiltinExchangeType.FANOUT ,true); 79 channel.queueBind(CLS_VBS_Types.rabbitmq_queue_alarm_record, CLS_VBS_Types.rabbitmq_exchange_name, ""); 80 //消费者 81 Consumer consumer = new DefaultConsumer(channel) { 82 @Override 83 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 84 //接收到的消息 85 String message = new String(body, "UTF-8"); 86 InterfaceLogger.debug("recive message:"+message); 87 if(!"".equals(message)){ 88 CLS_VO_MQMessage mqVo = new CLS_VO_MQMessage(); 89 mqVo.setMessage(message); 90 mqVo.setEnvelope(envelope); 91 CLS_DispositionConsumer worker = new CLS_DispositionConsumer(mqVo,queueRecive.boAdapter,channel); 92 consumerThreadPool.submit(worker);//多线程处理数据 93 }else { 94 channel.basicAck(envelope.getDeliveryTag(), false); 95 } 96 } 97 }; 98 //消息手动确认 99 channel.basicConsume(CLS_VBS_Types.rabbitmq_queue_alarm_record, false, consumer); 100 } 101 //关闭连接 102 public void closeConnection(){ 103 try { 104 if (channel != null) { 105 if(channel.isOpen()) { 106 channel.close(); 107 channel = null; 108 } 109 } 110 } catch (Exception e) { 111 InterfaceLogger.error("CLS_RabbitMQExchangeRecive closeChannel error " + e); 112 e.printStackTrace(); 113 } 114 try { 115 if (connection != null) { 116 if(connection.isOpen()) { 117 connection.close(); 118 connection = null; 119 } 120 } 121 } catch (Exception e) { 122 InterfaceLogger.error("CLS_RabbitMQExchangeRecive closeConnection error " + e); 123 e.printStackTrace(); 124 } 125 } 126 }

    7:线程池处理消息类

     1 package com.tiandy.vbs.adapter.rabbitmq;
     2 
     3 import java.util.concurrent.ArrayBlockingQueue;
     4 import java.util.concurrent.ThreadPoolExecutor;
     5 import java.util.concurrent.TimeUnit;
     6 import org.springframework.stereotype.Component;
     7 import com.tiandy.vbs.common.util.CLS_VBS_Types;
     8 @Component
     9 public abstract class CLS_RabbitMQQueueRecive {
    10     
    11     //消费者线程池
    12     public static CLS_ConsumerThreadPool consumerThreadPool = new CLS_ConsumerThreadPool(CLS_VBS_Types.corePoolSize,CLS_VBS_Types.maximumPoolSize,
    13             CLS_VBS_Types.keepAliveTime, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(CLS_VBS_Types.workQueue), new ThreadPoolExecutor.CallerRunsPolicy());    
    14     //初始化连接
    15     public abstract void initialize();
    16     //消费者
    17     public abstract void syncConsumer()throws Exception;
    18     //关闭连接
    19     public abstract void closeConnection();
    20 }

    其他应用或平台接收到MQ中的消息后,解析JSON数据,转换为对应的数据实体,再将数据添加到相应数据库的表中即可,这样就完成了消息的发送和数据的同步!

    至此,发送消息的过程就完成了!

  • 相关阅读:
    关于JSONP
    使用stylelint对CSS/Sass做代码审查
    关于input的file框onchange事件触发一次失效的新的解决方法
    HTML5 之 FileReader(图片上传)
    document.domain
    window.hostory(浏览器的历史记录)
    事件DOMContentLoaded和load的区别
    JavaScript中---作用域
    关于repaint(重绘)和reflow( 回流)
    bootstrap兼容性问题
  • 原文地址:https://www.cnblogs.com/zhaosq/p/13424903.html
Copyright © 2011-2022 走看看