序言
什么高TPS?QPS,其实很多人都知道,还有人说大数据,大流量这些关键词夜以继日的出现在我们眼前;
针对高TPS,QPS这些词汇还有一个次可能比较陌生那就是CCU,tps,qps接受满天飞,CCU在游戏服务端出现比较多,
一个运营(SP)如果问研发(CP)你们游戏承载是多少?通常他们想知道,你们能承载多少玩家在线,并且能承载每个玩家在一秒内有多少个操作;
通常,MMO的RPG类游戏,FPS类游戏,对玩家同时操作要求都相对较高,比如团战,这时候玩家的操作是极具频繁的;
在游戏界很多人都知道传统的页游,或者备份手游,在服务器端,设计是就是以不同的地图切换来做整个世界场景,
每一个场景通过传送门切换到下一章地图;在传统游戏做法就是每一张地图就是一个线程,这个在游戏界广为流传的代码《秦美人》模式;
这样做的好处就是每一个地图都是单独的线程,自己管理自己范围的事情,不会冲突,但是理论终究是理论,
实际在线运营情况就是,某些地图比如《主城》《副本入口》《活动入口》这些地方聚集了大量的玩家,在某些低等级地图或者没什么任务和装逼产出的地图玩家少的可怜甚至没有;
在传统游戏,比如最早的盛大代理的《冒险岛》业内家喻户晓的《秦美人》代码都有分线这么一说就是为了解决在一张地图人太多,一个线程处理不了的问题;
这种传统模式就是一句话,忙得忙死,闲的闲死,一句套用现在皮友的一句话,涝的涝死,旱的旱死;
那么应运而生的是什么
没错就是我们今天要讲的环形排队队列;可能听起来有点绕口,
其目的是什么呢?通过队列模型,达到线程恭喜,不再有专有线程,减少线程数量,线程做什么事情由队列说了算;
我们通常队列是这样的,先进先出,
排队队列是什么情况呢?
就是队列里面的某一项,当从队列里面获取到这一项的时候,发现这一项本身不是一个任务而是一个队列;
然后取出这一项队列里面的第一项来执行,
一般来讲我们需要的队列基本也就是两层就够了,
当然你们如果需要三层,或者更多,你们可以稍加改动我们后面的代码;
翠花上代码
队列枚举
1 package com.ty.test.queue; 2 3 /** 4 * 队列key值 5 * 6 * @author: Troy.Chen(失足程序员, 15388152619) 7 * @create: 2021-04-06 11:19 8 **/ 9 public enum QueueKey { 10 /** 11 * 默认队列是不区分,顺序执行 12 */ 13 Default, 14 /** 15 * 登录任务处理 16 */ 17 Login, 18 /** 19 * 联盟,工会处理 20 */ 21 Union, 22 /** 23 * 商店处理 24 */ 25 Shop, 26 ; 27 28 }
队列任务
1 package com.ty.test.queue; 2 3 import java.io.Serializable; 4 5 /** 6 * @author: Troy.Chen(失足程序员, 15388152619) 7 * @create: 2021-04-06 11:35 8 **/ 9 public abstract class TyEvent implements Serializable { 10 11 private static final long serialVersionUID = 1L; 12 13 private QueueKey queueKey; 14 15 public abstract void run(); 16 17 public TyEvent(QueueKey queueKey) { 18 this.queueKey = queueKey; 19 } 20 21 public QueueKey getQueueKey() { 22 return queueKey; 23 } 24 }
最关键的代码来了,这个是主队列,
1 package com.ty.test.queue; 2 3 import java.io.Serializable; 4 import java.util.HashMap; 5 import java.util.concurrent.LinkedBlockingQueue; 6 import java.util.concurrent.TimeUnit; 7 import java.util.concurrent.atomic.AtomicInteger; 8 import java.util.concurrent.locks.ReentrantLock; 9 10 /** 11 * 队列 12 * 13 * @author: Troy.Chen(失足程序员, 15388152619) 14 * @create: 2021-04-06 11:14 15 **/ 16 public class TyQueue implements Serializable { 17 18 private static final long serialVersionUID = 1L; 19 /*同步锁保证队列数据的准确性*/ 20 protected ReentrantLock reentrantLock = new ReentrantLock(); 21 /*子队列容器*/ 22 protected HashMap<QueueKey, TySubQueue> subQueueMap = new HashMap<>(); 23 24 /*队列容器*/ 25 protected LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>(); 26 /*队列里面包括子队列任务项*/ 27 protected final AtomicInteger queueSize = new AtomicInteger(); 28 29 /** 30 * 添加任务 31 * 32 * @param obj 33 */ 34 public void add(TyEvent obj) { 35 reentrantLock.lock(); 36 try { 37 if (obj.getQueueKey() == QueueKey.Default) { 38 /*默认模式直接加入队列*/ 39 queue.add(obj); 40 } else { 41 TySubQueue subQueue = subQueueMap.get(obj.getQueueKey()); 42 if (subQueue == null) { 43 subQueue = new TySubQueue(obj.getQueueKey()); 44 subQueueMap.put(obj.getQueueKey(), subQueue); 45 } 46 /*有排队情况的,需要加入到排队子队列*/ 47 subQueue.add(obj); 48 /*这里是关键,*/ 49 if (!subQueue.isAddQueue()) { 50 subQueue.setAddQueue(true); 51 /*如果当前子队列不在队列项里面,需要加入到队列项里面去*/ 52 queue.add(subQueue); 53 } 54 } 55 /*队列的数据加一*/ 56 queueSize.incrementAndGet(); 57 } finally { 58 reentrantLock.unlock(); 59 } 60 } 61 62 63 /** 64 * 获取任务 65 * 66 * @return 67 * @throws InterruptedException 68 */ 69 public TyEvent poll() throws InterruptedException { 70 Object poll = this.queue.poll(500, TimeUnit.MILLISECONDS); 71 if (poll instanceof TySubQueue) { 72 try { 73 reentrantLock.lock(); 74 TySubQueue subQueue = (TySubQueue) poll; 75 poll = subQueue.poll(); 76 } finally { 77 reentrantLock.unlock(); 78 } 79 } 80 if (poll != null) { 81 /*执行减一操作*/ 82 this.queueSize.decrementAndGet(); 83 return (TyEvent) poll; 84 } 85 return null; 86 } 87 88 /** 89 * 当任务执行完成后操作 90 * 91 * @param obj 92 */ 93 public void runEnd(TyEvent obj) { 94 reentrantLock.lock(); 95 try { 96 if (obj.getQueueKey() != QueueKey.Default) { 97 TySubQueue subQueue = subQueueMap.get(obj.getQueueKey()); 98 if (subQueue != null) { 99 if (subQueue.size() > 0) { 100 /*这个时候需要把队列重新添加到主队列*/ 101 queue.add(subQueue); 102 } else { 103 /*当子队列空的时候,标识队列已经不在主队列里面,等待下次加入新任务*/ 104 subQueue.setAddQueue(false); 105 } 106 } 107 } 108 } finally { 109 reentrantLock.unlock(); 110 } 111 } 112 }
子队列,也就是排队队列的关键所在
1 package com.ty.test.queue; 2 3 import java.io.Serializable; 4 import java.util.LinkedList; 5 6 /** 7 * 下级队列 8 * 9 * @author: Troy.Chen(失足程序员, 15388152619) 10 * @create: 2021-04-06 11:14 11 **/ 12 public class TySubQueue extends LinkedList<TyEvent> implements Serializable { 13 14 private static final long serialVersionUID = 1L; 15 16 private final QueueKey queueKey; 17 private boolean addQueue = false; 18 19 public TySubQueue(QueueKey queueKey) { 20 this.queueKey = queueKey; 21 } 22 23 public QueueKey getQueueKey() { 24 return queueKey; 25 } 26 27 public boolean isAddQueue() { 28 return addQueue; 29 } 30 31 public TySubQueue setAddQueue(boolean addQueue) { 32 this.addQueue = addQueue; 33 return this; 34 } 35 36 @Override 37 public String toString() { 38 return "{" + "queueKey=" + queueKey + ", size=" + size() + '}'; 39 } 40 }
测试一下结果
1 package com.ty.test.queue; 2 3 /** 4 * @author: Troy.Chen(失足程序员, 15388152619) 5 * @create: 2021-04-06 11:46 6 **/ 7 public class Test { 8 9 public static final TyQueue queue = new TyQueue(); 10 11 public static void main(String[] args) { 12 queue.add(new TyEvent(QueueKey.Default) { 13 @Override 14 public void run() { 15 System.out.println(Thread.currentThread().getId() + ", 1"); 16 } 17 }); 18 queue.add(new TyEvent(QueueKey.Default) { 19 @Override 20 public void run() { 21 System.out.println(Thread.currentThread().getId() + ", 2"); 22 } 23 }); 24 queue.add(new TyEvent(QueueKey.Default) { 25 @Override 26 public void run() { 27 System.out.println(Thread.currentThread().getId() + ", 3"); 28 } 29 }); 30 31 T t1 = new T(); 32 T t2 = new T(); 33 T t3 = new T(); 34 t1.start(); 35 t2.start(); 36 t3.start(); 37 } 38 39 public static class T extends Thread { 40 41 @Override 42 public void run() { 43 while (!Thread.currentThread().isInterrupted()) { 44 TyEvent poll = null; 45 try { 46 poll = queue.poll(); 47 if (poll != null) { 48 /*执行任务*/ 49 poll.run(); 50 } 51 } catch (InterruptedException interruptedException) { 52 Thread.currentThread().interrupt(); 53 } catch (Throwable throwable) { 54 throwable.printStackTrace(System.out); 55 } finally { 56 if (poll != null) { 57 /*当然任务执行完成后*/ 58 queue.runEnd(poll); 59 } 60 } 61 } 62 } 63 64 } 65 66 }
我们用三个线程测试一下,在没有排队情况下执行输出
我们可以看到三个任务分别有三个线程执行了;
接下来我们在队列里面再额外加入三个登录排队队列
1 queue.add(new TyEvent(QueueKey.Login) { 2 @Override 3 public void run() { 4 System.out.println(Thread.currentThread().getId() + ", Login 1"); 5 } 6 }); 7 queue.add(new TyEvent(QueueKey.Login) { 8 @Override 9 public void run() { 10 System.out.println(Thread.currentThread().getId() + ", Login 2"); 11 } 12 }); 13 queue.add(new TyEvent(QueueKey.Login) { 14 @Override 15 public void run() { 16 System.out.println(Thread.currentThread().getId() + ", Login 3"); 17 } 18 });
再看看输出情况,
很明显的可以看到我们加入到登录队列的任务,又同一个线程顺序执行的;
总结
排队队列就是为了让同一类型任务顺序执行或者叫多任务操作同一个对象的时候减少加锁 带来的额外开销,减少线程等待的时间;
更合理的利用的线程。避免涝的涝死,旱的旱死;
我要订一个小目标