高性能多线程工作项/事件调度引擎






4.94/5 (55投票s)
用于调度和执行工作项的高性能解决方案。
引言
本文是系列文章的第一篇,将涵盖创建和维护一个健壮、可扩展、高性能的大型多人在线游戏服务器和游戏引擎所需的组件的架构和实现。
作为一篇关于设计和实现大型多人在线游戏服务器和引擎的系列文章中的第一篇,本文将重点介绍一个必要组件的设计和实现,该组件负责将静态的虚拟比特转化为一个动态、鲜活、交互式的虚拟世界。
工作项和事件调度器
本文介绍的组件是调度引擎,也称为工作项和事件调度器。虽然它的名字不太吸引人,但该组件的功能是使大型多人在线游戏如此引人入胜的基础:不断演变的虚拟世界,以及动态的、实时的玩家与世界的交互。
顾名思义,工作项和事件调度器负责调度、协调并执行虚拟世界中发生的动作。然而,调度引擎处理的不仅仅是玩家发起的动作;它还会调度天气变化、驱动昼夜循环,并执行定义非玩家角色和对象行为的代码片段。(因此,我将把任何计划由事件调度器执行的代码称为工作项。)
虚拟世界基础,101
大型多人游戏由场景和设定(动态虚拟世界)以及可以与场景和设定互动并改变它们的角色组成。为了给玩家提供沉浸式和逼真的体验,与虚拟世界的互动(以及对虚拟世界的改变)需要以一种提供连续时间错觉的方式进行;换句话说,虚拟世界中发生的一切都必须融入某种可信的“虚拟实时”流程中。
成千上万的玩家可以同时与一个虚拟世界进行交互。除了玩家驱动的交互之外,还需要发生其他自动化的虚拟世界变化。这些自动化变化范围从非玩家角色的移动和行为到天气和时间的变化。
调度引擎
调度引擎在驱动实时变化和创造交互式、动态虚拟世界的体验中所扮演的角色如下图所示。
调度引擎组件本身仅包含上图中(以黄色突出显示的)调度引擎和已调度工作项队列框,并且只负责两件事:
- 调度工作项以便将来执行,以及
- 在计划时间执行工作项。
使“虚拟实时”变得可信
以下是为玩家提供可信的实时虚拟世界体验的基本要求:
- 奇幻类游戏通常会在玩家开始施放法术和法术效果发生之间引入一个计算好的延迟。或者,我们可能希望角色拔出武器所需的时间取决于角色技能。因此,我们需要能够调度一个事件在特定时间执行。由于我们可能处理的是很短的时间间隔(例如,玩家发出命令和相应操作执行之间的时间),因此操作/工作项的实际执行需要尽可能接近其计划时间。此外,我们需要对计划执行时间进行相当精细化的控制——玩家角色攻击另一角色所需的时间可能因攻击类型、使用的武器和角色的速度而异,并且可能还会受到环境因素、角色状态、角色效果等的影响。这些对攻击时间延迟的微小修改必须是可检测的且有意义的;两次攻击之间延迟 500 毫秒和 600 毫秒之间的差异应该是显而易见的,并且可能在战斗结果方面有意义。
- 取消已计划工作项执行的功能是引入操作启动和执行之间的延迟时出现的一个需求。例如,玩家可能开始对目标施放法术。在施放法术期间,可能会发生几个事件导致取消:角色可能改变了施放法术的想法,目标可能移出了范围,或者(就像在一些流行的在线游戏中)如果另一个角色成功攻击了施法者,施法可能会被取消。
- 为了维持虚拟世界中实时的稳定错觉,一个事件的执行不得延迟其他并发计划事件的执行。如果一个事件的执行延迟了其他计划事件的执行,虚拟实时的效果就会丢失,导致全局减速(“延迟”)。
- 随着玩家数量的增加和虚拟世界的规模扩大,维持“虚拟实时”运行所需的更新数量呈指数级增长。因此,许多现有的 MMO 游戏使用多个松散连接的游戏服务器。通过为每个服务器分配一个“部分”虚拟世界来维护,并限制单个服务器上的玩家数量,可以控制在实时运行虚拟世界所需的更新数量。然而,在使用多个服务器时,玩家间的合作和通信可能会变慢或受阻(例如,与在另一个服务器上玩游戏的团队成员交谈),并且可能需要一些技巧或笨拙的方法(例如,传送门)来在服务器之间转移玩家。由于服务器间通信的限制,统一的虚拟世界体验可能会受到负面影响。这就引出了我们最后一个需求:为了支持数千名并发玩家而不显著限制我们的世界规模(或要求使用多个服务器),调度引擎必须足够高效,能够处理每秒数十万个事件。
每 90-120 毫秒调度 20,000 个角色进行“行走”。“行走”包括检查角色腿部力量、更新角色的移动能量、改变角色在虚拟世界中的位置,以及将行走事件重新调度到 90-120 毫秒后再次发生。本文介绍的调度引擎能够每秒执行 172,730 个此类“行走”事件(而不会使我的笔记本电脑 CPU 达到满负荷)。
调度器究竟调度什么?
调度器调度和执行工作项。工作项最好通过三个功能性需求来定义:
- 工作项需要能够做某事。
- 如果工作项能够计算、设置(并对其有反向了解)自身的执行时间,那将很方便。
- 我们还需要选择取消尚未执行的工作项的选项。
IExecutableWorkItem
接口就是根据这些功能性需求设计的,它定义了可以在特定时间调度执行(或在执行前取消)的对象的最低级本质。
public interface IExecutableWorkItem
{
void Execute();
DateTime ExecutionTime { get; set; }
bool Cancelled { get; set; }
}
调度引擎需求
现在我们已经定义了工作项,就可以定义调度引擎本身的功能性需求了。首先,我们需要一种初始化调度引擎并指示其开始处理已调度工作项的方法(void StartSchedulingEngine()
)。反之,我们也需要一种指示调度引擎停止处理计划事件并关闭的方法。我们还需要知道调度引擎当前是否正在运行,或者它是否正在尝试关闭(属性:bool WantExit { get; set; }
)。由于调度引擎根据其对当前时间的认知来执行工作项,因此能够访问引擎对当前日期和时间的认知会很有帮助(属性:DateTime CurrentDateTime { get; set; }
)。出于信息目的,我们可能需要跟踪已执行的工作项数量以及当前排队(计划在未来某个时间执行)的工作项数量。还能够检查在工作项执行过程中可能抛出的任何异常将很有帮助,最后但同样重要的是,我们需要一种调度工作项/事件以供执行的方法(void ScheduleEventToExecute(IExecutableWorkItem eventToExecute)
)。下方展示了 `ISchedulingEngine` 接口:
public interface ISchedulingEngine
{
void StartSchedulingEngine();
void ScheduleEventToExecute(IExecutableWorkItem eventToExecute);
long WorkItemsExecuted { get; set; }
DateTime CurrentDateTime { get; set; }
bool WantExit { get; set; }
/// <summary>
/// Returns an array of ISupportsCount, which can be used
/// to ge the count of items in each work queue
/// </summary>
ISupportsCount[] WorkItemQueueCounts { get; }
/// <summary>
/// Stores a list of exceptions that have
/// occurred trying to execute work items
/// </summary>
List<Exception> Exceptions { get; }
}
调度引擎:多线程和同步
调度引擎的实现还需要支持功能性需求 #3(一个工作项的执行不得延迟其他工作项的执行)和 #4(调度引擎必须能够每秒执行数十万个工作项)。
为了防止长时间运行的工作项延迟其他已调度工作项的执行,调度引擎将使用多个线程,每个线程都能够检索和执行任何当前或已到期的工作项。因为
- 我们希望能够从任何运行的线程调度新的工作项,并且
- 多个工作线程同时执行已调度的工作项,高效地同步对共享工作项队列的访问至关重要。
由于我们将使用大量工作线程,因此我们的同步方法必须健壮。因为我们还关注保持高吞吐量和工作项吞吐量,所以我们的同步方法在工作线程数量增加时不能成为性能瓶颈:每个运行的线程都需要一种快速而简化的方式来调度新工作项并访问当前已调度的工作项。
如果没有同步访问工作项队列,可能会出现几种灾难性场景:多个线程同时尝试调度工作项,以及 `WorkItemQueueHandler` 线程试图出队一个正在被另一个线程调度的相同工作项。
简单的问题,简单的解决方案
我们的同步需求很简单:防止多个线程同时执行入队或出队调度队列工作项的代码关键部分。我们的同步解决方案也可以同样简单:创建一个一次只能由一个线程拥有的锁。获取锁后,单个线程可以开始执行代码关键部分并修改共享的工作项队列。当代码关键部分执行完毕后,拥有锁的线程会释放锁,供另一个幸运的线程获取。反过来,另一个线程获取锁并可以开始执行代码关键部分。下面的 `Enqueue` 和 `Dequeue` 方法演示了使用锁来同步对修改共享项队列的小关键代码段的访问:
public void Enqueue(TListType item)
{
// We need to be sure that no other threads
// simultaneously modify the shared _queue
// object during our enqueue operation
AquireLock();
{
_queue.Enqueue(item);
_count++;
}
ReleaseLock();
}
public bool Dequeue(out TListType item)
{
item = null;
// We need to be sure that no other threads
// simultaneously modify the shared _queue
// object during our dequeue operation
AquireLock();
{
if (_count > 0)
{
item = _queue.Dequeue();
_count--;
}
}
ReleaseLock();
return (item != null);
}
高性能锁定
.NET 中有许多可用的同步解决方案(`Monitor.Enter`、`ReaderWriterLock`、`lock()` 等),用于调节对共享资源的访问并防止关键代码块的并发执行。然而,正如您可能在上面的 `Enqueue` 和 `Dequeue` 方法中注意到的那样,我开辟了自己的道路,并创建了一个 `Lockable` 类来强制对关键代码段执行单线程访问。`Lockable` 类提供了比 `lock()` 等同步机制在负载条件下更快的替代方案(参见下文的性能指标)。`Lockable` 类使用原子 `Interlocked.CompareExchange` 操作结合自旋/睡眠/重试机制,以确保在任何给定时间只有一个线程拥有锁。
设计此锁定机制的优先级和考虑因素包括:
- 尽量减少对 `Interlocked.CompareExchange` 的调用次数,因为它是一个相当昂贵的操作。
- 需要单线程访问的代码段在拥有锁时,不应执行大量工作。因此,假定锁通常是空闲的,并且可以成功获取。当此假设成立时,代码应遵循其最佳执行路径(即,当锁空闲且可以立即获取时,应执行最少的工作)。
- 程序员有责任了解如何使用此锁并熟悉其许多缺点(请参阅下文的缺点)。
下方定义了 `AquireLock()` 方法。
public void AquireLock()
{
// Assume that we will grab the lock - call CompareExchange
if (Interlocked.CompareExchange(ref _lock, 1, 0) == 1)
{
int n = 0;
// Could not grab the lock - spin/wait
// until the lock looks obtainable
while (_lock == 1)
{
if (n++ > SpinCycles)
{
Interlocked.Increment(ref _conflicts);
n = 0;
Thread.Sleep(0);
}
}
// Try to grab the lock - call CompareExchange
while (Interlocked.CompareExchange(ref _lock, 1, 0) == 1)
{
n = 0;
// Someone else grabbed the lock. Continue to spin/wait
// until the lock looks obtainable
while (_lock == 1)
{
if (n++ > SpinCycles)
{
Interlocked.Increment(ref _conflicts);
n = 0;
Thread.Sleep(0);
}
}
}
}
}
许多其他锁定机制中发现的安全功能被故意省略以提高性能。由于缺乏安全功能,此锁定机制存在一些功能性缺陷:
- 锁不能被获取两次,即使是同一个已经拥有锁的线程再次请求锁。
- 没有 `try/catch/finally` 逻辑来确保调用 `ReleaseLock();`。这意味着在 `AquireLock();` 和 `ReleaseLock();` 语句之间引发的任何异常都会使锁变得无法获取(除非采取措施手动释放锁)。
- 该锁未针对高效获取多个(嵌套)锁进行优化。不一致的嵌套锁定可能导致死锁和其他严重问题。
性能结果 - Lockable vs. lock()
解决方案中包含了我编写的一个简单程序,用于比较 `Lockable` 类与 .NET 标准 `lock()` 方法在类似调度引擎将要遇到的条件下的相对性能。这些条件是:
- 多个工作线程将持续尝试检索和执行工作项,并且
- 工作项将以适中但稳定的节奏调度。
为了在这些条件下测试相对性能,使用了多个工作线程来执行对共享(静态)队列的锁定入队和出队操作。具体来说,一半的工作线程负责在短暂的活动爆发中将项目入队到共享(静态)队列,而其余的线程则持续从同一个共享队列中出队项目。用于获取以下性能指标的源代码包含在解决方案源代码中。
在 `Lockable` 和 `lock()` 测试中,一半的工作线程执行 `Enqueue` 方法(如下),而另一半工作线程执行 `Dequeue` 方法。使用 `lock()` 的测试代码与使用 `Lockable` 的测试代码的区别仅在于使用的同步方法。
static class Lockable = new Lockable();
static object _lockObject = new object();
public static void Enqueue()
{
for (int n = 0; n < 1000; n++)
{
for (int i = 0; i < 20; i++)
DoEnqueue(i + n);
Thread.Sleep(0);
}
}
public static void Dequeue()
{
for (int n = 0; n < 1000; n++)
{
while (_testQueue1.Count > 0)
DoDequeue();
}
}
对于 `Lockable` 测试,使用了以下 `DoEnqueue` 和 `DoDequeue` 方法:
private static void DoEnqueue(int n)
{
_lock.AquireLock();
{
_testQueue1.Enqueue(n);
}
_lock.ReleaseLock();
}
private static void DoDequeue()
{
object o;
_lock.AquireLock();
{
if (_testQueue1.Count > 0) o = _testQueue1.Dequeue();
}
_lock.ReleaseLock();
}
对于 `lock()` 测试,使用了以下 `DoEnqueue` 和 `DoDequeue` 方法:
private static void DoEnqueue(int n)
{
lock (_lockObject)
{
_testQueue1.Enqueue(n);
}
}
private static void DoDequeue()
{
object o;
lock (_lockObject)
{
if (_testQueue1.Count > 0) o = _testQueue1.Dequeue();
}
}
下方显示了使用 `Lockable` 与 `lock()` 执行类似工作量时的执行时间比较结果:
X 轴显示了使用的总工作线程数。所有工作线程完成一定工作量(在共享队列上)的总时间显示在 Y 轴上(以毫秒为单位)。`Lockable` 类在 20 到 80 个工作线程的不同数量下,性能优于 `lock()`。使用更轻量级的 `Lockable` 类的性能优势随着工作线程数量(以及因此对共享队列的访问争用)的增加而增加。
回到正题:实现调度引擎
既然我们有了管理工作项队列多线程访问的有效方法,那么就该实现调度引擎了。下面是对调度引擎实现的简要描述:
- 计划执行的工作项可以存储在多个队列中的一个中。
- 每个工作项队列都由 `WorkItemQueueHandler` 的一个实例服务,并且每个 `WorkItemQueueHandler` 实例都使用一个单独的、专用的线程来处理其计划的工作项队列。
- 大多数队列(“快速通道”队列)专门用于处理需要在未来 500 毫秒内执行的工作项。
- 另外两个工作项队列(“慢速通道”队列)跟踪计划稍后执行的工作项(即,未计划在未来 500 毫秒内执行)。
- 当调度一个工作项时,它会根据其计划执行时间和轮循分配被分配到一个快速通道或慢速通道队列。
- 快速通道 `WorkItemQueueHandler` 工作线程的职责是执行其关联工作项队列中的所有当前或已到期工作项。
- 慢速通道 `WorkItemQueueHandler` 工作线程的职责是确保其慢速通道工作项队列中接近执行时间的工作项被移动到合适的快速通道队列。
下图说明了工作项、工作项队列处理程序和调度引擎之间的关系。
每个快速通道工作项队列处理程序使用以下代码来高效处理其关联工作项队列中的工作项:
private int ExecuteEventsInQueue(int queueNumber)
{
IExecutableWorkItem item;
DateTime currentTime = CurrentDateTime;
// Stores work items whose execution time has not
// yet come - and need to be placed back in the queue
List<IExecutableWorkItem> reschedule = new List<IExecutableWorkItem>();
// Stores Dequeue'd work items - Dequeues multiple items
// to reduce repeated calls to Dequeue (and to reduce locking)
List<IExecutableWorkItem> itemsToExecute = new List<IExecutableWorkItem>(16);
// Keep track of work item executions this pass
int executedThisPass = 0;
// Dequeue multiple work items from the queue
while (_workItemsToExecute[queueNumber].DequeueMultiple(itemsToExecute, 10) > 0)
{
// Check each dequeue'd work item
for (int n = 0; n < itemsToExecute.Count && !WantExit; n++)
{
item = itemsToExecute[n];
if (item.ExecutionTime > currentTime)
{
// Execution time for the work item is still in the future
reschedule.Add(item);
}
else
{
// It is time to execute this work item. Do it.
executedThisPass++;
item.Execute();
}
}
itemsToExecute.Clear();
}
// Re-queue all work items that were dequeue'd but not executed (not yet their time)
_workItemsToExecute[queueNumber].EnqueueMultiple(reschedule);
// Add to the executed work item total
if (executedThisPass > 0)
{ Interlocked.Add(ref _executed, Convert.ToInt64(executedThisPass)); }
return executedThisPass;
}
慢速通道工作项队列处理程序使用以下代码将可能需要尽快执行的工作项分配给快速通道队列:
private int UpdateWorkItemSchedule(int queueNumber)
{
int workItemsMoved = 0;
IExecutableWorkItem item;
// Store items that need to go back into this 'slow-track' queue
List<IExecutableWorkItem> itemsBackIntoOriginalQueue = new List<IExecutableWorkItem>();
List<IExecutableWorkItem> workItems = new List<IExecutableWorkItem>(16);
while (_allWorkItemQueues[queueNumber].DequeueMultiple(workItems, 10) > 0 && !WantExit)
{
for (int n = 0; n < workItems.Count; n++)
{
item = workItems[n];
// Determine the appropriate work item queue for
// this item, based on its scheduled execution time
int appropriateQueue = FindAppropriateQueue(item);
// Check if this item needs to be moved into a different queue
if (queueNumber != appropriateQueue)
{
_allWorkItemQueues[appropriateQueue].Enqueue(item);
workItemsMoved++;
}
else
{
// We will need to put the item back into the original queue
itemsBackIntoOriginalQueue.Add(item);
}
}
workItems.Clear();
}
// Return the work items that did not need to be moved to the slow-track queue
_allWorkItemQueues[queueNumber].EnqueueMultiple(itemsBackIntoOriginalQueue);
return workItemsMoved;
}
使用调度引擎
如果您还在阅读,那么您会很高兴知道设置和使用调度引擎比您刚才阅读本文所做的工作要容易得多。要使用调度引擎,我们需要创建一个测试工作项类。对于我们的测试工作项类,我们将坚持最简单的实现,该实现完全实现了 `IExecutableWorkItem`。我们甚至会负责将工作项的计划执行时间传递给测试工作项类的创建者。
public class TestWorkitem : IExecutableWorkItem
{
public static int _executed = 0;
public TestWorkitem(DateTime timeToExecute)
{ ExecutionTime = timeToExecute; }
public bool Cancelled { get; set; }
public virtual DateTime ExecutionTime { get; set; }
public virtual void Execute() { Interlocked.Increment(ref _executed);
Console.WriteLine("Work item executed"); }
}
现在,我们可以设置调度引擎了。让我们设置一个具有 32 个工作线程的调度引擎并开始运行它:
SchedulingEngine engine = new SchedulingEngine(32);
engine.StartSchedulingEngine();
现在调度引擎已经启动并运行,我们可以调度一些测试工作项,并要求调度引擎在完成后通知我们。
for (int n = 0; n < 50000; n++)
{
TestWorkitem workItem =
new TestWorkitem(engine.CurrentDateTime.AddMilliseconds(n / 10));
engine.ScheduleEventToExecute(workItem);
}
engine.WorkItemQueuesEmpty += new EventHandler(engine_WorkItemQueuesEmpty);
由于您的工作项将在计划时间使用调度引擎提供的线程执行,因此您可以在当前线程上继续进行自己的工作;工作项将在计划时间执行。如上所示,您可以订阅 `WorkItemQueuesEmpty` 事件,该事件由 `SchedulingEngine` 类实例在所有工作项执行完毕且工作项队列变空时引发。
要优雅地关闭调度引擎,请设置 engine.WantExit = true;
。
源代码和示例项目
希望您喜欢阅读本文,并/或学到了一些有用的编程技巧。欢迎提出建议,并且在附带的源代码中发现任何有用的代码片段最适合在您自己的个人项目中使用。附带的解决方案和源代码包含以下项目:
- Interfaces 和 GameIndependentInterfaces 项目仅包含接口定义。这些接口被整个游戏服务器的相当一部分使用,并且不局限于本文的范围。
- ThreadSafeObjects 项目包含 `Lockable` 和通用线程安全队列的实现。在接下来的文章中,随着许多更多线程安全类实现的加入,该项目将显著增长。
- Extensions 项目包含一些方便的扩展方法。随着本系列文章的进展,该项目将变得越来越大。
- SchedulingEngine 项目实现了调度引擎本身。
- TestSchedulingEngine 项目包含一个单元测试,用于确认调度引擎在适当的时间内执行所有计划的工作项。
- TestLockableSpeedConsole 项目包含一个控制台应用程序,用于测量 `Lockable` 与 `lock()` 的相对性能。
- 最后,一个恰如其名(?)的项目,Article1,包含一个轻量级的 Windows 应用程序,允许您:
- 调整调度引擎线程参数,并且
- 提交并监控成千上万(或数十万)个随机调度的测试工作项的执行情况。
关注点
我在 20 世纪 90 年代初开始使用 C 语言编写 telnet/基于文本的多人在线游戏,远在图形化 MMPORGs 出现之前。我将在 CodeProject 上展示的游戏引擎与我 15 年前开发的 MUD 代码库共享同一个名称 - EmlenMud。原始代码库在 Cassiopedia.org 上有提及。
即将发布的文章主题列在下方:
- 高性能 TCP/IP 通信库的设计与实现
- 实体、能力、意图、动作和状态
- 虚拟世界:交互、数据结构和同步
- 控制命令和变更通知
- 数据存储和虚拟世界持久化
- 插件和可扩展性
欢迎所有评论/建议。