Akka.NET 详解






4.89/5 (73投票s)
简要介绍 .NET Akka 框架 (Akka.NET) 的使用
引言
前段时间我对消息传递非常感兴趣,我一直使用 Signalr、WCF、MSMQ 和 NServiceBus 等技术,这些技术都非常酷。但我确实觉得我想学得更多,所以花了好几个月时间尝试了一些东西,比如
- EasyMQ(基于 RabbitMQ 的服务总线风格 API)
- Azure ServiceBus
- NetMQ
在我对消息传递的这个阶段,我偶然发现了一些真正引起我兴趣的东西,那就是使用“Actor(s)”进行分布式编程。Actor 出现在各种框架中,例如
- Microsoft Orleans:分布式虚拟 Actor 模型
- Microsoft Axum:一个研究项目,从未投入生产,我认为这很遗憾
- Akka:基于 Java / Scala 的 Actor 模型
- Akka.NET:Akka 的 .NET 移植
本文将介绍 Akka.NET 的使用,它是原始 Akka 的一个相当完整的移植,所以我希望到本文结束时,你将大致了解它是如何工作的。
演示应用代码
你可以从我的 github 帐户获取本文的演示代码: https://github.com/sachabarber/AkkaWintail
Actor 背景闲聊
这并不是我第一次撰写关于 Actor 模型的文章,事实上,前段时间我写了一篇很长的文章(这是一篇很好的背景阅读材料),内容是关于我为 NetMQ(ZeroMQ 的 .NET 移植)编写的 Actor 模型。你可以在这里阅读那篇文章
https://sachabarbs.wordpress.com/2014/09/05/zeromq-7-a-simple-actor-model/
我非常高兴,但这实际上进入了 NetMQ 的实际代码库,并在此处完整记录
http://netmq.readthedocs.org/en/latest/actor/
下一段文字直接摘自我之前撰写的文章/文档,但如果你有兴趣了解特定的 NetMQ 实现如何工作,你仍然应该阅读这些内容。
总之,让我们深入探讨吧……
告诉我更多关于 Actor 的趣事
什么是 Actor 模型?
这是维基百科对 Actor 模型介绍的说法。
计算机科学中的 Actor 模型是一种并发计算的数学模型,它将“Actor”视为并发数字计算的通用原语:为了响应它收到的消息,一个 Actor 可以做出本地决策,创建更多 Actor,发送更多消息,并决定如何响应收到的下一条消息。
….
….
Actor 模型采用万物皆 Actor 的哲学。这与某些面向对象编程语言使用的万物皆对象哲学相似,但不同之处在于面向对象软件通常是顺序执行的,而 Actor 模型本质上是并发的。
一个 Actor 是一个计算实体,为了响应它收到的消息,它可以并发地
- 向其他 Actor 发送有限数量的消息
- 创建有限数量的新 Actor
- 指定用于接收下一条消息的行为。
上述操作没有假定的顺序,它们可以并行执行。
将发送方与发送的通信解耦是 Actor 模型的一项基本进步,它使得异步通信和控制结构成为消息传递模式。
消息的接收者通过地址识别,有时称为“邮寄地址”。因此,Actor 只能与它拥有地址的 Actor 通信。它可以从它收到的消息中获取这些地址,或者如果地址是它自己创建的 Actor 的地址。
Actor 模型的特点是 Actor 内部和 Actor 之间计算的固有并发性、Actor 的动态创建、消息中包含 Actor 地址以及仅通过直接异步消息传递进行交互,且对消息到达顺序没有限制。
http://zh.wikipedia.org/wiki/Actor_model
我喜欢将 Actor 视为可以用来缓解使用共享数据结构时的一些同步问题。这是通过你的应用程序代码通过消息传递/接收与 Actor 通信来实现的。Actor 本身可以将消息传递给其他 Actor,或者处理传递的消息本身。通过使用消息传递而不是共享数据结构,这可能有助于将 Actor(或它发送消息给的任何后续 Actor)视为处理数据的副本而不是处理相同的共享结构。这在某种程度上消除了担心诸如锁和多线程代码可能产生的任何讨厌的时序问题。如果 Actor 使用自己的数据副本,那么我们就不应该有其他线程想要使用 Actor 拥有的数据的问题,因为数据唯一可能存在的地方就是 Actor 本身,除非我们将另一条消息传递给不同的 Actor。但是,如果我们这样做,新消息到另一个 Actor 也将是数据的副本,因此也将是线程安全的。
我希望你明白我试图解释的内容,也许一张图会有所帮助。
使用共享数据结构的多线程应用程序
一个相当常见的事情是运行多个线程以加快速度,但随后你意识到你的线程需要改变一些共享数据结构的状态,因此你必须涉及线程同步原语(最常见的是 lock(..) 语句,以创建你的用户定义的临界区)。这会奏效,但现在由于必须等待锁释放才能运行线程 X 的代码,你正在引入人为延迟。
为了进一步说明这一点,让我们看一些可能进一步说明这一点的代码,想象我们有这种数据结构代表一个非常简单的银行账户
namespace ConsoleApplication1
{
public class Account
{
public Account()
{
}
public Account(int id, string name,
string sortCode, decimal balance)
{
Id = id;
Name = name;
SortCode = sortCode;
Balance = balance;
}
public int Id { get; set; }
public string Name { get; set; }
public string SortCode { get; set; }
public decimal Balance { get; set; }
}
}
没什么花哨的,只是一些字段。那么现在让我们继续看一些线程代码,我选择只显示两个线程作用于一个共享的 Account 实例。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Management.Instrumentation;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication1
{
class Program
{
private object syncLock = new object();
private Account clientBankAccount;
public Program()
{
clientBankAccount = new Account(1,"sacha barber","112233",0);
}
public async Task Run()
{
try
{
var addToAccountTask = Task.Run(() =>
{
Console.WriteLine("Tread Id {0}, Account balance before: {1}",
Thread.CurrentThread.ManagedThreadId, clientBankAccount.Balance);
lock (syncLock)
{
Console.WriteLine("Tread Id {0}, Adding 10 to balance",
Thread.CurrentThread.ManagedThreadId);
clientBankAccount.Balance += 10;
Console.WriteLine("Tread Id {0}, Account balance before: {1}",
Thread.CurrentThread.ManagedThreadId, clientBankAccount.Balance);
}
});
var subtractFromAccountTask = Task.Run(() =>
{
Console.WriteLine("Tread Id {0}, Account balance before: {1}",
Thread.CurrentThread.ManagedThreadId, clientBankAccount.Balance);
lock (syncLock)
{
Console.WriteLine("Tread Id {0}, Subtracting 4 to balance",
Thread.CurrentThread.ManagedThreadId);
clientBankAccount.Balance -= 4;
Console.WriteLine("Tread Id {0}, Account balance before: {1}",
Thread.CurrentThread.ManagedThreadId, clientBankAccount.Balance);
}
});
await Task.WhenAll(addToAccountTask, subtractFromAccountTask);
}
catch (Exception e)
{
Console.WriteLine(e);
}
}
static void Main(string[] args)
{
Program p = new Program();
p.Run().Wait();
Console.ReadLine();
}
}
}
我可能选择了一个你认为在现实生活中可能不会发生的例子,老实说,这个场景在现实生活中可能不会出现,因为谁会做如此愚蠢的事情,在一个线程中贷记账户,在另一个线程中借记账户……我们都是勤奋的开发者,我们不会让这种事情进入代码,对吗?
老实说,无论实际示例是否具有现实意义,其重点仍然相同,因为我们有多个线程访问共享数据结构,对其的访问必须同步,这通常通过 lock(..) 语句完成,如代码所示。
现在不要误会我的意思,上面的代码确实有效,如下面的输出所示
也许还有一种更有趣的方式!
Actor 模型
Actor 模型采用不同的方法,其中使用消息传递,这 **可能** 涉及某种形式的序列化,因为消息通过网络传递,这在某种程度上保证了没有共享结构需要竞争。
那是一个小小的介绍/回顾,那么本文的其余部分将介绍什么呢?本文的其余部分将介绍如何在 .NET 中开发使用 Actor 模型的代码。我们将继续探讨如何使用 Akka.NET,我不会介绍上面示例的 Akka.NET 版本,而是将基于一个新的 Akka.NET 示例进行构建。
Akka.Net
正如已经提到的,Akka.NET 是 Akka 的直接移植,而且是一个相当完整的移植。
培训指南
有一个非常出色的培训指南,其中包含从头到尾的示例,你可以通过它,它被很好地分解成小块的实验,共有 3 部分,每部分约有 6 个实验。你可以从这个网址获取
https://github.com/petabridge/akka-bootcamp
我 **强烈**(看到我用我出色的 <strong>strongly</strong>
html 技能做了什么吗)建议你阅读此内容并尝试一下。
Akka.NET 和演示应用
本文的其余部分将介绍 Akka.NET 内部,并简要介绍演示应用程序。我从 https://github.com/petabridge/akka-bootcamp 培训指南中毫不客气地窃取了已完成的第一部分实验的演示应用程序。这是一个很好的起点,在我看来,我正在分享 Akka.NET 的爱,所以他们会很高兴。
当然,我还会添加一些实验中未涵盖的新内容。
Actor 系统
Akka.NET 是一个用于 .NET 的 Actor 框架。Akka.NET 由几个系统级 Actor 组成(这些 Actor 不受你控制,它们是 Akka.NET 的一部分)。这些顶级 Actor 被称为“守护者”。你从未真正直接处理这些 Actor。相反,你需要在以下 2 种上下文之一中创建 Actor
- 在 Akka.NET Actor 系统的上下文中
- 在现有 Actor 的上下文中,新创建的 Actor 将是创建它的上下文的子 Actor。因此,它将由其父级(即在此上下文中生成此新 Actor 的那个)进行监督
让我们看一个如何在 Actor 系统中创建 Actor 的例子(如果这看起来很奇怪,请暂时不要担心,这里的 Props
东西将在稍后解释)
// make actor system
MyActorSystem = ActorSystem.Create("MyActorSystem", config);
// create top-level actors within the actor system
Props consoleWriterProps = Props.Create<ConsoleWriterActor>();
IActorRef consoleWriterActor = MyActorSystem.ActorOf(consoleWriterProps, "consoleWriterActor");
Props tailCoordinatorProps = Props.Create(() => new TailCoordinatorActor());
IActorRef tailCoordinatorActor = MyActorSystem.ActorOf(tailCoordinatorProps, "tailCoordinatorActor");
// blocks the main thread from exiting until the actor system is shut down
MyActorSystem.AwaitTermination();
可以看出,这里有一个 Akka.NET ActorSystem
,我们用它来创建 Actor,其中我们使用了一些奇怪的 Props
魔法……这些 Actor 将由 Akka.NET 守护者监督。我们稍后会讲到监督。
Props - 创建新 Actor
Props
是你用来创建 Actor 的对象。你 **必须** 使用 Props
在主 ActorSystem
中或在另一个 Actor 的范围内创建 Actor。但是我们如何使用这个 Props
东西呢?
有不同的方法可以使用 Props
创建 Actor,让我们看几个例子。
如果 Actor 本身有一个默认构造函数,我们可以使用这种形式的 Props
Props consoleWriterProps = Props.Create<ConsoleWriterActor>();
但是,如果 Actor 构造函数需要参数,我们可以使用这种形式的 Props
Props someNonDefaultProps = Props.Create(() => new SomeNonDefaultActor(someDependency));
IActorRef someNonDefaultActor= MyActorSystem.ActorOf(someNonDefaultProps , "someNonDefaultActor");
还有另一种方法,你可以向实际的 Actor 类添加一个静态工厂方法,并让它返回 Props
,然后你就可以使用它。
在所有这些情况下,都会返回一个名为 IActorRef
的东西。你问那是什么?很简单,IActorRef
是系统中 Actor 的句柄。如果你有一个 Actor 的 IActorRef
,你可以 Tell
/Ask
该 Actor 事情。你当然可以传递 IActorRef
。
Props
类的另一个值得注意的地方是,它充当一种捕获(通过在 Actor 第一次创建时捕获输入参数的闭包)输入参数的方式,这样 Props
也充当了一个工厂,用于在 Actor 死亡时(无论是通过编程选择还是通过监督策略)重新创建 Actor。
Props - 创建子 Actor
我们已经了解了如何在整个 Akka.NET 系统中创建 Actor,我还暗示了你可以在其他 Actor 的上下文中创建 Actor。那么你如何做到这一点呢?这非常简单,并且它建立在我们刚刚介绍的 Props
概念之上。这是一个例子
public class TailCoordinatorActor : UntypedActor
{
protected override void OnReceive(object message)
{
if (message is StartTail)
{
var msg = message as StartTail;
Context.ActorOf(Props.Create(() => new TailActor(msg.ReporterActor, msg.FilePath)));
}
}
}
重要的行是这一行
Context.ActorOf(Props.Create(() => new TailActor(msg.ReporterActor, msg.FilePath)));
如上所述,当你使用 Context.ActorOf(..)
时,你是在此 Actor 的上下文中创建一个新 Actor。这意味着新创建的 Actor 是当前 Actor 的子 Actor。因此,监督此新子 Actor 成为当前 Actor 的职责。我们接下来将详细讨论这一点。
监督
在 Akka.NET 中,我们现在知道我们可以创建自己的 Actor 层次结构。叶子 Actor(下属)将由它们的父 Actor 监督,依此类推,一直到 Akka.NET 顶层系统 Actor(“守护者”)
当下属检测到故障(即抛出异常)时,它会暂停自身及其所有下属,并向其主管发送一条消息,表示故障。根据要监督的工作的性质和故障的性质,主管有以下四种选择
- 恢复下属,保留其累积的内部状态
- 重新启动下属,清除其累积的内部状态
- 永久停止下属
- 将故障升级到层次结构中的下一个父级,从而使其自身失败
将 Actor 始终视为监督层次结构的一部分非常重要,这解释了第四种选择的存在(因为主管也是更高一级主管的下属),并且对前三种选择有影响:恢复 Actor 会恢复其所有下属,重新启动 Actor 会重新启动其所有下属(但请参阅下文了解更多详细信息),同样,终止 Actor 也会终止其所有下属。应该注意的是,Actor 类的 PreRestart 钩子的默认行为是在重新启动之前终止其所有子级,但此钩子可以被覆盖;递归重新启动适用于此钩子执行后剩余的所有子级。
每个主管都配置了一个函数,将所有可能的故障原因(即异常)转换为上述四种选择之一;
http://getakka.net/docs/concepts/supervision
在 Akka.NET 中,有两种监督策略,如下所示
- OneForOneStrategy:这意味着只有失败的子进程会受到父指令(见上文)的影响
- AllForOneStrategy:这意味着所有子进程都会受到父指令(见上文)的影响
这就是理论,那么这如何转化为代码呢?每个 Actor 都允许你覆盖监督策略(默认是 OneForOneStrategy
),如下所示。我认为下面的代码块是相当不言自明的。
让我们看一个例子。
// here we are overriding the default SupervisorStrategy
// which is a One-For-One strategy w/ a Restart directive
protected override SupervisorStrategy SupervisorStrategy()
{
return new OneForOneStrategy (
10, // maxNumberOfRetries
TimeSpan.FromSeconds(30), // duration
x =>
{
//Maybe we consider ArithmeticException to not be application critical
//so we just ignore the error and keep going.
if (x is ArithmeticException) return Directive.Resume;
//Error that we cannot recover from, stop the failing actor
else if (x is NotSupportedException) return Directive.Stop;
//In all other cases, just restart the failing actor
else return Directive.Restart;
});
}
ActorRef
如前所述,IActorRef
是一个对实际 Actor 的抽象句柄。通过使用 IActorRef
,你可以执行 Akka.NET Actor 所需的所有常见操作。
下面显示了可供你使用的方法类型示例
特殊 ActorRefs
有几个特殊的 IActorRef
你应该了解。
Self
:指当前 Actor,仅在 Actor 内部有效Sender
:指消息的来源,仅在 Actor 内部有效
Actor 选择
虽然我强烈建议你始终确保手头有正确的 IActorRef
,以便向其发送消息。在可能很深/复杂的层次结构中,这可能并非总是可行。但不用担心,Akka.NET 已经为你考虑到了这一点。回想一下我曾说过有几个你无法使用的 Akka.NET 级别 Actor,称为“守护者”。嗯,这些家伙构成了一个导航/监督系统。因此,你可以使用他们的 Path
以及你可能创建的任何其他 Actor/子 Actor,这些允许你基于所谓的“**Actor 选择**”字符串获取 IActorRef
。
这是一个例子。
Context.ActorSelection("akka://MyActorSystem/user/validationActor").Tell(message);
该路径(“MyActorSystem
”)的一部分来自你创建整个 Akka.NET Actor 系统的地方,对于演示来说是这样的
MyActorSystem = ActorSystem.Create("MyActorSystem");
下一部分是整个 Akka.NET Actor 系统中特殊守护者路径值(“user
”)之一。从那里,只需指定我们想要的 IActorRef
即可。
你可以在此处阅读有关 Akka.NET 中的路径/选择和守护者的更多信息: http://getakka.net/docs/concepts/addressing
发送消息
好的,我们已经介绍了一些内容,我们现在知道以下内容
- 有一个整体系统的概念,其中包含守护者和我们自己的 Actor
- 有一个监督者的概念(如果没有提供,其中一个守护者将承担这项工作)
- 我们可以使用
IActorRef
来对 Actor 进行操作。
所以,是时候对 Actor 做点什么了,不是吗?那么,让我们开始吧。让我们看看如何处理发送(Tell
)消息并在目标 Actor 中接收该消息。
在 Akka.NET 中发送消息是通过使用 IActorRef.Tell(..)
完成的。一个例子如下
consoleReaderActor.Tell(ConsoleReaderActor.StartCommand);
现在让我们看看接收方如何处理这个问题。
注意:我在这里使用 UntypedActor
,但如果你更喜欢使用 TypedActor
,它允许你在 Actor 的构造函数中使用泛型 Receive<T>(..)
方法(例如,在 Actor 中你可以覆盖一大堆其他生命周期方法,但我不会涵盖这些,我会把它留给读者作为练习),而不是只有一个单一的重写 OnReceive(..)
方法,你需要解码传入的消息。这只是个人喜好问题。
class ConsoleReaderActor : UntypedActor
{
public const string StartCommand = "start";
protected override void OnReceive(object message)
{
if (message.Equals(StartCommand))
{
....
}
}
}
询问
尽管我没有在此处提供的演示应用程序中介绍这一点,但只要你有一个有效的 IActorRef
,你当然可以向它询问一些事情。
这是通过使用 Ask()
方法完成的。
特殊消息类型
在 Akka.NET 中有几种特殊类型的消息。我能想到的两种是
PoisonPill.Instance
向 Actor 发送 Akka.Actor.PoisonPill
将在消息处理时停止 Actor。Akka.Actor.PoisonPill
作为普通消息排队
将在邮箱中已排队的消息之后处理。
Kill.Instance
Actor.Kill
会导致 Actor 在处理消息时抛出 Akka.Actor.ActorKilledException
,该异常通过正常的监督机制进行处理,而 Akka.Actor.IActorContext.Stop
(Akka.Actor.IActorRef
) 会导致 Actor 停止,不再处理任何消息
生命周期方法
Akka.NET 中的每个 Actor 都有你可以介入的特定生命周期事件。因此,你创建的任何 Actor 都可以选择覆盖并使用这些生命周期事件。演示代码不涵盖这一点,但此图应有助于你理解其工作原理。
点击查看大图
你可以在此处阅读更多信息: http://getakka.net/docs/Actor%20lifecycle
日志记录
Akka.NET 对许多不同的日志框架都有良好的支持。现在我倾向于使用 NLog,所以这里我将展示 NLog。你需要遵循几个步骤才能让日志记录工作,我们将在下面详细介绍
步骤 1:获取正确的包
第一步是确保你已安装正确的日志 DLL,我通常通过 NuGet 进行安装。所以我通过 NuGet 安装这个包
- Akka.Logger.NLog
步骤 2:确保 App.Config 正确
然后我们需要确保我们的主 App.Config 日志记录部分是正确的,对于演示来说是这样的
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<configSections>
<section name="nlog" type="NLog.Config.ConfigSectionHandler, NLog"/>
</configSections>
<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<targets>
<target name="default" xsi:type="File"
fileName="C:\temp\WinTail.txt"
archiveFileName="C:\temp\WinTailArchives\WinTail-log.{#}.txt"
archiveEvery="Day"
archiveNumbering="Rolling"
maxArchiveFiles="7" />
<target name="errors" xsi:type="File"
fileName="C:\temp\WinTail_Errors.txt"
archiveFileName="C:\temp\WinTailArchives\WinTail_Errors.{#}.txt"
archiveEvery="Day"
archiveNumbering="Rolling"
maxArchiveFiles="7" />
</targets>
<rules>
<logger name="*" writeTo="default" />
<logger name="*" minlevel="Error" writeTo="errors" />
</rules>
</nlog>
</configuration>
步骤 3:创建 Logger 文件
然后我们需要创建一个可以与 Akka.NET 一起使用的 logger,令人惊讶的是它本身也是一个 Actor。现在我们来看看吧
using System;
using Akka.Actor;
using Akka.Event;
using NLog;
using NLogger = global::NLog.Logger;
namespace WinTail
{
/// <summary>
/// This class is used to receive log events and sends them to
/// the configured NLog logger. The following log events are
/// recognized: <see cref="Debug"/>, <see cref="Info"/>,
/// <see cref="Warning"/> and <see cref="Error"/>.
/// </summary>
public class NLogLogger : ReceiveActor
{
private readonly ILoggingAdapter log = Context.GetLogger();
private static void Log(LogEvent logEvent, Action<NLogger> logStatement)
{
var logger = LogManager.GetLogger(logEvent.LogClass.FullName);
logStatement(logger);
}
/// <summary>
/// Initializes a new instance of the <see cref="NLogLogger"/> class.
/// </summary>
public NLogLogger()
{
Receive<Error>(m => Log(m, logger => logger.ErrorException(m.Message.ToString(), m.Cause)));
Receive<Warning>(m => Log(m, logger => logger.Warn(m.Message.ToString())));
Receive<Info>(m => Log(m, logger => logger.Info(m.Message.ToString())));
Receive<Debug>(m => Log(m, logger => logger.Debug(m.Message.ToString())));
Receive<InitializeLogger>(m =>
{
log.Info("NLogLogger started");
Sender.Tell(new LoggerInitialized());
});
}
}
}
步骤 4:包含 Akka.Config (HOCON) 文件
然后我们需要创建一个名为 "akka.config" 的 Akka.NET 配置文件,该文件应设置为 "copy always" 以确保文件存在于输出目录中。该文件看起来像这样
akka { stdout-loglevel = INFO loglevel = INFO log-config-on-start = on loggers = ["WinTail.NLogLogger,AkkaDemo.WinTail"] }
加载方式如下
var config =
ConfigurationFactory.ParseString(
File.ReadAllText(Path.Combine(Environment.CurrentDirectory, "akka.config")));
// make actor system
MyActorSystem = ActorSystem.Create("MyActorSystem", config);
步骤 5:开始记录
现在,所有这些都已就绪,剩下的就是实际记录。我们这样做如下
using System;
using Akka.Actor;
using Akka.Event;
namespace WinTail
{
class ConsoleWriterActor : UntypedActor
{
private readonly ILoggingAdapter log = Context.GetLogger();
protected override void OnReceive(object message)
{
if (message is Messages.InputError)
{
var msg = message as Messages.InputError;
log.Error("Messages.InputError seen : " + msg.Reason);
....
}
else if (message is Messages.InputSuccess)
{
var msg = message as Messages.InputSuccess;
log.Info("Messages.InputSuccess seen : " + msg.Reason);
....
}
else
{
log.Info(message.ToString());
....
}
....
}
}
}
测试
为了测试 Akka.NET,我们必须安装一些额外的库,即
- Akka.TestKit
- Akka.TestKit.NUnit
两者都作为 Nuget 包提供。所以一旦你拥有了它们,我们就可以创建一个简单的测试(我在这里使用 NUnit),测试可能看起来像这样
[TestFixture]
public class TailActorTests : TestKit
{
[Test]
public void Show_NotifyTailActor_When_Change_Is_Made_To_File()
{
string fullFilePath = @"c:\temp\AkkaTest.txt";
FileObserver fileObserver = new FileObserver(TestActor, fullFilePath);
fileObserver.Start();
File.WriteAllText(fullFilePath, "A test message from TailActorTests" +
DateTime.Now.ToString());
ExpectMsg<TailActor.FileWrite>(x => fullFilePath.Contains(x.FileName) );
}
}
这里有几点需要注意,例如
- 我们继承自一个名为“
TestKit
”的特殊基类,这是一个特殊的 Akka.NET 基类,它允许像上面你看到的ExpectMsg<T>(Predicate<T>> pred)
这样的操作 - 有一个特殊的
TestActor
你可以使用 - 正如刚才所说,我们可以使用
ExpectMsg<T>(Predicate<T>> pred)
辅助方法来查看TestActor
是否收到了某些消息
实际演示应用
正如我之前所说,演示应用程序几乎完全照搬了 Akka.NET 培训材料。让我们来看看演示应用程序的功能
- 有一个
ConsoleReaderActor
,它期望用户输入要监视更改的打开文本文件的名称。 - 有一个
ValidationActor
,它验证用户输入的文件夹名是否正确 - 有一个监督
TailCoordinatorActor
,它获取输入文件,并设置一个TailActor
开始监听输入文件的更改 - 有一个
TailActor
,它将监听输入文件中的更改 - 有一个
ConsoleWriterActor
,它会将观察到的任何更改(例如输入到被监视的原始文本文件中的新文本)写入控制台。
运行演示应用程序的步骤
- 在某个地方创建一个文本文件。打开此文本文件
- 运行演示应用程序代码。当提示时,告诉它你创建的文本文件的路径
- 编辑你创建的文本文件,每次编辑后保存,然后观看 Actor 系统演示向你展示更改
就这些
这就是我这次想说的全部内容,希望对你们中的至少一些人有所帮助。一如既往,如果你喜欢你所看到的,请花一分钟投票或留言,我们总是很感激