哲学家就餐问题
使用信号量(SemaphoreSlim)解决哲学家就餐问题。
引言
哲学家就餐问题是一个老问题,用维基百科的话来说:“在计算机科学中,哲学家就餐问题是一个经典的例子问题,常用于并发算法设计中,以说明同步问题和解决这些问题的技术。
“它最初由Edsger Dijkstra于1965年提出,作为学生考试练习,其形式是计算机争夺对磁带驱动器外设的访问权。此后不久,Tony Hoare给出了该问题的当前表述。
“五位沉默的哲学家围坐在一张圆桌旁,面前放着几碗意大利面。每对相邻的哲学家之间都放着一把叉子。
“每位哲学家必须轮流思考和进食。然而,哲学家只有在拥有左右两把叉子时才能吃意大利面。每把叉子只能由一位哲学家持有,因此哲学家只有在叉子未被其他哲学家使用时才能使用它。一位哲学家吃完饭后,需要放下两把叉子,以便其他哲学家可以使用这些叉子。哲学家可以在叉子可用时拿起右边的叉子或左边的叉子,但在拿到两把叉子之前不能开始进食。
“进食不受剩余意大利面数量或胃部空间的限制;假定有无限的供应和无限的需求。
“问题是如何设计一种行为规范(一种并发算法),使任何哲学家都不会挨饿;也就是说,假定没有任何哲学家能知道其他人何时可能想吃或想思考,每个人都可以永远地在进食和思考之间交替。”
背景
直接的解决方案包括
* if left fork is available
then
pick it up
else
return to thinking mode.
* if right fork is available
then
pick right fork up
initiate eating for some duration of time
then put both forks down
and returning to thinking mode
else
put left fork down
and return to thinking mode.
这种算法理论上存在一种可能性,即所有五 (5) 位哲学家都会拿起各自的左叉,然后无法拿起右叉,于是将左叉放回原位并返回思考,然后永远重复这些步骤。这不是一个实际问题,因为我们可以随机化进食和思考时间。
然而,解决哲学家就餐问题还有另一条途径,那就是允许一个外部的第三方参与者,在我们的例子中是操作系统,通过相当于两 (2) 张“允许进食通行证”来选择下一位进食的哲学家,当吃完饭后,哲学家会放弃这张“允许进食通行证”。这个第三方外部参与者是“允许进食通行证”的保管者。
这种“允许进食通行证”可以用 `Mutex` 模拟,但由于我们有两 (2) 张,`Semaphore` 是更合适的选择。在我们的例子中,我们将使用 .NET 的 `SemaphoreSlim`。
Using the Code
我们需要定义一个叉子,因为每位哲学家都需要左右两把叉子才能进食。
public class Fork
{
public Fork(int name)
{
Name = name;
IsInUse = false;
WhichPhilosopher = null;
}
/// <summary>Name of fork</summary>
public int Name { get; }
/// <summary>Is fork in use</summary>
public bool IsInUse { get; set; }
/// <summary>If the fork is in use then keep the philosopher using it.</summary>
public Philosopher WhichPhilosopher { get; set; }
}
`Philosopher` 类有点复杂,它记录了哲学家的饮食习惯。`EatingHabit(..)` 方法是每位哲学家用来吃意大利面的方法。这个 `EatingHabit(..)` 方法是为所有五 (5) 位哲学家并发运行的方法。
`EatingHabit(..)` 方法本质上以等待“允许进食通行证”(通过 `SemaphoreSlim.Wait()` 模拟)开始,然后检查叉子可用性。外部实体(在我们的例子中是操作系统)最多允许 2 位非进食哲学家进食。然而,如果相邻的哲学家正在进食,那么我们这位哲学家至少有一把或两把叉子不可用,因此哲学家需要将“允许进食通行证”(`SemaphoreSlim.Release()`)返还给“允许进食通行证”的保管者。另一方面,如果两把叉子都可用,那么哲学家就可以开始吃意大利面,然后进食一段时间,然后释放叉子并返还“允许进食通行证”。
一个关注点:如果我们在 `EatingHabit(..)` 方法中使用 `SemaphoreSlim.Wait()`,那么执行线程将阻塞,直到操作系统允许哲学家进食。在等待期间,执行线程无法执行其他工作。使用 `SemaphoreSlim.WaitAsync()`,例程将不会等待并返回一个 Task。因此,可以调用 `SemaphoreSlim.WaitAsync().ContinueWith(..)`,以这种方式使用它,我们可以在等待时允许其他任务得到服务。然而,这样做意味着最终 `Main` 例程在一位或两位哲学家吃完最后一餐之前就已完成。因此,我们将使用阻塞调用 `SemaphoreSlim.Wait()` 或 `SemaphoreSlim.WaitAsync().Wait()`。
`Philosopher` 类如下所示
public class Philosopher
{
public Philosopher(int name, Fork leftFork, Fork rightFork, Philosophers allPhilosophers)
{
Name = name;
LeftFork = leftFork;
RightFork = rightFork;
_allPhilosophers = allPhilosophers;
_rnd = new Random(Name); // Randomize eating time differently or each philosopher
}
/// <summary>Two (2) philosophers may eat at the same time</summary>
static readonly SemaphoreSlim AquireEatPermissionSlip = new SemaphoreSlim(2);
/// <summary>Synchronization object for acquiring forks and change status.</summary>
private static readonly object Sync = new object();
/// <summary>
/// Name of philosopher
/// The philosophers are OK with a numeric name. Philosophers are good that way.
/// </summary>
public int Name { get; }
/// <summary>Track philosopher left fork</summary>
private Fork LeftFork { get; }
/// <summary>Track philosopher right fork</summary>
private Fork RightFork { get; }
private PhilosopherStatus Status { get; set; } = PhilosopherStatus.Thinking;
/// <summary>A philosopher may inquire about other dining philosophers</summary>
private readonly Philosophers _allPhilosophers;
/// <summary>Regulates the duration of eating</summary>
private readonly Random _rnd;
private readonly int _maxThinkDuration = 1000;
private readonly int _minThinkDuration = 50;
/// <summary>
/// The routine each Task employs for the eating philosophers
/// </summary>
/// <param name="stopDining"></param>
public void EatingHabit(CancellationToken stopDining)
{
// After eat permission was granted. The philosopher will wait for a
// duration of durationBeforeRequstEatPermission before the routine
// will repeat by waiting for an eat permission again.
var durationBeforeRequstEatPermission = 20;
for (var i = 0;; ++i)
{
// If calling routine is asking for dining to stop then comply and stop.
if (stopDining.IsCancellationRequested)
{
Console.WriteLine($"Ph{Name} IS ASKED TO STOP THE DINING EXPERIENCE");
stopDining.ThrowIfCancellationRequested();
}
try
{
// Wait for eating permission.
AquireEatPermissionSlip.WaitAsync().Wait();
Console.WriteLine($"Philosopher Ph{Name} will attempt to eat. Attempt: {i}.");
bool isOkToEat;
lock (Sync)
{
// Check for Fork availability
isOkToEat = IsForksAvailable();
if (isOkToEat)
AquireForks();
}
if (isOkToEat)
{
PhilosopherEat();
ReleaseForks();
}
}
finally
{
AquireEatPermissionSlip.Release();
}
// Wait for a duration of durationBeforeRequstEatPermission
// before waiting for eat permission.
Task.Delay(durationBeforeRequstEatPermission).Wait();
}
}
private bool IsForksAvailable()
{
// Note that this Sync is superfluous because to be effective
// the entire method should be called under a lock (sync).
// Otherwise, after the lock when the method checks for fork
// availability and before the return, one or both forks may
// be snatched by another philosopher.
lock (Sync)
{
if (LeftFork.IsInUse)
{
Console.WriteLine($"Philosopher Ph{Name} cannot eat. Left fork is in use");
return false;
}
if (RightFork.IsInUse)
{
Console.WriteLine($"Philosopher Ph{Name} cannot eat. Right fork is in use");
return false;
}
}
// Both forks are available, philosopher may commence eating
return true;
}
private void PhilosopherEat()
{
// Eating duration is randomized
var eatingDuration = _rnd.Next(_maxThinkDuration) + _minThinkDuration;
// Display who is eating
Console.WriteLine($"Philosopher Ph{Name} is eating.");
//
// Simulate our philosopher eating yummy spaghetti
//
Thread.Sleep(eatingDuration);
Console.WriteLine($"Philosopher Ph{Name} is thinking after eating yummy spaghetti.");
}
private void AquireForks()
{
lock (Sync)
{
LeftFork.IsInUse = true;
LeftFork.WhichPhilosopher = this;
RightFork.IsInUse = true;
RightFork.WhichPhilosopher = this;
Status = PhilosopherStatus.Eating;
Console.WriteLine($"Philosopher Ph{Name} acquired forks: " +
$"(F{LeftFork.Name}, F{RightFork.Name}).");
}
}
void ReleaseForks()
{
lock (Sync)
{
LeftFork.IsInUse = false;
LeftFork.WhichPhilosopher = null;
RightFork.IsInUse = false;
RightFork.WhichPhilosopher = null;
Status = PhilosopherStatus.Thinking;
Console.WriteLine($"Philosopher Ph{Name} released forks: " +
$"(F{LeftFork.Name}, F{RightFork.Name}).");
}
}
}
`Main()` 例程使用 `CancellationTokenSource` 构造来要求哲学家(所有哲学家)停止进食。由于所有哲学家都使用相同的 `CancellationTokenSource.Token`,因此一次取消将取消所有哲学家。`Main()` 例程如下所示(要求哲学家停止就餐的部分已加粗)
var philosophers = new Philosophers().InitializePhilosophers();
var phEatTasks = new List<Task>();
using (var stopDiningTokenSource = new CancellationTokenSource())
{
var stopDiningToken = stopDiningTokenSource.Token;
foreach (var ph in philosophers)
phEatTasks.Add(
Task.Factory.StartNew(() => ph.EatingHabit(stopDiningToken), stopDiningToken)
.ContinueWith(_ => {
Console.WriteLine($"ERROR... Ph{ph.Name} FALTED ...");
}, TaskContinuationOptions.OnlyOnFaulted)
.ContinueWith(_ => {
Console.WriteLine($" Ph{ph.Name} HAS LEFT THE TABLE ...");
}, TaskContinuationOptions.OnlyOnCanceled)
);
// Allow philosophers to dine for DurationAllowPhilosophersToEat
// milliseconds, 20000. Original problem have philosophers dine
// forever, but we are not patient enough to wait until
// forever...
Task.Delay(20000).Wait();
// After a duration of DurationAllowPhilosophersToEat we
// ask the philosophers to stop dining.
stopDiningTokenSource.Cancel();
Task.WaitAll(phEatTasks.ToArray());
}
// Done--so say so
Console.WriteLine("Done.");
结果
为简化起见,代码未显示统计数据累积。附件代码是完整的代码,并显示了统计信息的累积。以下是从哲学家就餐模拟运行中提取的(测试在 4 核机器上完成)
Philosopher Ph0 ate 15 times. For a total of 9,822 milliseconds. Eating conflicts: 9.
Philosopher Ph1 ate 14 times. For a total of 7,010 milliseconds. Eating conflicts: 21.
Philosopher Ph2 ate 17 times. For a total of 7,746 milliseconds. Eating conflicts: 15.
Philosopher Ph3 ate 15 times. For a total of 7,584 milliseconds. Eating conflicts: 13.
Philosopher Ph4 ate 11 times. For a total of 5,998 milliseconds. Eating conflicts: 22.
Collectively philosophers ate 72 times. For a total of 38,160 milliseconds. Eating conflicts: 80
我们应该注意到,没有哲学家挨饿,也没有哲学家比其他哲学家吃得更多。
`Main()` 例程要求哲学家停止进食并离开餐桌前,总运行时间为 20 秒。
就餐体验的总计时为 20 秒。因此,由于有两 (2) 位正在就餐的哲学家,理论上的总就餐时间是 40 秒。这个 40 秒不是一个确切的数字,因为取消令牌的细微差别。总就餐时间是 43 秒 150 毫秒。(我将在尾注中解释这额外的 3 秒 150 毫秒)。
为简单起见,我们将使用 40 秒作为总就餐时间的理论上限。上述统计信息显示总就餐时间为 38+ 秒,冲突次数为 80 次。这意味着有 2- 秒用于冲突解决和任务切换。在我们的案例中,冲突意味着一个无法进食的哲学家获得了进食许可,因此该哲学家与另一位哲学家发生了冲突。多次运行该程序,我得到的总就餐时间最低为 35+ 秒,最高为 39+ 秒。
关注点
随附的完整软件使用配置文件来存储常量,并允许用户根据需要进行更改。
为此,我们将在一个 `ConfigValue` 单例类中集中访问配置文件。我们需要做的一件事是将叉子的数量设置为与哲学家的数量相等。为此,我创建了一个构造,`App.Config` 文件如下所示:`{%..%}`。
<appSettings>
<add key="Philosopher Count" value="5" />
<add key="Fork Count" value="{%Philosopher Count%}" />
<!-- Other values -->
</appSettings>
为了处理特殊构造:`{%..%}`,我们在 `ConfigValue` 类中使用正则表达式,如下所示:
private readonly Regex _reValue = new Regex(@"\{\%(?<val>.*?)\%\}", RegexOptions.Singleline);
此正则表达式捕获 `{%` 和 `%}` 构造之间的一切,并将其作为 `val` 组名。访问此组的方式如下:
// Evaluate (match) the regular expression with the text
var m = _reValue.Match(text);
// Access the "val" group
var key = m.Groups["val"].Value;
这个正则表达式构造并不完美,它无法正确评估嵌套的 `{%..%}` 构造。但我认为我们可以接受这个限制。
题外话:为了允许嵌套的 `{%..%}` 构造,我们必须
- 将包含分隔符 `{%` 和 `%}` 的文本替换为文本中未使用的单字符值(例如:`\u0001` 和 `\u0002`)
- 新的正则表达式模式如下:`new Regex(@"\{\%(?<val>[^\u0001\u0002]*)\%\}", ...);`。
- 然后将单字符(我们示例中的 `\u0001` 和 `\u0002`)替换回 `{%` 和 `%}` 构造。
然而,我觉得这既不必要又过于复杂。
有兴趣的读者,可以在代码中看到该解决方案。请参阅附件中的 `Nested_config_value.linq`。要运行它,您需要 LinqPad(一个可免费下载的 C# 解释器)。或者,将文件 `try.dot.net` 中的代码粘贴到 https://try.dot.net 中的 C# fiddler 的顶部区域。
言归正传:回到 `app.config` 文件中评估 `appSettings`。我们使用两种方法,每种方法访问 `ConfigValue` 类中的 `appSettings` 值时都会调用 `GetConfigValue(string key)` 方法。
private string GetConfigValue(string key)
{
var rawValue = _appSettings[key];
if (string.IsNullOrEmpty(rawValue)) return string.Empty;
for (; ; )
{
var m = _reValue.Match(rawValue);
if (!m.Success) return rawValue;
rawValue = _reValue.Replace(rawValue, ReplaceKey);
}
}
private string ReplaceKey(Match m)
{
var key = m.Groups["val"].Value;
if (string.IsNullOrEmpty(key)) return string.Empty;
var rawValue = _appSettings[key];
return rawValue ?? string.Empty;
}
请注意,原始值捕获在 `rawValue` 变量中,该变量来自 `_appSettings` 构造,而不是 `ConfigurationManager.AppSettings[key]` 构造。`_appSettings` 在配置文件中定义。
private ConfigValue()
{
_appSettings = ConfigurationManager.AppSettings.AllKeys
.ToDictionary(x => x, x => ConfigurationManager.AppSettings[x]);
}
之所以做这种“体操”,是因为当我们使用 `ConfigurationManager.AppSettings[key]` 构造时,测试 `GetConfigValue(string key)` 和 `ReplaceKey(Match m)` 方法将非常困难。
尾注
超出 40 秒上限的最多 3 秒 150 毫秒的解释。
40 秒的进食时间很容易解释:程序运行了 20 秒,并且由于最多有两 (2) 位哲学家同时进食,因此我们得到总进食时间上限为 40 秒。
为了解释加班时间,我们来看一下 `EatingHabit(..)` 方法的伪代码,该方法为每个进食的哲学家同时运行。
/*01*/ EatingHabit()
/*02*/ loop forever
/*03*/ if (CancellationToken is signalled_as_cancelled) then return
/*04*/
/*05*/ AquireEatingPermissionSlip()
/*06*/
/*07*/ if (Forks are available)
/*08*/ AquireForks()
/*09*/ PhilosopherEat( duration of eat is random up to 1 sec and 50 millisec )
/*10*/ ReleaseForks()
/*11*/
/*12*/ ReleaseEatingPermissionSlip()
/*13*/
/*14*/ WaitForShortDuration( wait for 20 millisec );
现在假设有两 (2) 位哲学家正在进食,此时 `Main()` 例程发出 `CancellationToken` 取消信号。(当取消令牌发出信号时,从一 (1) 位哲学家正在进食的场景开始,情况也没有不同。)
这两个例程中的每一个都在处理第 8、9 或 10 行。因此,在这一点上,两 (2) 位正在进食的哲学家将在第 9 行进食期间停留最多 1 秒 50 毫秒,然后才在第 12 行释放许可凭证。(未计入的时间:我们当前的哲学家将在 20 毫秒后循环回到第 3 行并停止就餐体验。)
此时,我们最多有 3 位哲学家(最坏情况是 3 位哲学家)正在等待(在第 5 行)获得进食许可。因此,其中两 (2) 位将获得进食许可,我们将额外等待最多 1 秒 50 毫秒。所以现在我们已经超出了理论最大值最多 2 秒 100 毫秒。
现在我们还有最多 1 位哲学家等待进食许可。因此,再增加 1 秒 50 毫秒,总共额外增加 3 秒 150 毫秒。使得进食时间的上限为 43 秒 150 毫秒。
结论
我们已经实现了哲学家就餐问题的简单高效解决方案的目标。
还有一个问题需要解决:如果我们取消 `SemaphoreSlim` 的“允许进食许可”,并允许哲学家不断尝试进食,而现在叉子是唯一允许哲学家进食的标准,会发生什么?要进行这样的试验,我们只需注释掉对 `AquireEatPermissionSlip.WaitAsync().Wait()` 和 `AquireEatPermissionSlip.Release()` 的调用。令人惊讶的是,结果并没有差多少:
Philosopher Ph0 ate 13 times. For a total of 8,271 milliseconds. Eating conflicts: 319.
Philosopher Ph1 ate 14 times. For a total of 7,010 milliseconds. Eating conflicts: 326.
Philosopher Ph2 ate 18 times. For a total of 8,031 milliseconds. Eating conflicts: 329.
Philosopher Ph3 ate 11 times. For a total of 5,768 milliseconds. Eating conflicts: 382.
Philosopher Ph4 ate 15 times. For a total of 8,502 milliseconds. Eating conflicts: 304.
Collectively philosophers ate 71 times. For a total of 37,582 milliseconds. Eating conflicts: 1660
现在我们有高达 1,660 次冲突,总进食时间为 37+ 秒。这意味着大部分损失的时间(从 40 秒的理论上限中损失的时间)不是由冲突解决造成的,而是由任务切换造成的。为了进一步验证这一发现,我在一台8 核机器上运行了该程序,5 位哲学家的总进食时间为 42,267 毫秒。
附件
Zip 文件 | 描述 |
DiningPhilosophers1.zip | 我们一直在讨论的代码,一个 Visual Studio 2017 解决方案。 |
Nested_config_value.zip | 如果感兴趣:在配置处理中处理嵌套“`{%..%}`”构造的能力。我包含了一个 `.linq` 文件,需要 LinqPad 才能运行,或者您可以在 https://try.dot.net 中的 fiddler 中运行纯文本。 |
历史
- 2018/4/19:首次发布