zoukankan      html  css  js  c++  java
  • 一个简单的消息队列的实现(支持延时消息,支持持久化,保证唯一消费)

    主要的消息管理者对象:

    package com.rynk.mugua.trading.biz.service.impl;
    
    import java.util.concurrent.DelayQueue;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import com.rynk.commons.entity.QueueMessage;
    import com.rynk.mugua.trading.biz.commons.RedisKeyResolver;
    import com.rynk.mugua.trading.biz.commons.lock.DistributedLockHandler;
    import com.rynk.mugua.trading.biz.eth.DelayedTack;
    
    import lombok.extern.slf4j.Slf4j;
    /**
     * 延时消息管理员
     * @author ZHANGYUKUNUP
     *
     */
    @Component
    @Slf4j
    public class QueueManger {
    	
    	MessagePersistent messagePersistent;
    	
    	/**
    	 * 延时消息队列
    	 */
    	private DelayQueue<DelayedTack> dQueue = new DelayQueue<>();
    	
    	/**
    	 * 消息任务处理线程
    	 */
    	private Thread taskThread;
    	
    	@Autowired
    	DistributedLockHandler lock;
    	
    	
    	public QueueManger() {
    		taskThread = new TaskThread();
    		taskThread.start();
    	}
    	
    	/**
    	 * 任务线程
    	 * @author ZHANGYUKUNUP
    	 *
    	 */
    	class TaskThread extends Thread{
    		
    		@Override
    		public void run() {
    			while (true) {
    				try {
    					DelayedTack delayedTack = dQueue.take();
    					QueueMessage queueMessage = delayedTack.getQueueMessage();
    					if( queueMessage == null ) {
    						return ;
    					}
    					
    					//简单的加个锁保证消息不被重复消费(需要保证解锁前 数据被提交到数据库,否者会出同步问题 ,也就是说不能有更加大的 事务范围 包裹当前方法 )
    					if(  lock.tryLock(  RedisKeyResolver.getMsgrKey( queueMessage.getId() )    ) ) {
    						
    						//如果这个消息被正常消费,那么久标记消费成功,如果异常消费,那么久重试这个消息
    						try {
    							if( QueueManger.this.messageDispense(delayedTack.getQueueMessage())  ) {
    								messagePersistent.succeed( queueMessage );
    							}else {
    								QueueManger.this.reTry( queueMessage  );
    							}
    						}catch (Exception e) {
    							e.printStackTrace();
    							QueueManger.this.reTry(queueMessage);
    						}finally {
    							lock.unLock( RedisKeyResolver.getMsgrKey( queueMessage.getId() )    );
    						}
    					}
    				} catch (Exception e) {
    					e.printStackTrace();
    				}
    			}
    		}
    		
    	}
    	
    	
    	
    	
    	/**
    	 * 重试
    	 * @param queueMessage
    	 */
    	protected void reTry(QueueMessage queueMessage) {
    		messagePersistent.reTry(queueMessage);
    	}
    
    
    
    	/**
    	 * 分发消息
    	 * @param queueMessage
    	 */
    	protected boolean messageDispense(QueueMessage queueMessage) {
    		
    		return messagePersistent.consume(queueMessage);
    	}
    
    	/**
    	 * 添加一个延时消息
    	 * @param delayedTack
    	 */
    	public void put(DelayedTack delayedTack) {
    		dQueue.put(delayedTack);
    	}
    	
    	/**
    	 * 查询未处理的延时消息数量
    	 * @return
    	 */
    	public int size() {
    		return dQueue.size();
    	}
    	
    	/**
    	 * 消息处理线程存活状态
    	 * @return
    	 */
    	public boolean isAlive() {
    		return taskThread.isAlive();
    	}
    	
    
    }
    

     

    消息对象:

    package com.rynk.commons.entity;
    
    import java.util.Date;
    
    import org.springframework.data.mongodb.core.mapping.Document;
    
    import com.rynk.commons.entity.em.QueueMessageType;
    import com.rynk.commons.entity.em.TransferRecordStatus;
    import com.rynk.commons.util.SnGeneratorUtil;
    
    import lombok.Data;
    
    @Data
    @Document(collection = "mg_queue_message")
    public class QueueMessage extends BaseEntity {
    
    	/**
    	 * 唤醒时间
    	 */
    	private Date awakenDate;
    	
    	
    	/**
    	 * 处理状态
    	 */
    	private TransferRecordStatus transferRecordStatus;
    	
    	
    	/**
    	 * 消息体
    	 */
    	private String  body;
    	
    	/**
    	 * 消息体类型
    	 */
    	private QueueMessageType type;
    	
    
    	/**
    	 * 重试次数
    	 */
    	private Integer tryTimes;
    	
    	
    	/**
    	 * 最后一次核对时间
    	 */
    	private Date lastCheckDate;
    	
    
    	
    	
    	/**
    	 * 
    	 * @param body 消息体来源类型
    	 * @param type 消息类型
    	 * @param delayed 延时
    	 * @return
    	 */
    	public static  QueueMessage newInstance( String  body , QueueMessageType type , long delayed ) {
    		QueueMessage item = new QueueMessage();
    		item.setId( SnGeneratorUtil.getId().toString() );
    		item.setCreateDate(  new Date() );
    		item.setDr(false);
    		
    		item.setTransferRecordStatus( TransferRecordStatus.WAIT );
    		item.setTryTimes(1);
    		item.setBody(body);
    		item.setType(type);
    		item.setAwakenDate( new Date( System.currentTimeMillis()+delayed ));
    		item.setLastCheckDate( item.getAwakenDate() );
    		
    		return item;
    	}
    
    	
    }
    

      

    基于redis 的 分布式锁对象:

    package com.rynk.mugua.trading.biz.commons.lock;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.core.ValueOperations;
    import org.springframework.stereotype.Component;
    import org.springframework.util.StringUtils;
    
    import javax.annotation.Resource;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 分布式锁
     *
     * @author ZHANGYUKUN
     *
     */
    @Component
    public class DistributedLockHandler {
    
    	private static final Logger logger = LoggerFactory.getLogger(DistributedLockHandler.class);
    
    	/**
    	 * 最大持有锁的时间(毫秒)
    	 */
    	private final static long LOCK_EXPIRE = 30 * 1000L;
    
    	/**
    	 * 尝试获取锁的时间间隔(毫秒)
    	 */
    	private final static long LOCK_TRY_INTERVAL = 30L;
    
    	/**
    	 * 获取锁最大等待时间( 毫秒 )
    	 */
    	private final static long LOCK_TRY_TIMEOUT = 20 * 1000L;
    
    	@Resource// (name = "customRedisTemplate")
    	private RedisTemplate<String, String> template;
    
    	/**
    	 * 尝试获取 分布式锁
    	 *
    	 * @param lockKey
    	 *            锁名
    	 * @return true 得到了锁 ,false 获取锁失败
    	 */
    	public boolean tryLock(String lockKey) {
    		return getLock(lockKey, LOCK_TRY_TIMEOUT, LOCK_TRY_INTERVAL, LOCK_EXPIRE);
    	}
    
    	/**
    	 * 尝试获取 分布式锁(不自动释放锁)
    	 *
    	 * @param lockKey
    	 *            锁名
    	 * @return true 得到了锁 ,false 获取锁失败
    	 */
    	public boolean tryLockNotAutoRelease(String lockKey) {
    		return getLock(lockKey, LOCK_TRY_TIMEOUT, LOCK_TRY_INTERVAL, -1);
    	}
    
    	/**
    	 * 尝试获取 分布式锁
    	 *
    	 * @param lockKey
    	 *            锁名
    	 * @param timeout
    	 *            获取锁最大等待时间
    	 * @return true 得到了锁 ,false 获取锁失败
    	 */
    	public boolean tryLock(String lockKey, long timeout) {
    		return getLock(lockKey, timeout, LOCK_TRY_INTERVAL, LOCK_EXPIRE);
    	}
    
    	/**
    	 * 尝试获取 分布式锁(不自动释放锁)
    	 *
    	 * @param lockKey
    	 *            锁名
    	 * @param timeout
    	 *            获取锁最大等待时间
    	 * @return true 得到了锁 ,false 获取锁失败
    	 */
    	public boolean tryLockNotAutoRelease(String lockKey, long timeout) {
    		return getLock(lockKey, timeout, LOCK_TRY_INTERVAL, -1);
    	}
    
    	/**
    	 * 尝试获取 分布式锁
    	 *
    	 * @param lockKey
    	 *            锁名
    	 * @param timeout
    	 *            获取锁最大等待时间
    	 * @param tryInterval
    	 *            获取锁尝试 时间间隔
    	 * @return true 得到了锁 ,false 获取锁失败
    	 */
    	public boolean tryLock(String lockKey, long timeout, long tryInterval) {
    		return getLock(lockKey, timeout, tryInterval, LOCK_EXPIRE);
    	}
    
    	/**
    	 * 尝试获取 分布式锁(不释放锁)
    	 *
    	 * @param lockKey
    	 *            锁名
    	 * @param timeout
    	 *            获取锁最大等待时间
    	 * @param tryInterval
    	 *            获取锁尝试 时间间隔
    	 * @return true 得到了锁 ,false 获取锁失败
    	 */
    	public boolean tryLockNotAutoRelease(String lockKey, long timeout, long tryInterval) {
    		return getLock(lockKey, timeout, tryInterval, -1);
    	}
    
    	/**
    	 * 尝试获取 分布式锁
    	 *
    	 * @param lockKey
    	 *            锁名
    	 * @param timeout
    	 *            获取锁最大等待时间
    	 * @param tryInterval
    	 *            获取锁尝试 时间间隔
    	 * @param lockExpireTime
    	 *            锁最大持有时间
    	 * @return true 得到了锁 ,false 获取锁失败
    	 */
    	public boolean tryLock(String lockKey, long timeout, long tryInterval, long lockExpireTime) {
    		return getLock(lockKey, timeout, tryInterval, lockExpireTime);
    	}
    
    	/**
    	 * 获取分布式锁
    	 *
    	 * @param lockKey
    	 *            锁名
    	 * @param timeout
    	 *            获取锁最大等待时间
    	 * @param tryInterval
    	 *            获取锁尝试 时间间隔
    	 * @param lockExpireTime
    	 *            锁最大持有时间
    	 * @return true 得到了锁 ,false 获取锁失败
    	 */
    	public boolean getLock(String lockKey, long timeout, long tryInterval, long lockExpireTime) {
    		try {
    			if (StringUtils.isEmpty(lockKey)) {
    				return false;
    			}
    			long startTime = System.currentTimeMillis();
    			do {
    				ValueOperations<String, String> ops = template.opsForValue();
    				SimpleDateFormat sd = new SimpleDateFormat("yyyy-mm-dd HH:mm:ss");
    				if (ops.setIfAbsent(lockKey, sd.format(new Date())  )) {
    					if (lockExpireTime > 0) {
    						template.expire(lockKey, lockExpireTime, TimeUnit.MILLISECONDS);
    					}
    					return true;
    				}
    				Thread.sleep(tryInterval);
    			} while (System.currentTimeMillis() - startTime < timeout);
    		} catch (InterruptedException e) {
    			logger.error(e.getMessage());
    			return false;
    		}
    		return false;
    	}
    
    	/**
    	 * 释放锁
    	 *
    	 * @param lockKey
    	 */
    	public void unLock(String lockKey) {
    		if (!StringUtils.isEmpty(lockKey)) {
    			template.delete(lockKey);
    		}
    	}
    
    }
    

      

    延时任务对象: 用来分装延时消息的

    package com.rynk.mugua.trading.biz.eth;
    
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    import com.rynk.commons.entity.QueueMessage;
    import com.rynk.mugua.trading.biz.mqMessage.ChainMessageDelayTimeLevel;
    
    /**
     * 延时任务
     * @author zhangyukun
     *
     */
    public class DelayedTack implements Delayed{
    	/**
    	 *  执行的时间
    	 */
    	Long  runTime;
    	
    	/**
    	 * 消息对象
    	 */
    	QueueMessage queueMessage;
    	
    	
    	public QueueMessage getQueueMessage() {
    		return queueMessage;
    	}
    
    	public void setQueueMessage(QueueMessage queueMessage) {
    		this.queueMessage = queueMessage;
    	}
    
    
    	/**
    	 * 
    	 * @param delay 延时毫秒数
    	 * @param queueMessage 消息体
    	 */
    	public DelayedTack( QueueMessage queueMessage  ) {
    		
    		if( queueMessage.getTryTimes() == 1  ) {
    			this.runTime = queueMessage.getAwakenDate().getTime();
    		}else {
    			this.runTime =System.currentTimeMillis() + ChainMessageDelayTimeLevel.getDelayTimeLevel( queueMessage.getTryTimes() )*1000 ;
    		}
    		
    		this.queueMessage = queueMessage;
    	}
    	
    	@Override
    	public long getDelay(TimeUnit unit) {
    		return unit.convert( runTime - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
    	}
    
    	@Override
    	public int compareTo(Delayed o) {
    		return (int) (this.getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS));
    	}
    	
    
    
    }
    

      

    持久化消息的对象: 三个 方式 按照自己的实现做就是了

    package com.rynk.mugua.trading.biz.service.impl;
    
    import com.rynk.commons.entity.QueueMessage;
    
    public class MessagePersistent {
    	
    	/**
    	 * 消费这个消息要处理的业务逻辑
    	 * @param queueMessage
    	 * @return
    	 */
    	public boolean consume(QueueMessage queueMessage) {
    		return false;
    	}
    	
    	/**
    	 * 标记这个消息已经被正常消费
    	 * @param queueMessage
    	 */
    	public void succeed(QueueMessage queueMessage) {
    		
    	}
    	
    	/**
    	 * 重试消息(标记数据库的状态,然后把它重新放到延时队列中)
    	 * @param queueMessage
    	 */
    	public void reTry(QueueMessage queueMessage) {
    		
    		
    	}
    
    }
    

      

    备注: 上面的 消息队列,基于Java的 延时消息,带有持久,保证唯一消费,但是有一个明显的 问题 ,所有的消息都在内存,如果有1亿条消息,内存肯定是要爆炸的 ,

    如果要求 消息负载大,那么我们可以考虑 吧部分消息 放在 内存。

      1 同上 所有消息都要持久化,保证不丢失,

      2 只有最近一段时间需要消费的消息才加入到内存( 比如一分钟 )

      3 加入消息的时候 ,判断这个消息的执行执行时间,如果是 1 分钟以内,持久化以后,直接放入内存队列, 1 分钟以外,直接 持久化,不放入内存。

      4 每隔 一段时间 ( 1 分钟   ),载入一个,并且可以指定载入条数。

      5 如果是  多节点,考虑分布式环境的消息消费,可以 通过 消息Id 模  节点编号,来指定分配消息(  数据分片原理, 之歌分片不能随意改动,改动为题也不大,这是知识消费,避免 重复消费就是了,但是 如果一个节点 挂了,可能 部分消息不消费 )

  • 相关阅读:
    js怎么通过逗号将string转换成数组
    设置mysql数据库为只读
    python 关于django 2.X from django.contrib.auth.views import login
    python Django2.X,报错 ‘learning_logs ’is not a registered namespace,如何解决?
    python django2.x报错No module named 'django.core.urlresolvers'
    python Django2.0如何配置urls文件
    VMware vSphere 组件和功能
    VMware vSphere Client的简单使用教程
    python 逻辑运算 ‘and’ ,'or' 在实战中的作用,代替if语句。
    python_urllib2:urlerror和httperror
  • 原文地址:https://www.cnblogs.com/cxygg/p/12162878.html
Copyright © 2011-2022 走看看