zoukankan      html  css  js  c++  java
  • RabbitMQ生产者消费者

    package com.ra.car.rabbitMQ;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;
    import java.util.concurrent.TimeoutException;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.alibaba.fastjson.JSONObject;
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.ConsumerCancelledException;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.ShutdownSignalException;
    
    /**
     * 生产者,用来发送拍照指令
     * 
     * 
     */
    public class RabbitMQProducer {
        protected static final Logger logger = LoggerFactory.getLogger(RabbitMQProducer.class);
    
        private final static String QUEUE_NAME = "44b2fe8a-4d70-4a18-b75f-91a6f170bd16"; // 上送队列
    
        private String message;
    
        public RabbitMQProducer() {
    
        }
    
        public RabbitMQProducer(String message) {
            this.message = message;
        }
    
        public void sendMessage(){
            String replyQueueName = null; // 返回队列名
    
            ConnectionFactory connFactory = null;
            Connection conn = null;
            Channel channel = null;
            try {
                connFactory = new ConnectionFactory();
                connFactory.setHost("58.211.54.147");
                connFactory.setUsername("customer");
                connFactory.setPassword("123456");
                connFactory.setPort(5672);
                conn = connFactory.newConnection();
                channel = conn.createChannel();
                QueueingConsumer consumer = new QueueingConsumer(channel);
    
                Map<String, Object> param = new HashMap<String, Object>();
                param.put("x-message-ttl", 600000);
                param.put("x-expires", 86400000);
    
                // 返回队列
                replyQueueName = channel.queueDeclare().getQueue();
                channel.basicConsume(replyQueueName, true, consumer);
                String corrId = UUID.randomUUID().toString(); // 用来表示返回队列结果的id,唯一
                BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();
    
                channel.queueDeclare(QUEUE_NAME, true, false, false, param);
                channel.basicPublish("", QUEUE_NAME, props, message.getBytes());
                logger.info("producer has published: "" + message + """);
            } catch (IOException ioe) {
                ioe.printStackTrace();
            } catch (TimeoutException toe) {
                toe.printStackTrace();
            } catch (ShutdownSignalException e) {
                e.printStackTrace();
            } catch (ConsumerCancelledException e) {
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (channel != null)
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                if (conn != null)
                    try {
                        conn.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
            }
        }
    
        public static void main(String[] args) {
            JSONObject json = new JSONObject();
            json.put("msgId", "8801");
            json.put("gpsNo", "001709270202");
            json.put("channelId", "1");// 1,2 前置 后置
            json.put("serialNo", "1133");
            //RabbitMQProducer rb = new RabbitMQProducer(json.toString());
            //Thread t = new Thread(rb);
            //t.start();
            //String str = "7e0805000901170427881200090003000001b6b5833313";
            //System.out.println(str.substring(26, 36)+"**"+str.substring(36,str.length()-2));
        }
    }
    package com.ra.car.rabbitMQ;
    
    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    import com.ra.truck.model.MessagePackage;
    import com.ra.truck.service.MessagePackegerService;
    
    import org.apache.commons.lang3.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.web.context.ContextLoader;
    
    import com.alibaba.fastjson.JSON;
    import com.ra.car.utils.StringToT;
    import com.ra.common.util.UuidUtil;
    import com.ra.truck.service.DataCallBackService;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.QueueingConsumer.Delivery;
    
    /**
     * 消费者
     * 
     * 
     */
    public class RabbitMQCustomer implements Runnable {
        protected static final Logger logger = LoggerFactory
                .getLogger(RabbitMQCustomer.class);
        private final static String QUEUE_NAME = "adb65b08-a27d-42b0-b4ac-ff10422ac213";
    
        private ConnectionFactory connFactory;
        private Connection conn;
        private Channel channel;
        private Delivery delivery;
        private QueueingConsumer consumer;
        private SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
        private MessagePackegerService messagepackegerService = (MessagePackegerService)ContextLoader
                .getCurrentWebApplicationContext().getBean("MessagePackegerService");
        private DataCallBackService dataCallBackService = (DataCallBackService) ContextLoader
                .getCurrentWebApplicationContext().getBean("DataCallBackService");
    
        /**
         * 分开try...catch...,1.出现连接异常时,中断连接;2.出现消费数据异常时,继续去消费,不中断连接
         */
        @Override
        public void run() {
            try {
                connFactory = new ConnectionFactory();
                connFactory.setHost("58.211.54.147");
                connFactory.setUsername("customer");
                connFactory.setPassword("123456");
                connFactory.setPort(5672);
                conn = connFactory.newConnection();
                channel = conn.createChannel();
    
                Map<String, Object> param = new HashMap<String, Object>();
                param.put("x-message-ttl", 600000);
                param.put("x-expires", 86400000);
    
                channel.queueDeclare(QUEUE_NAME, true, false, false, param);
    
                logger.info("listening for event message...");
    
                consumer = new QueueingConsumer(channel);
                channel.basicConsume(QUEUE_NAME, true, consumer);
    
                while (true) {
                    try {
                        Thread.sleep(100);
                        delivery = consumer.nextDelivery();
                        // BasicProperties props = delivery.getProperties();
                        /*
                         * BasicProperties reply_props = new
                         * BasicProperties.Builder()
                         * .correlationId(props.getCorrelationId()).build();
                         */
                        // String msg = new String(delivery.getBody());
                        logger.info("*****当前时间:" + df.format(new Date()) + "*****");
                        logger.info("*****原始数据为:" + JSON.toJSONString(delivery.getBody()) + "*****");
                        String msg = MQUtils.bytes2Hex(delivery.getBody());
                        logger.info("receive msg:" + msg);
    
                        //RdDeviceCallBackDataDomain backDataDomain = new RdDeviceCallBackDataDomain();
                        //backDataDomain.setId(String.valueOf(System.currentTimeMillis()));
                        if (StringUtils.isNotBlank(msg)) {
                            MessagePackage msgPackeger = new MessagePackage();
                            if (msg.length() > 28) {
                                msg = msg.substring(0, msg.length()-2).replaceAll("7d02", "7e").replaceAll("7d01", "7d");//去掉标识位,转义还原
                                msgPackeger.setId(UuidUtil.create());
                                String messageId = msg.substring(2, 6);
                                msgPackeger.setMessageId(messageId);//消息Id
                                msgPackeger.setMessageProperty(msg.substring(6, 10));//消息体属性
                                msgPackeger.setImei(msg.substring(10, 22));//imi 终端手机号
                                msgPackeger.setSerialNumber(msg.substring(22, 26));//流水号
                                msgPackeger.setCheckCode(msg.substring(msg.length()-2, msg.length()));//校验码
                                if (StringUtils.isNotBlank(messageId) && messageId.equals("0801")) {
                                    String msgBodyProperties = msg.substring(6, 10);// 消息体属性
                                    String msgBodyProperties2 = StringToT.hexString2binaryString(msgBodyProperties);//消息属性二进制格式
                                    String isSubpackage = msgBodyProperties2.substring(2, 3);   //是否分
                                    if ("1".equals(isSubpackage)) {
    
                                        String isSplitNumber = msg.substring(26, 30); // 分报数
                                        String isSplit = msg.substring(30, 34);//消息流水号
                                        msgPackeger.setMessageSplit(isSplitNumber + isSplit);
                                        if("0001".equals(isSplit)) {
                                            String mediaId = msg.substring(34, 42);
    
                                            msgPackeger.setMeidiaId(mediaId);
                                            msgPackeger.setMessageBody(msg.substring(42, msg.length() - 2));
                                        }else{
                                            msgPackeger.setMessageSplit("0001");
                                            List<MessagePackage> messagepackegerList=messagepackegerService.selectMessagepackeger(msgPackeger);
                                            String meidiaId=messagepackegerList.get(0).getMeidiaId();
                                            String mediabody = messagepackegerList.get(0).getMessageBody().substring(0,64);
                                            msgPackeger.setMessageSplit(isSplitNumber + isSplit);
                                            msgPackeger.setMeidiaId(meidiaId);
                                            msgPackeger.setMessageBody(mediabody+msg.substring(34, msg.length()- 2));
                                        }
                                    } else {
                                        String mediaId = msg.substring(26, 34);
                                        msgPackeger.setMeidiaId(mediaId);
                                        msgPackeger.setMessageBody(msg.substring(34, msg.length() - 2));
                                    }
                                }else if(StringUtils.isNotBlank(messageId) && messageId.equals("0805")){
                                     msgPackeger.setMessageBody(msg.substring(26, 36));
                                     msgPackeger.setMeidiaId(msg.substring(36, msg.length()-2));
                                }else{
                                   msgPackeger.setMessageBody(msg.substring(26, msg.length() - 2));//消息体
                                }
                                  //保存校验码
                                  msgPackeger.setCheckCode((msg.substring(msg.length()- 2,msg.length())));
                                    dataCallBackService.insertDeviceRawDataOfOne(msgPackeger);
                                }
                            }
                        // 不用设置返回
                        /*
                         * String retMsg = "ok, give you reply:" + new
                         * String(msg.getBytes(), "utf-8");
                         * logger.info("Consumer中的返回队列名" + props.getReplyTo());
                         * channel.basicPublish("", QUEUE_NAME, reply_props,
                         * retMsg.getBytes());
                         */
                    } catch (Exception e) {
                        logger.error("循环消费数据异常.....", e);
                    }
                }
            } catch (Exception e) {
                logger.error("MQ connection error.....", e);
            } finally {
                try {
                    if (channel != null) {
                        logger.info("channel.close");
                        channel.close();
                    }
                    if (conn != null) {
                        logger.info("conn.close");
                        conn.close();
                    }
                } catch (IOException e) {
                    logger.info("IOException");
                } catch (TimeoutException e) {
                    logger.info("TimeoutException");
                }
            }
    
        }
    }
  • 相关阅读:
    C++中char*,String,int,CString间转换
    获取本地MAC地址和多IP
    子窗口
    linux记录键盘
    curses和窗口
    使用curses函数写的hello world 程序
    Java 复习笔记
    Ubuntu apt install 下载软件很慢的解决办法
    Ubuntu python多个版本管理
    VMware下的Ubuntu16设置连接主机网络,设置主机下可以通过xshell访问 VMware下的Ubuntu
  • 原文地址:https://www.cnblogs.com/lazyInsects/p/8000328.html
Copyright © 2011-2022 走看看