使用Rx简化并发、计划多值处理和单元测试





5.00/5 (1投票)
在本文中,我们使用了一个涉及观察日内交易的真实场景。
前言
在上一篇系列文章中,我使用了一个WPF应用程序来演示Rx内置的并发抽象能力。这有助于简化MVVM并发场景的单元测试。在本文中,我们使用一个涉及观察日内交易的真实场景。我们将利用这个真实场景更深入地了解Rx的细节,并分析Rx如何用很少的代码简化复杂并发场景的处理,同时提供处理并发、错误和单元测试能力的细粒度灵活性。我们将利用Rx测试框架的灵活性在虚拟时间维度上进行测试。这是一个绝对必要的元素,因为实时数据序列或流式时间序列不可避免地包含一个时间维度。在此过程中,我们将实践Rx的以下功能。
函数式
在适用之处,避免使用复杂的有状态程序,而是使用干净的输入/输出函数处理可观察流。
少即是多
ReactiveX的运算符通常将曾经是一个复杂的挑战简化为几行代码。
异步错误处理
传统的try/catch对异步计算中的错误无能为力,但ReactiveX配备了适当的错误处理机制。
并发变得简单
ReactiveX中的Observables和Schedulers允许程序员抽象底层线程、同步和并发问题。
场景
交易员需要一份日内报告来了解他们当天的电力头寸。报告应将每小时的汇总交易量输出到一个CSV文件。
序号。 | 要求 |
---|---|
1 | 必须使用.Net 4.5 & C#实现为Windows服务。 |
2 | 所有交易头寸必须按小时(本地/实际时间)汇总。请注意,对于给定的日期,该日期的实际本地开始时间是前一天晚上23:00。本地时间是GMT时区。 |
3 | CSV输出格式必须是两列:本地时间(24小时格式HH:MM,例如13:00)和交易量,第一行必须是标题行。 |
4 | CSV文件名必须是PowerPosition_YYYYMMDD_HHMM.csv,其中YYYYMMDD是年/月/日,例如20141220表示2014年12月20日,HHMM是24小时制的小时和分钟,例如1837。日期和时间是提取时的本地时间。 |
5 | CSV文件的位置应存储并从应用程序配置文件读取。 |
6 | 提取必须以计划的时间间隔运行;每隔X分钟,其中实际间隔X存储在应用程序配置文件中。此提取不必精确在分钟内运行,可以在配置间隔的+/- 1分钟范围内。 |
7 | 不允许错过计划的提取。 |
8 | 提取必须在服务首次启动时运行,然后按上述间隔运行。 |
9 | 服务可以只在启动时读取配置,并且在配置文件更改时不动态更新是可以接受的。更新配置时,需要重启服务就足够了。 |
10 | 服务必须提供足够的日志记录,以便生产支持人员诊断任何问题。 |
11 | 这应该处理夏令时开始和结束。 |
其他说明
已提供一个程序集(PowerService.dll),必须使用它来与“交易系统”接口。提供了一个单一接口来检索指定日期的电力交易。提供了两个方法,一个是同步实现(IEnumerable<TradeAdapter>
),另一个是异步实现(Task<IEnumerable<TradeAdapter>>
)。实现必须使用异步方法。PowerService类是该服务的实际实现。日期参数应该是检索电力头寸(交易量)的日期。PowerTrade类包含给定日期的PowerPeriods数组。周期号从1开始,表示一天的第一个周期,并从前一天晚上23:00开始。
示例
给定对PowerService.GetTrades
的调用,返回以下两个交易头寸
T1 => [ P1{1,100} , P2{2,100}, P3{3,100}, P4{4,100} .. P23{23,100},P24{24,100}] T2 => [ P1{1,-50} , P2{2,-20}, P3{3,-50}, P4{4,-50} .. P23{23,100},P24{24,10}]
预期输出
本地时间 | 卷 |
---|---|
23:00 | 50 |
00:00 | 80 |
01:00 | 50 |
02:00 | 50 |
…. | …. |
21:00 | 200 |
22:00 | 110 |
概念分析和设计
我们可以将工作流概念化为一个无限的Timer事件序列Tm,它**转换**为**Observable**的交易集合。
在转换过程中可能会发生异常,如Error所示。在这种情况下,我们需要**Retry**源,以便Timer事件序列继续,而不是因**Error**而中断。这是本场景设计所必需的,因为规范7指出不允许错过计划的提取。
Tm–>{ T1 {Periods[1,n], T2 {Periods[1,m] , … Tn {Periods[1,k] }–>Completed Tm–>{ T1 {Periods[1,n], T2 {Periods[1,m] , … Tn {Periods[1,k]}–>Completed Tm–>{ }–>Error—>{ T1 {Periods[1,n], T2 {Periods[1,m] , … Tn {Periods[1,k] }–>Completed Tm–>{ T1 {Periods[1,n], T2 {Periods[1,m] , … Tn {Periods[1,k]}–>Completed Tm–>{ T1 {Periods[1,n], T2 {Periods[1,m] , … Tn {Periods[1,k]}–>Completed Tm ….
上述所有规范都可以使用上述概念设计在下面约60行格式化的代码中实现。此外,错误处理、调度、多值处理、转换和虚拟时间维度并发单元测试的复杂性可以轻松实现,如下节所述。
public void Run(IPowerService svc, IScheduler scheduler, DateTime dtrunDate, TimeZoneInfo timeZoneInfo,
int observationIntervalInMinutes, StringBuilder sbpowerpositionLines, string csvFilePath, StreamMode streamMode = StreamMode.StreamToFile)
{
var dateTimeHelper = new DateTimeHelper(dtrunDate,timeZoneInfo);
_reporterDisposable = Observable.Interval(TimeSpan.FromMinutes(observationIntervalInMinutes), scheduler)
.Select(i =>
{
dtrunDate = dtrunDate.AddMinutes((i + 1) * observationIntervalInMinutes);
return Observable.FromAsync(() => svc.GetTradesAsync(dtrunDate));
})
.Subscribe(m =>
{
sbpowerpositionLines.Clear();
sbpowerpositionLines.AppendLine("Local Time,Volume");
m.Catch((PowerServiceException ex) =>
{
Log.Error(string.Format("PowerServiceException {0}", ex.Message));
return Observable.FromAsync(() => svc.GetTradesAsync(dtrunDate));
})
.Retry()
.SelectMany(t => t.SelectMany(x => x.Periods))
.GroupBy(g => g.Period)
.Select(p => new {Period = p.Key, Volume = p.Sum(_ => _.Volume)})
.Subscribe(value =>
{
value.Volume.Subscribe(volume =>
{
sbpowerpositionLines.AppendLine(string.Format("{0},{1}",
dateTimeHelper.Reset(dtrunDate, timeZoneInfo).UtcIndexToLocalHourMap[value.Period], volume));
Log.Info(string.Format("Period {0}, Volume {1}",
value.Period,
volume));
});
}, delegate
{
sbpowerpositionLines.AppendLine("OnError");
Log.Error("OnError");
},
async () =>
{
if (streamMode == StreamMode.StreamToMemory) return;
string path = Path.Combine(csvFilePath,
"PowerPosition" + dtrunDate.ToString("_yyyyMMdd_") + DateTime.Now.ToString("HHmm") +
".csv");
if (Directory.Exists(csvFilePath))
{
using (var stream = new StreamWriter(path))
{
await stream.WriteAsync(sbpowerpositionLines.ToString());
await stream.FlushAsync();
}
Log.Info("Completed " + path + "\n");
}
else
{
Log.Error("Completed but Path " + path + " do not exist !!\n");
}
});
});
}
上面的代码可能看起来有点令人生畏。但是,如果您熟悉Rx语法,那么实际上并非如此。
而不是一个复杂的有状态工作流,我们可以**函数式地**看待上述场景,即我们有一个无限的定时器事件**输入**序列,代表计划,它根据上述指定的时间转换成**输出**,并持久化到CSV文件中。其他事项,如调度、异步调用、错误处理,都可以在Rx中开箱即用地获得。
在我们的设计中,我们需要确保工作流符合Rx语法。Rx序列后跟零个或多个OnNext
** **,最后以OnError
**或**OnCompleted
结束。从语法上可以表示为
sequence => {OnNext}{OnError || OnCompleted}
转换
在第一阶段,我们创建了一个可观察的定时器事件序列。我们将观察这些定时器事件以模拟可根据规范6配置的周期性计划。
Observable.Interval(TimeSpan.FromMinutes(observationIntervalInMinutes), scheduler)
接下来,我们希望将定时器计划事件**转换**或**投影**为可观察的交易列表。投影涉及异步调用和相关的异常处理;这并不重要,因为与传统的try/catch(不能处理异步异常)不同,Rx配备了适当的机制来处理异步异常。如下所示,Observable.FromAsync
将整洁地将结果转换为可观察序列。
您还会注意到,时间通过定时器序列间隔乘以计划间隔(以分钟为单位)的偏移量来滚动。
.Select(i =>
{
dtrunDate = dtrunDate.AddMinutes((i + 1) * observationIntervalInMinutes);
return Observable.FromAsync(() => svc.GetTradesAsync(dtrunDate));
})
结果交易会投影到Subscribe中的OnNext
。需要注意的是,转换后的交易集合本身是可观察的,并且来自源“Observable.FromAsync
”的任何异常都可以通过交易可观察对象“m”观察到,该对象是Outer Timer序列Subscribe的输入。异常被拦截,我们可以通过正常返回Observable.Empty
来继续处理下一个序列值。我们改为再次调用“Observable.FromAsync
”,以便在之前的调用出现错误或异常时获取交易。还需要注意的是,我们应用了.Retry()
,它将在前一次错误或异常发生时无限次尝试源以获取成功的交易列表。
m.Catch((PowerServiceException ex) => { Log.Error(string.Format("PowerServiceException {0}", ex.Message)); return Observable.FromAsync(() => svc.GetTradesAsync(dtrunDate)); }) .Retry()
正如您在下面看到的,我们使用.SelectMany
进一步展平投影,然后按单个周期对交易集合序列进行分组,然后对每个周期的交易量进行求和。这非常类似于Linq to Objects,但有一个细微的区别。
使用Rx Linq运算符时,我们处理的是流,而Linq Object则会将整个集合加载到内存中。在这个例子中,这种差异可能不太明显,但是当您处理非常大的数据集时,例如解析一个100GB的文件,这种差异会变得非常明显,其副作用会反映在资源监视器上。
您进一步注意到,我们应用了另一个.Select来将分组的周期投影到周期序列和该周期所有交易量的总和{P,∑v}。
.SelectMany(t => t.SelectMany(x => x.Periods))
.GroupBy(g => g.Period)
.Select(p => new {Period = p.Key, Volume = p.Sum(_ => _.Volume)})
应用Rx Linq运算符和聚合时值得注意的是,一旦应用了聚合;结果也会转换为Observable,因此为了获得聚合的值,我们必须应用另一个嵌套的.Subscribe。如果不这样做,Rx将锁定。这是必要的邪恶吗?不,绝对不是。这就是Rx如何将并行处理分解为细粒度级别,从而使框架能够处理非常大的内存块到流式帧,而传统Linq to Object将在此失败。
但万一您觉得传统的Linq to Object代码使事情看起来更简单,那么下面的代码就在.Retry()的正下方进行Subscribe。
.Retry()
.Subscribe(value =>
{
var powerPeriods = value.SelectMany(t => t.Periods).GroupBy(g => g.Period).Select(
s => new PowerPeriod
{
Period = s.Key,
Volume = s.Sum(_ => _.Volume)
});
foreach (var powerPeriod in powerPeriods)
{
sbpowerpositionLines.AppendLine(string.Format("{0},{1}",
dateTimeHelper.UtcIndexToLocalHourMap[powerPeriod.Period], powerPeriod.Volume));
Log.Info(string.Format("Period {0}, Volume {1}",
dateTimeHelper.UtcIndexToLocalHourMap[powerPeriod.Period],
powerPeriod.Volume));
}
}
您还会注意到,我正在使用DateTimeHelper
将UTC时间索引转换为本地时间。此外,此助手会在日期更改时重置自身,并在午夜自动调整其内部字典。这使我们在夏令时开始或结束时,在时钟前后移动之前有一个小时的时间。
.Subscribe(value =>
{
value.Volume.Subscribe(volume =>
{
sbpowerpositionLines.AppendLine(string.Format("{0},{1}",dateTimeHelper.UtcIndexToLocalHourMap[value.Period], volume));
Log.Info(string.Format("Period {0}, Volume {1}",value.Period,volume));
});
}
下面的异步委托是内部OnComplete
。我引入了一个StreamMode枚举,主要是作为协助单元测试的一种技术。显然,当我进行单元测试时,我不希望任何内容刷新到物理文件,而是测试内部OnNextcomputation
是否已完成并产生预期输出。否则,这是所有序列上的并发操作收敛的逻辑位置,并且Rx保证序列上没有其他操作正在等待。
async () =>
{
if (streamMode == StreamMode.StreamToMemory) return;
string path = Path.Combine(csvFilePath,"PowerPosition" + dtrunDate.ToString("_yyyyMMdd_") + DateTime.Now.ToString("HHmm") + ".csv");
if (Directory.Exists(csvFilePath))
{
using (var stream = new StreamWriter(path))
{
await stream.WriteAsync(sbpowerpositionLines.ToString());
await stream.FlushAsync();
}
Log.Info("Completed " + path + "\n");
}
else
{
Log.Error("Completed but Path " + path + " do not exist !!\n");
}
}
正如规范所建议的,时间被转换为本地GMT时区。显然,这意味着在夏令时开始时,时钟会向前移动1小时,如下所示,时间显示为2:00am而不是1:00am。
同样,在夏令时结束日期,时钟会在凌晨2:00准确地向后移动1小时至1:00am。这也反映在下面的报告中。
|
|
|
单元测试
现在我们将着眼于单元测试的各个方面。尽管一开始看起来很难,不知道如何单元测试嵌套函数,而且还有子级别并发运行。但是一旦你习惯了Rx的测试范例,测试这些复杂的场景就会变得非常容易。
正如您在下面看到的,Rx提供的TestScheduler是我们最重要的工具,它允许我们将时间推进到虚拟维度,从而使我们能够执行基于时间的测试。
[SetUp]
Setup()
{
_testScheduler = new TestScheduler();
_powerService = new Mock<IPowerservice>();
}
聚合测试
在这里,我们测试基本功能,例如给定两个交易,我们有一个预期的输出。正如您在下面看到的,我们已经伪造了GetTradesAsync
以返回一个已完成任务的任务。
接下来,我们插入伪造的powerservice源,并将注入调度程序的计时器推进到虚拟时间维度。这实际上将触发嵌套的基于时间的事件。
我们测试给定两个交易应该产生如下结果
{T1[ {1,20} , {2,30} , {3,40}] , T2[ {1,20} , {2,30} , {3,40}] } => R[{"23:00",40},{"00:00",60},{"01:00",80}]
[Test]
public void Should_Flatten_Trades_And_Aggregate_Periods_Per_Hour_LocalTime()
{
//Arrange
var intradayReporter = new IntraDayReporter();
_powerService.Setup(p => p.GetTradesAsync(It.IsAny()))
.Returns(
Task.FromResult(CreateMockPowerTrades(It.IsAny(), 2,new[]{
new PowerPeriod { Period = 1, Volume = 20 },
new PowerPeriod { Period = 2, Volume = 30 },
new PowerPeriod { Period = 3, Volume = 40 }}
)));
DateTime date = DateTime.ParseExact( "2011/03/28 10:42:33", "yyyy/MM/dd HH:mm:ss", CultureInfo.InvariantCulture);
StringBuilder sb = new StringBuilder();
TimeZoneInfo gmtTimeZoneInfo = TimeZoneInfo.FindSystemTimeZoneById("GMT Standard Time");
const string expected = "Local Time,Volume\r\n23:00,40\r\n00:00,60\r\n01:00,80\r\n";
//Act
intradayReporter.Run1(_powerService.Object, _testScheduler, date, gmtTimeZoneInfo, 1, sb, It.IsAny(), IntraDayReporter.StreamMode.StreamToMemory);
_testScheduler.AdvanceBy(TimeSpan.FromMinutes(1).Ticks);
var actual = sb.ToString();
//Assert
Assert.AreEqual(expected,actual);
}
异常处理测试
我们将进一步看到Rx测试框架结合MoQ框架的惊人能力,它允许我们测试复杂的并发基于时间的流程序列,而这些序列否则将近乎不可能。
您在下面看到的是MOQ的设置,它按顺序返回一个异常,然后是一系列交易,随后是另一组交易。
您会注意到,一旦发生异常,Rx通常会产生OnError,但是我们捕获异常,然后返回立即调用GetTrades
的结果,而不是等待下一个计划。
通过这种方式,我们能够满足要求7,即不允许错过计划的提取。因此,尽管GetTrades
抛出了异常,但我们看到了预期的结果,与随后的GetTrades
调用匹配。
当我们稍后通过下一个计划推进时间时,我们看到了匹配的下一组结果,从而证明了先前的异常并没有终止外部间隔序列。
[Test]
public void Should_RetryOnException_And_Then_Continue_NextSchedule()
{
//Arrange
_retryCount = 0;
var intradayReporter = new IntraDayReporter();
_powerService.SetupSequence(p => p.GetTradesAsync(It.IsAny<datetime>()))
.Throws(new PowerServiceException("Thrown from Unit Test"))
.Returns(
Task.FromResult(CreateMockPowerTrades(It.IsAny<datetime>(), 2, new[]{
new PowerPeriod { Period = 1, Volume = 20 },
new PowerPeriod { Period = 2, Volume = 30 },
new PowerPeriod { Period = 3, Volume = 40 }}
)))
.Returns(
Task.FromResult(CreateMockPowerTrades(It.IsAny<datetime>(), 3, new[]{
new PowerPeriod { Period = 1, Volume = 10 },
new PowerPeriod { Period = 2, Volume = 10 },
new PowerPeriod { Period = 3, Volume = 10 }}
)));
DateTime date = DateTime.ParseExact("2011/03/28 10:42:33", "yyyy/MM/dd HH:mm:ss", CultureInfo.InvariantCulture);
StringBuilder sb = new StringBuilder();
TimeZoneInfo gmtTimeZoneInfo = TimeZoneInfo.FindSystemTimeZoneById("GMT Standard Time");
const string expectedfirst = "Local Time,Volume\r\n23:00,40\r\n00:00,60\r\n01:00,80\r\n";
const string expectedsecond = "Local Time,Volume\r\n23:00,30\r\n00:00,30\r\n01:00,30\r\n";
//Act
intradayReporter.Run1(_powerService.Object, _testScheduler, date, gmtTimeZoneInfo, 1, sb, It.IsAny<string>(), IntraDayReporter.StreamMode.StreamToMemory);
_testScheduler.AdvanceBy(TimeSpan.FromMinutes(1).Ticks);
var actual = sb.ToString();
//Assert
Assert.AreEqual(expectedfirst, actual);
//Act
/*Advance Virtual time to next schedule */
_testScheduler.AdvanceBy(TimeSpan.FromMinutes(1).Ticks);
actual = sb.ToString();
//Assert
Assert.AreEqual(expectedsecond, actual);
}
夏令时开始测试
在此测试中,我们选择了一个日期2015年3月29日,这是夏令时开始。由于BST时区在凌晨1:00时会向前调整1小时;在2015年3月28日晚上11:00至2015年3月29日之间,总共有23小时,而不是正常的24小时。现在结果应以本地时间显示,我们将需要显示第3个时间段的“03:00”。这将表明dateTimeHelper
类正确重置了小时字典。
需要注意的是,DayLightHelper
会尝试重置其内部字典,每当日期在给定时区内进入夏令时时。
[Test]
public void Should_Handle_24HRPreviouDay_to_22HRCurrentDay_LocalTimeOn_DaylightSaving_Start()
{
//Arrange
var intradayReporter = new IntraDayReporter();
_powerService.Setup(p => p.GetTradesAsync(It.IsAny<Datetime>()))
.Returns(
Task.FromResult(CreateMockPowerTrades(It.IsAny<Datetime>(), 2, new[]{
new PowerPeriod { Period = 1, Volume = 10 },
new PowerPeriod { Period = 2, Volume = 10 },
new PowerPeriod { Period = 3, Volume = 10 },
new PowerPeriod { Period = 4, Volume = 10 },
new PowerPeriod { Period = 5, Volume = 10 },
new PowerPeriod { Period = 6, Volume = 10 },
new PowerPeriod { Period = 7, Volume = 10 },
new PowerPeriod { Period = 8, Volume = 10 },
new PowerPeriod { Period = 9, Volume = 10 },
new PowerPeriod { Period = 10, Volume = 10 },
new PowerPeriod { Period = 11, Volume = 10 },
new PowerPeriod { Period = 12, Volume = 10 },
new PowerPeriod { Period = 13, Volume = 10 },
new PowerPeriod { Period = 14, Volume = 10 },
new PowerPeriod { Period = 15, Volume = 10 },
new PowerPeriod { Period = 16, Volume = 10 },
new PowerPeriod { Period = 17, Volume = 10 },
new PowerPeriod { Period = 18, Volume = 10 },
new PowerPeriod { Period = 19, Volume = 10 },
new PowerPeriod { Period = 20, Volume = 10 },
new PowerPeriod { Period = 21, Volume = 10 },
new PowerPeriod { Period = 22, Volume = 10 }}
)));
DateTime date = DateTime.ParseExact("2015/03/29", "yyyy/MM/dd", CultureInfo.InvariantCulture);
StringBuilder sb = new StringBuilder();
TimeZoneInfo gmtTimeZoneInfo = TimeZoneInfo.FindSystemTimeZoneById("GMT Standard Time");
const string expected = "Local Time,Volume\r\n23:00,20\r\n00:00,20\r\n02:00,20\r\n03:00,20\r\n04:00,20\r\n05:00,20\r\n06:00,20\r\n07:00,20\r\n08:00,20\r\n09:00,20\r\n10:00,20\r\n11:00,20\r\n12:00,20\r\n13:00,20\r\n14:00,20\r\n15:00,20\r\n16:00,20\r\n17:00,20\r\n18:00,20\r\n19:00,20\r\n20:00,20\r\n21:00,20\r\n";
//Act
intradayReporter.Run1(_powerService.Object, _testScheduler, date, gmtTimeZoneInfo, 1, sb, It.IsAny<string>(), IntraDayReporter.StreamMode.StreamToMemory);
_testScheduler.AdvanceBy(TimeSpan.FromMinutes(1).Ticks);
var actual = sb.ToString();
//Assert
Assert.AreEqual(expected, actual);
}
夏令时结束测试
同样,我们检查场景,以便在夏令时结束时,准确地在凌晨2:00,时钟会向后回拨一小时变成凌晨1:00。显然,在上一天23:00到当前一天23:00之间,我们将有25个UTC小时。这可以像上面一样进行测试。
[Test]
public void Should_Handle_24HRPreviouDay_to_25HRCurrentDay_LocalTimeOn_DaylightSaving_End()
{
//Arrange
var intradayReporter = new IntraDayReporter();
_powerService.Setup(p => p.GetTradesAsync(It.IsAny<DateTime>()))
.Returns(
Task.FromResult(CreateMockPowerTrades(It.IsAny<DateTime>(), 2, new[]{
new PowerPeriod { Period = 1, Volume = 10 },
new PowerPeriod { Period = 2, Volume = 10 },
new PowerPeriod { Period = 3, Volume = 10 },
new PowerPeriod { Period = 4, Volume = 10 },
new PowerPeriod { Period = 5, Volume = 10 },
new PowerPeriod { Period = 6, Volume = 10 },
new PowerPeriod { Period = 7, Volume = 10 },
new PowerPeriod { Period = 8, Volume = 10 },
new PowerPeriod { Period = 9, Volume = 10 },
new PowerPeriod { Period = 10, Volume = 10 },
new PowerPeriod { Period = 11, Volume = 10 },
new PowerPeriod { Period = 12, Volume = 10 },
new PowerPeriod { Period = 13, Volume = 10 },
new PowerPeriod { Period = 14, Volume = 10 },
new PowerPeriod { Period = 15, Volume = 10 },
new PowerPeriod { Period = 16, Volume = 10 },
new PowerPeriod { Period = 17, Volume = 10 },
new PowerPeriod { Period = 18, Volume = 10 },
new PowerPeriod { Period = 19, Volume = 10 },
new PowerPeriod { Period = 20, Volume = 10 },
new PowerPeriod { Period = 21, Volume = 10 },
new PowerPeriod { Period = 22, Volume = 10 },
new PowerPeriod { Period = 23, Volume = 10 },
new PowerPeriod { Period = 24, Volume = 10 },
new PowerPeriod { Period = 25, Volume = 10 }}
)));
DateTime date = DateTime.ParseExact("2015/10/25", "yyyy/MM/dd", CultureInfo.InvariantCulture);
StringBuilder sb = new StringBuilder();
TimeZoneInfo gmtTimeZoneInfo = TimeZoneInfo.FindSystemTimeZoneById("GMT Standard Time");
const string expected =
"Local Time,Volume\r\n23:00,20\r\n00:00,20\r\n01:00,20\r\n01:00,20\r\n02:00,20\r\n03:00,20\r\n04:00,20\r\n05:00,20\r\n06:00,20\r\n07:00,20\r\n08:00,20\r\n09:00,20\r\n10:00,20\r\n11:00,20\r\n12:00,20\r\n13:00,20\r\n14:00,20\r\n15:00,20\r\n16:00,20\r\n17:00,20\r\n18:00,20\r\n19:00,20\r\n20:00,20\r\n21:00,20\r\n22:00,20\r\n";
//Act
intradayReporter.Run(_powerService.Object, _testScheduler, date, gmtTimeZoneInfo, 1, sb, It.IsAny<String>(), IntraDayReporter.StreamMode.StreamToMemory);
_testScheduler.AdvanceBy(TimeSpan.FromMinutes(1).Ticks);
var actual = sb.ToString();
//Assert
Assert.AreEqual(expected, actual);
}
DateTimeHelper内部字典重置测试
规范要求结果以本地时间显示。显然,由于我们将获得UTC的小时偏移量,我们需要一种机制将UTC时间偏移量转换为有效的本地时间。此外,这一功能可以根据SOLID设计原则(关注点分离)与上述核心功能分开。您会注意到,每个定时器事件都会偏移该进程开始的时间。偏移的时间输入到PowerService,以便它可以获取特定日期的正确交易。因此,为了测试的目的,有两个时间对我们很重要。它们是夏令时开始时的凌晨1:00和夏令时结束时的凌晨2:00。在这些时间,本地时间会改变,因此为了提供正确的本地时间,内部字典必须改变,这通过.Reset()扩展方法在DayLightHelper
实例中实现。
[Test]
public void DateTimeHelperDayRollOver1AM_OnDaylightSavingStart()
{
//Arrange
var dateStart = DateTime.ParseExact("2015/03/28 23:30", "yyyy/MM/dd HH:mm", CultureInfo.InvariantCulture);
var dateRollOverPre1am = DateTime.ParseExact("2015/03/29", "yyyy/MM/dd", CultureInfo.InvariantCulture);
var dateRollOverpsot1am = DateTime.ParseExact("2015/03/29 01:00", "yyyy/MM/dd HH:mm", CultureInfo.InvariantCulture);
var dateTimeHelper = new DateTimeHelper(dateStart, TimeZoneInfo.FindSystemTimeZoneById("GMT Standard Time"));
var expectedReset = new Dictionary<int, string>
{
{1, "23:00"},{2,"00:00"},{3, "01:00"},{4,"02:00"},{5, "03:00"},{6,"04:00"},
{7, "05:00"},{8,"06:00"},{9, "07:00"},{10,"08:00"},{11, "09:00"},{12,"10:00"},
{13,"11:00"},{14,"12:00"},{15, "13:00"},{16,"14:00"},{17, "15:00"},{18,"16:00"},
{19,"17:00"},{20,"18:00"},{21, "19:00"},{22,"20:00"},{23, "21:00"},{24,"22:00"}
};
var expectedReset1 = new Dictionary<int, string>
{
{1, "23:00"},{2,"00:00"},{3, "02:00"},{4,"03:00"},{5, "04:00"},{6,"05:00"},
{7, "06:00"},{8,"07:00"},{9, "08:00"},{10,"09:00"},{11, "10:00"},{12,"11:00"},
{13,"12:00"},{14,"13:00"},{15, "14:00"},{16,"15:00"},{17, "16:00"},{18,"17:00"},
{19,"18:00"},{20,"19:00"},{21, "20:00"},{22,"21:00"},{23, "22:00"}
};
//Act
var dateTimeHelperReset = dateTimeHelper.Reset(dateRollOverPre1am,
TimeZoneInfo.FindSystemTimeZoneById("GMT Standard Time"));
//Assert
CollectionAssert.AreEquivalent(expectedReset,dateTimeHelperReset.UtcIndexToLocalHourMap);
var dateTimeHelperReset1 = dateTimeHelperReset.Reset(dateRollOverpsot1am,
TimeZoneInfo.FindSystemTimeZoneById("GMT Standard Time"));
//Assert
CollectionAssert.AreEquivalent(expectedReset1, dateTimeHelperReset1.UtcIndexToLocalHourMap);
}
[Test]
public void DateTimeHelperDayRollOver2AM_OnDaylightSavingEnd()
{
//Arrange
var dateStart = DateTime.ParseExact("2015/10/24 23:30", "yyyy/MM/dd HH:mm", CultureInfo.InvariantCulture);
var dateRollOverPre2am = DateTime.ParseExact("2015/10/25", "yyyy/MM/dd", CultureInfo.InvariantCulture);
var dateRollOverpsot2am = DateTime.ParseExact("2015/10/25 02:00", "yyyy/MM/dd HH:mm", CultureInfo.InvariantCulture);
var dateTimeHelper = new DateTimeHelper(dateStart, TimeZoneInfo.FindSystemTimeZoneById("GMT Standard Time"));
var expectedReset = new Dictionary<int, string>
{
{1, "23:00"},{2,"00:00"},{3, "01:00"},{4,"02:00"},{5, "03:00"},{6,"04:00"},
{7, "05:00"},{8,"06:00"},{9, "07:00"},{10,"08:00"},{11, "09:00"},{12,"10:00"},
{13,"11:00"},{14,"12:00"},{15, "13:00"},{16,"14:00"},{17, "15:00"},{18,"16:00"},
{19,"17:00"},{20,"18:00"},{21, "19:00"},{22,"20:00"},{23, "21:00"},{24,"22:00"}
};
var expectedReset1 = new Dictionary<int, string>
{
{1, "23:00"},{2,"00:00"},{3, "01:00"},{4,"01:00"},{5, "02:00"},{6,"03:00"},
{7, "04:00"},{8,"05:00"},{9, "06:00"},{10,"07:00"},{11, "08:00"},{12,"09:00"},
{13,"10:00"},{14,"11:00"},{15, "12:00"},{16,"13:00"},{17, "14:00"},{18,"15:00"},
{19,"16:00"},{20,"17:00"},{21, "18:00"},{22,"19:00"},{23, "20:00"},{24,"21:00"},
{25,"22:00"}
};
//Act
var dateTimeHelperReset = dateTimeHelper.Reset(dateRollOverPre2am,
TimeZoneInfo.FindSystemTimeZoneById("GMT Standard Time"));
//Assert
CollectionAssert.AreEquivalent(expectedReset, dateTimeHelperReset.UtcIndexToLocalHourMap);
var dateTimeHelperReset1 = dateTimeHelperReset.Reset(dateRollOverpsot2am,
TimeZoneInfo.FindSystemTimeZoneById("GMT Standard Time"));
//Assert
CollectionAssert.AreEquivalent(expectedReset1, dateTimeHelperReset1.UtcIndexToLocalHourMap);
}
测试结果如下
安装
Windows服务的安装非常简单,可以如下进行:
installutil /LogFile=svcinstalllog.txt AlanAamy.Net.RxTimedWindow.Service.exe
基于控制台的调试
服务可以调整为在控制台模式下运行,这非常简单,可以进行常规的集成测试,一旦所有单元测试都完成后。您可以使用-console命令行参数在控制台模式下调试同一服务。我将留给您在代码中探索。但是,结果可以通过配置Log4net使其刷新到控制台来查看,正如您在下面看到的。
性能
内存使用图显示私有内存没有出现阶梯。这清楚地表明没有内存泄漏,尽管下面的快照窗口较小。在多核机器上,CPU使用率将显示负载在CPU上的均匀分布,这显然消除了CPU亲和性问题,而传统线程编程如果没有编写大量额外代码,将无法处理这些问题。
从源代码构建
- 在控制台中,转到您的本地git存储库目录或任何目录(git init)。
- 克隆存储库。
git clone https://github.com/arupalan/RxTimedWindow.git
- 转到源目录,更新子模块并构建。
cd AlanAamy.Net.RxTimedWindow/ git submodule update --init --recursive msbuild