1.业务场景:运用spring的消息接收,难以确定连接是否正常。摄像头抓拍消息,发送给订阅者。生产者每隔10s会发送一次心跳消息,以便订阅者确认保持连接正常。
2.需求:有一次日志消息显示30分钟没有任何日志,导致生产数据丢失。为有效减缓这种情况发生,用一个线程检测消费者多久没有收到心跳信息。如果超时没有收到心跳消息,执行重新订阅。
3.直接上代码:
public class Test { private int connectCount = 0; private static TestThread tester; class TestThread extends Thread { private int index = 0; private boolean running = true; @Override public void run() { while (running) { logger.info("秒:" + ++index); try { Thread.sleep(1000); if(index >= 20) { connectCount ++; logger.info("-------持续20秒没有保持连接!!!!"); if(connectCount<=10) { logger.info("-------等待10秒即将准备重新连接!"); Thread.sleep(10000); subscribe(); }else { connectCount = 0; logger.info("-------尝试连接了十次失败!"); } tester.setRunning(false); } } catch (InterruptedException e) { e.printStackTrace(); break; } } logger.info("-------run方法结束!"); } public void setRunning(boolean running) { this.running = running; } public void setIndex(int index) { this.index = index; } } //消费者订阅方法 public void subscribe(){ //检测线程启动 tester = new TestThread(); tester.start(); //订阅消费者...... } //消费者接收消息消费 public void messageListener(Message message){ //这里会处理接收的数据 if(){//如果是正常数据进行处理 logger.info("收到消息:"+message); }else{//如果是心跳消息 logger.info("收到反馈消息,不为数据消息,不处理!"); //重设线程index tester.setIndex(0); } } //启动订阅方法,开始测试...... }