Deserializing Json Streams using Newtonsoft.Json & System.Text.Json with C# & VB






How to deserialize very large simple & complex JSON Streams (.NET 6.0 & 7.0)
Dot Net 6.0
Dot Net 7.0
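Note: Both downloads contain the same files, just compressed with different methods. Once downloaded, see the Getting Started section to set up the data to be used.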
Working with JSON Series
- Part 1: Working with Newtonsoft.Json in C# & VB
- Part 2: Working with System.Text.Json in C#
- Part 3: Deserializing Json Streams using Newtonsoft.Json & System.Text.Json with C# & VB
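Contents
- Introduction
- What is a Stream?
- Why Use Streaming?
- VB.NET Limitation (System.Text.Json)
- Code and Data Used
- Getting Started
- Part 1: Working with Streams
- Part 2: Custom Utf8JsonAsyncStreamReader
- Part 3: Libraries to Simplify Working with Large JSON Object Streams
- Part 4: Unit Tests
- Part 5: Benchmarks
- Summary
- History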
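Introduction
This is the third and final part of this series of articles. We will look at how to deserialize JSON streams using Newtonsoft.Json and System.Text.Json. The data being streamed can come from the web, a file, or another source. We will also look at deserializing JSON files from within a zip file. Finally, we will wrap these processes into library base classes to simplify the code required, with unit tests, and with benchmarks for performance monitoring.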
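What is a Stream?
Microsoft's explanation of a stream:
A stream is an abstraction of a sequence of bytes, such as a file, an input/output device, an inter-process communication pipe, or a TCP/IP socket. The Stream class and its derived classes provide a generic view of these different types of input and output, and isolate the programmer from the specific details of the operating system and the underlying devices.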
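Why Use Streaming?
There are many reasons. The main benefit of using streams is:
- We do not have to load all of the data, for example a whole JSON object, into memory. This is more memory-efficient and so improves overall performance.
What are the benefits when working with large JSON data?
- Objects are processed as they are deserialized, without keeping all of them in memory
- If there is a problem with the JSON data, we can fail fast
- Streaming can be cancelled part-way through
For example, one of the sample JSON data files used in this article is about 890MB. When loaded into memory and deserialized as a single object, it consumes over 6GB of memory! When deserialized as a stream, it is less than 60MB per object.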
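VB.NET Limitation (System.Text.Json)
As mentioned in the previous article, Working with System.Text.Json in C#, VB.NET does not support Ref Structs, so the Utf8JsonAsyncStreamReader can only be written in C#.
The other exceptions, also C# only, are the JsonSamples.Host project, which provides the Web API support for testing remote data, and the GetDataFiles project, which builds the JsonSamples folder.
Apart from these, this article includes code for both C# and VB, plus solutions for C# (79 projects) and VB (79 projects).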
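Code and Data Used
There are a lot of parts to this article. I have written it so that you can pick the sections of information you need. Rather than one fancy monolithic UI application, there are a large number of small, targeted projects that cover the code required. I believe this makes the code easier to understand and to reuse yourself.
All the code included in the download covers both local (file system) and remote (Web API) streaming, in both C# and VB, for Newtonsoft and System.Text.Json. Benchmarks and unit tests are included for both C# and VB.
The sample data used was either built with Mockaroo or is eBay sandbox data, not live data. The data files used are approximately 900MB.
The custom Utf8JsonAsyncStreamReader JSON reader for System.Text.Json has been thoroughly tested and is production ready.
Note: This is not an exact replacement for the Newtonsoft JsonReader, as it is designed for asynchronous streams only.
Streams support both synchronous and asynchronous APIs. This article focuses on multitasking/background-task operations, so it targets asynchronous techniques exclusively. If you are not familiar with the TPL / Async & Await, please read Asynchronous programming with async and await - Microsoft Learn.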
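The C# and VB solutions are structured as follows:
- Prototypes - the minimum code required. The libraries are built from the code in these prototypes.
- Applications - DI and non-DI examples for the File System and Web API (.Remote)
- Libraries - wrappers for the Newtonsoft and System.Text.Json stream APIs + support classes and extensions
- Unit Tests - for the Newtonsoft and System.Text.Json generic wrapper libraries, the custom Utf8JsonAsyncStreamReader, and the supporting helper methods
- Benchmarks - measuring the performance of the default Newtonsoft and System.Text.Json methods + the wrapper libraries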
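Both Newtonsoft and System.Text.Json have DI (Dependency Injection) and non-DI example projects. In addition, each type has a File System and a Web API version. Each application is made up of multiple projects, split into separate class libraries. The application structure used is as follows: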
Application
|
+-- <type>.<JsonLib>
| |
| + -- Common.Ebay
| | |
| | +-- Common
| |
| +--- Common.<JsonLib>
| | |
| | +-- Common
| |
| +--- Ebay.Resources (.zip)
|
+-------- <Application>.Shared
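Where:
- Common - code common to all projects
- Common.<JsonLib> - specific to the <JsonLib> used, Newtonsoft or System.Text.Json. Contains the generic stream deserializer handler code and the custom reader + extension methods
- Common.Ebay - configuration / file lists
- Ebay.Resources - JSON data files; .zip with the compressed files
- Ebay.<JsonLib> - contains the typed stream deserializer handlers and the complex object models wired up for the specific <JsonLib>
- Application - the core that manages the processing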
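Note: There are many projects in the solution, approximately 79 for each language, half for Newtonsoft.Json and half for System.Text.Json. There are also large data files. Compiling the complete solution will use a lot of disk space, so it is strongly recommended that you compile only the sample applications you want to run.
As there are many projects for both C# and VB, the *obj* and *bin* folders of every project have been moved into consolidated *obj* and *bin* folders in the root solution folder for easier management. If you would like me to write a tip on how to do this, please leave a comment below. Look at the *.csproj* and *.vbproj* files to see how I achieved this.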
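Definitions
There are two types of large JSON files that we can work with. I define them as follows:
- Simple JSON Collection Object
  A root JSON collection that contains a collection of objects of the same type.
- Complex JSON Objects
  A JSON object that contains one or more properties, made up of collections of objects and individual properties. The properties and/or collections to deserialize do not have to be in the root of the JSON object.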
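The two shapes can usually be told apart from the first token of the payload. The following is a minimal sketch, assuming UTF-8 input without a byte order mark, and treating a root array as a Simple JSON Collection Object and a root object as a Complex JSON Object. This is a heuristic: it does not check that the array items share a type. JsonShape and JsonShapeDetector are hypothetical names, not part of the downloadable libraries.

```csharp
using System;
using System.Text.Json;

public enum JsonShape { SimpleCollection, ComplexObject, Other }

public static class JsonShapeDetector
{
    // Hypothetical helper: classifies a payload by its first JSON token.
    // isFinalBlock is false, so only the first few bytes of a stream are needed.
    public static JsonShape Detect(ReadOnlySpan<byte> utf8Json)
    {
        var reader = new Utf8JsonReader(utf8Json, isFinalBlock: false, state: default);

        // not enough data yet to produce a complete first token
        if (!reader.Read()) return JsonShape.Other;

        return reader.TokenType switch
        {
            JsonTokenType.StartArray => JsonShape.SimpleCollection,
            JsonTokenType.StartObject => JsonShape.ComplexObject,
            _ => JsonShape.Other,
        };
    }
}
```

Because the reader is not told it has the final block, a partial buffer such as the first read from a FileStream is enough to make the decision.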
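Getting Started
Once you have downloaded and unzipped the project solution, you need to run the JsonSamples.Host Web project. It is used both by the Prototype / Application projects and by the Setup project. Once the JsonSamples.Host Web project (DeserializingJsonStreams.Hosting - CSharp.sln solution) is running, run the Setup (GetDataFiles) project to build the required JsonSamples folder. The JsonSamples.Host Web project will generate the required zip files and copy the required JSON sample files.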
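Note: If this process fails, it may be because the hosting port address of the JsonSamples.Host Web project has changed. If that is the case, go into the *Config* folder and update the Host property in the *appsettings.json* file. This file is used by every application in the project/solution that needs remote access to the Web API server.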
{
"Host" : "localhost:7215"
}
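The remote samples build their URLs from a template such as "https://{0}/download/MOCK1" by calling a Build() extension method (imported from Common.Settings), which is not shown in this article. As a minimal sketch, assuming the Host value is read from the appsettings.json text and substituted with string.Format, it might look like the following. HostUrl.Build is a hypothetical name, not the library's API.

```csharp
using System;
using System.Text.Json;

public static class HostUrl
{
    // Hypothetical helper: substitutes the "Host" value from appsettings.json
    // content into a URL template such as "https://{0}/download/MOCK1".
    public static string Build(string template, string appSettingsJson)
    {
        using JsonDocument document = JsonDocument.Parse(appSettingsJson);

        // GetProperty is case-sensitive, so the key must be spelled "Host"
        string host = document.RootElement.GetProperty("Host").GetString()
            ?? throw new InvalidOperationException("The Host setting is empty.");

        return string.Format(template, host);
    }
}
```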
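Part 1: Working with Streams
Streams implement the IDisposable interface (and, since .NET Core 3.0, IAsyncDisposable). We therefore need to make sure the resources are released, to avoid leaking memory, file handles, or network connections.
Working with Files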
await using (Stream stream = File.OpenRead(filename))
{
// do work here...
}
Using stream As Stream = File.OpenRead(filename)
' do work here...
End Using
Working with a Web API
await using (Stream httpStream = await new HttpClient().GetStreamAsync(this._url)
.ConfigureAwait(false))
{
// do work here...
}
Using stream = Await New HttpClient().GetStreamAsync(_url).ConfigureAwait(false)
' do work here...
End Using
Streaming with Newtonsoft
Working with streams using Newtonsoft is very simple. Here is a sample snippet that sets up the readers for a FileStream:
using TextReader textReader = new StreamReader(stream);
using JsonReader jsonReader = new JsonTextReader(textReader);
// pass the reader and file stream
await using (stream.ConfigureAwait(false))
{
// do work here...
}
' set up the stream readers
Using textReader As TextReader = New StreamReader(stream)
Using jsonReader As JsonReader = New JsonTextReader(textReader)
' do work here...
End Using
End Using
Simple JSON Collection Object Deserialization
A typical simple JSON collection is a list of objects in an array:
[
{
"id":1,
"first_name":"Osbert",
"last_name":"Petcher"
},
{
"id":2,
"first_name":"Salvador",
"last_name":"Marmion"
},
{
"id":3,
"first_name":"Kellen",
"last_name":"Philbin"
},
{
"id":4,
"first_name":"Fred",
"last_name":"Thuillier"
}
]
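As this is a collection of objects, we can step through the array and let the built-in JsonSerializer deserialize one object at a time.
Here is the code to deserialize the above JSON from a FileStream using Newtonsoft.Json: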
using Newtonsoft.Json;
this._jsonSerializerSettings = new();
this._serializer = JsonSerializer.Create(this._jsonSerializerSettings);
// open the _file as a stream
await using Stream stream = File.OpenRead(filename);
// set up the stream readers
using TextReader textReader = new StreamReader(stream);
using JsonReader jsonReader = new JsonTextReader(textReader);
await using (stream.ConfigureAwait(false))
{
// move to the start of the array and read the stream
await jsonReader.ReadAsync().ConfigureAwait(false);
while (await jsonReader.ReadAsync().ConfigureAwait(false) &&
jsonReader.TokenType != JsonToken.EndArray)
{
Contact? contact = this._serializer!.Deserialize<Contact>(jsonReader);
Process(contact!);
}
}
Imports Newtonsoft.Json
_jsonSerializerSettings = New JsonSerializerSettings()
_serializer = JsonSerializer.Create(_jsonSerializerSettings)
' open the _file as a stream
Using stream As Stream = File.OpenRead(filename)
' set up the stream readers
Using textReader As TextReader = New StreamReader(stream)
Using jsonReader As JsonReader = New JsonTextReader(textReader)
' move to the start of the array and read the stream
Await jsonReader.ReadAsync().ConfigureAwait(False)
While Await jsonReader.ReadAsync().ConfigureAwait(False) AndAlso
jsonReader.TokenType <> JsonToken.EndArray
Dim contact = _serializer.Deserialize(Of Contact)(jsonReader)
Process(contact)
End While
End Using
End Using
End Using
And the processing:
private void Process(Contact? item)
{
// process and store the data model
this._count++;
}
Private Sub Process(item As Contact)
' process and store the data model
_count += 1
End Sub
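In the loop above, the first ReadAsync moves onto the StartArray token, each following ReadAsync lands on an object's StartObject, and Deserialize consumes that whole object, leaving the reader on its EndObject. The same pattern can be sketched with System.Text.Json's Utf8JsonReader. Note that this sketch works on a complete in-memory buffer: Utf8JsonReader is a ref struct and cannot be held across an await, which is the gap the custom reader in Part 2 fills. ContactArrayWalker and this trimmed Contact model are hypothetical.

```csharp
using System.Collections.Generic;
using System.Text.Json;
using System.Text.Json.Serialization;

public sealed class Contact
{
    [JsonPropertyName("id")] public int Id { get; set; }
    [JsonPropertyName("first_name")] public string? FirstName { get; set; }
    [JsonPropertyName("last_name")] public string? LastName { get; set; }
}

public static class ContactArrayWalker
{
    // Same shape as the Newtonsoft loop above, but over an in-memory UTF-8 buffer.
    public static List<Contact> ReadAll(byte[] utf8Json)
    {
        var contacts = new List<Contact>();
        var reader = new Utf8JsonReader(utf8Json);

        reader.Read(); // first token: StartArray
        while (reader.Read() && reader.TokenType != JsonTokenType.EndArray)
        {
            // reader is on the element's StartObject; Deserialize consumes the
            // whole object and leaves the reader on its EndObject
            Contact? contact = JsonSerializer.Deserialize<Contact>(ref reader);
            if (contact is not null) contacts.Add(contact);
        }
        return contacts;
    }
}
```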
File Stream Example
Putting it all together, we end up with something like this:
using Common.Helpers;
using Contacts.NewtonSoft.Json.Models;
using Newtonsoft.Json;
internal class Program
{
#region Fields
private readonly IFilePathHelper _fileHelper = new FilePathHelper("Resources");
private JsonSerializer? _serializer;
private JsonSerializerSettings? _jsonSerializerSettings;
private int _count;
private readonly string _file = "Mock_Contacts1.json";
#endregion
#region Methods
private static async Task Main()
=> await new Program().Execute().ConfigureAwait(false);
private async Task Execute()
{
Console.WriteLine($"Reading {this._file}");
this._jsonSerializerSettings = new();
this._serializer = JsonSerializer.Create(this._jsonSerializerSettings);
// open the _file as a stream
await using FileStream stream =
File.OpenRead(this._fileHelper.Resolve(this._file));
Console.WriteLine($"Processing: {this._file}");
// set up the stream readers
using TextReader textReader = new StreamReader(stream);
using JsonReader jsonReader = new JsonTextReader(textReader);
await using (stream.ConfigureAwait(false))
{
// move to start of first object
await jsonReader.ReadAsync().ConfigureAwait(false);
while (await jsonReader.ReadAsync().ConfigureAwait(false) &&
jsonReader.TokenType != JsonToken.EndArray)
{
Contact? contact = this._serializer!.Deserialize<Contact>(jsonReader);
Process(contact!);
}
}
// report results
Console.WriteLine($"Contacts: {this._count:N0}");
Console.WriteLine("Finished");
}
private void Process(Contact? item)
{
// process and store the data model
this._count++;
}
#endregion
}
Imports System.IO
Imports Common.Helpers
Imports Newtonsoft.Json
Imports NewtonsoftContact.Models
Module Program
#Region "Fields"
Private ReadOnly _fileHelper As FilePathHelper = New FilePathHelper("Resources")
Private _serializer As JsonSerializer
Private _jsonSerializerSettings As JsonSerializerSettings
Private _count As Integer
Private ReadOnly _file As String = "Mock_Contacts1.json"
#End Region
#Region "Methods"
Sub Main(args As String())
Console.WriteLine($"Reading {_file}")
ExecuteAsync(args).GetAwaiter.GetResult()
Console.WriteLine("Finished")
End Sub
Private Async Function ExecuteAsync(args As String()) As Task
_jsonSerializerSettings = New JsonSerializerSettings()
_serializer = JsonSerializer.Create(_jsonSerializerSettings)
' open the _file as a stream
Using stream As Stream = File.OpenRead(_fileHelper.Resolve(_file))
' set up the stream readers
Using textReader As TextReader = New StreamReader(stream)
Using jsonReader As JsonReader = New JsonTextReader(textReader)
' move to the start of the array and read the stream
Await jsonReader.ReadAsync().ConfigureAwait(False)
While Await jsonReader.ReadAsync().ConfigureAwait(False) AndAlso
jsonReader.TokenType <> JsonToken.EndArray
Dim contact = _serializer.Deserialize(Of Contact)(jsonReader)
Process(contact)
End While
End Using
End Using
End Using
' report results
Console.WriteLine($"Contacts: {_count:N0}")
End Function
Private Sub Process(item As Contact)
' process and store the data model
_count += 1
End Sub
#End Region
End Module
Note: To see the code running, see the prototype \ local \ SimpleData \ NewtonsoftContact VB/C# projects.
Web API Example
The Web API version is almost identical:
using Common.Settings;
using Contacts.NewtonSoft.Json.Models;
using Newtonsoft.Json;
internal class Program
{
#region Fields
private JsonSerializer? _serializer;
private JsonSerializerSettings? _jsonSerializerSettings;
private int _count;
private string _url = "https://{0}/download/MOCK1";
#endregion
#region Methods
private static async Task Main()
=> await new Program().Execute().ConfigureAwait(false);
private async Task Execute()
{
// point to the correct host URL in appsettings.json file
this._url = this._url.Build();
// initialize the serializer
this._jsonSerializerSettings = new();
this._serializer = JsonSerializer.Create(this._jsonSerializerSettings);
// open the stream
Console.WriteLine($"Connecting to stream: {this._url}");
Stream? stream;
try
{
stream = await new HttpClient().GetStreamAsync(this._url);
}
catch (Exception)
{
Console.WriteLine($"Failed to open stream {this._url}. " +
"Please check that the remote server is active.");
return;
}
if (stream is null)
{
Console.WriteLine($"Failed to open stream {this._url}");
return;
}
Console.WriteLine($"Processing: {this._url}");
// set up the stream readers
using TextReader textReader = new StreamReader(stream);
using JsonReader jsonReader = new JsonTextReader(textReader);
await using (stream.ConfigureAwait(false))
{
// move to start of first object
await jsonReader.ReadAsync().ConfigureAwait(false);
while (await jsonReader.ReadAsync().ConfigureAwait(false) &&
jsonReader.TokenType != JsonToken.EndArray)
{
Contact? contact = this._serializer!.Deserialize<Contact>(jsonReader);
Process(contact!);
}
}
// manually clean up connection
stream.Close();
await stream.DisposeAsync().ConfigureAwait(false);
// report results
Console.WriteLine($"Contacts: {this._count:N0}");
Console.WriteLine("Finished");
}
private void Process(Contact? item)
{
// process and store the data model
this._count++;
}
#endregion
}
Imports System.IO
Imports System.Net.Http
Imports Common.Settings
Imports Newtonsoft.Json
Imports NewtonsoftContact.Remote.Models
Module Program
#Region "Fields"
Private _serializer As JsonSerializer
Private _jsonSerializerSettings As JsonSerializerSettings
Private _count As Integer
Private _url As String = "https://{0}/download/MOCK1"
#End Region
#Region "Methods"
Sub Main(args As String())
ExecuteAsync(args).GetAwaiter.GetResult()
Console.WriteLine("Finished")
End Sub
Private Async Function ExecuteAsync(args As String()) As Task
' point to the correct host URL in appsettings.json file
_url = _url.Build()
_jsonSerializerSettings = New JsonSerializerSettings()
_serializer = JsonSerializer.Create(_jsonSerializerSettings)
' open the stream
Console.WriteLine($"Connecting to stream: {_url}")
Dim stream As Stream
Try
stream = Await New HttpClient().GetStreamAsync(_url)
Catch ex As Exception
Console.WriteLine($"Failed to open stream {_url}.
Please check that the remote server is active.")
Return
End Try
If stream Is Nothing Then
Console.WriteLine($"Failed to open stream {_url}")
Return
End If
Console.WriteLine($"Processing: {_url}")
' set up the stream readers
Using textReader As TextReader = New StreamReader(stream)
Using jsonReader As JsonReader = New JsonTextReader(textReader)
' move to the start of the array and read the stream
Await jsonReader.ReadAsync().ConfigureAwait(False)
While Await jsonReader.ReadAsync().ConfigureAwait(False) AndAlso
jsonReader.TokenType <> JsonToken.EndArray
Dim contact = _serializer.Deserialize(Of Contact)(jsonReader)
Process(contact)
End While
End Using
End Using
' manually clean up connection
stream.Close()
Await stream.DisposeAsync().ConfigureAwait(False)
' report results
Console.WriteLine($"Contacts: {_count:N0}")
End Function
Private Sub Process(item As Contact)
' process and store the data model
_count += 1
End Sub
#End Region
End Module
Note: To see the code running, see the prototype \ remote \ SimpleData \ NewtonsoftContact VB/C# projects.
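The Web API samples create a new HttpClient for each request. As a hedged sketch (not the samples' code) of an alternative, the helper below takes a long-lived HttpClient, requests the body as soon as the headers arrive with HttpCompletionOption.ResponseHeadersRead, and checks the status code before any JSON is read. RemoteJson and CannedJsonHandler are hypothetical names; the handler exists only so the helper can be tried without the JsonSamples.Host project running.

```csharp
using System.IO;
using System.Net;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

public static class RemoteJson
{
    // Returns the response body as a stream once the headers have arrived, so the
    // JSON can be deserialized while it downloads. The caller disposes the stream.
    public static async Task<Stream> OpenJsonStreamAsync(
        HttpClient client, string url, CancellationToken cancellationToken = default)
    {
        HttpResponseMessage response = await client
            .GetAsync(url, HttpCompletionOption.ResponseHeadersRead, cancellationToken)
            .ConfigureAwait(false);
        try
        {
            // a non-success status code throws before any JSON is read
            response.EnsureSuccessStatusCode();
        }
        catch
        {
            response.Dispose(); // release the connection before rethrowing
            throw;
        }
        return await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
    }
}

// Hypothetical test-only handler that answers every request with canned JSON.
public sealed class CannedJsonHandler : HttpMessageHandler
{
    private readonly HttpStatusCode _status;
    private readonly string _json;

    public CannedJsonHandler(HttpStatusCode status, string json)
    {
        _status = status;
        _json = json;
    }

    protected override Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request, CancellationToken cancellationToken) =>
        Task.FromResult(new HttpResponseMessage(_status)
        {
            Content = new StringContent(_json, Encoding.UTF8, "application/json"),
        });
}
```

The returned stream is then wrapped in the same StreamReader and JsonTextReader as in the examples above.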
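Complex JSON Objects with Selective Deserialization
Complex JSON is made up of individual simple properties, objects, and collections.
Below is an example of a complex JSON data structure: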
{
"categoryTreeId": "123",
"categoryTreeVersion": "1.234a",
"categoryAspects": [
{
"category": {
"categoryId": "111",
"categoryName": "Category 1"
},
"aspects": [
{
"localizedAspectName": "1:Aspect 1"
},
{
"localizedAspectName": "1:Aspect 2"
},
{
"localizedAspectName": "1:Aspect 3"
}
]
},
{
"category": {
"categoryId": "222",
"categoryName": "Category 2"
},
"aspects": [
{
"localizedAspectName": "2:Aspect 1"
},
{
"localizedAspectName": "2:Aspect 2"
},
{
"localizedAspectName": "2:Aspect 3"
}
]
}
]
}
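We are only interested in the "categoryAspects" collection. The method used for the simple JSON collection cannot be used here: if we did use it, the entire object would be loaded into memory instead of one CategoryAspect object at a time.
To help understand the code below, here is an outline of manually walking the structure to deserialize each CategoryAspect object:
- Check each property
- When we find the "categoryAspects" property, we can extract each CategoryAspect object:
  - Find the start of the object
  - Walk and store the object graph until we reach the end of the object
  - Deserialize the object graph
  - Repeat until we reach the end of the array object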
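The steps above can be sketched with System.Text.Json on an in-memory buffer (not a stream, since Utf8JsonReader cannot be used across an await). Every token is checked; when the "categoryAspects" property is found, each array element is deserialized on its own. Like the Newtonsoft code below, it matches the property at any depth. SelectiveWalker and the trimmed models are hypothetical names.

```csharp
using System.Collections.Generic;
using System.Text.Json;
using System.Text.Json.Serialization;

public sealed class Category
{
    [JsonPropertyName("categoryId")] public string? CategoryId { get; set; }
    [JsonPropertyName("categoryName")] public string? CategoryName { get; set; }
}

public sealed class Aspect
{
    [JsonPropertyName("localizedAspectName")] public string? LocalizedAspectName { get; set; }
}

public sealed class CategoryAspect
{
    [JsonPropertyName("category")] public Category? Category { get; set; }
    [JsonPropertyName("aspects")] public List<Aspect>? Aspects { get; set; }
}

public static class SelectiveWalker
{
    public static List<CategoryAspect> ReadCategoryAspects(byte[] utf8Json)
    {
        var results = new List<CategoryAspect>();
        var reader = new Utf8JsonReader(utf8Json);

        // check each token until the property we want is found
        while (reader.Read())
        {
            if (reader.TokenType != JsonTokenType.PropertyName ||
                !reader.ValueTextEquals("categoryAspects"))
                continue;

            reader.Read(); // move onto the StartArray of the collection
            while (reader.Read() && reader.TokenType != JsonTokenType.EndArray)
            {
                // on the element's StartObject; Deserialize walks to its EndObject
                CategoryAspect? item = JsonSerializer.Deserialize<CategoryAspect>(ref reader);
                if (item is not null) results.Add(item);
            }
        }
        return results;
    }
}
```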
Here is the code to deserialize the above JSON from a FileStream using Newtonsoft.Json:
using Newtonsoft.Json;
JsonSerializer _serializer = new();
// open the _file as a stream
await using FileStream stream = File.OpenRead(_filename);
// set up the stream readers
using TextReader textReader = new StreamReader(stream);
using JsonReader jsonReader = new JsonTextReader(textReader);
await using (stream.ConfigureAwait(false))
{
// walk the JSON graph token by token until the end of the stream
while (await jsonReader.ReadAsync().ConfigureAwait(false))
await ProcessAsync(jsonReader).ConfigureAwait(false);
}
Imports Newtonsoft.Json
_jsonSerializerSettings = New JsonSerializerSettings()
_serializer = JsonSerializer.Create(_jsonSerializerSettings)
' open the _file as a stream
Using stream As Stream = File.OpenRead(_filename)
' set up the stream readers
Using textReader As TextReader = New StreamReader(stream)
Using jsonReader As JsonReader = New JsonTextReader(textReader)
' move to the start of the array and read the stream
While Await jsonReader.ReadAsync().ConfigureAwait(False)
Await ProcessAsync(jsonReader).ConfigureAwait(False)
End While
End Using
End Using
End Using
The code that walks the JSON graph:
// walk the stream
private async Task ProcessAsync(JsonReader jsonReader)
{
// make sure we are looking at the correct json element
if (jsonReader.TokenType != JsonToken.PropertyName)
return;
// process properties for data that we want
if (jsonReader.GetString() == "categoryTreeVersion")
{
// get the value
await jsonReader.ReadAsync().ConfigureAwait(false);
string? version = jsonReader.GetString();
Console.WriteLine($"Version: {version ?? "no value"}");
}
else if (jsonReader.GetString() == "categoryTreeId")
{
// get the value
await jsonReader.ReadAsync().ConfigureAwait(false);
string? id = jsonReader.GetString();
Console.WriteLine($"Id: {id ?? "no value"}");
}
else if (jsonReader.GetString() == "categoryAspects")
{
// move to the start of the array
await jsonReader.ReadAsync().ConfigureAwait(false);
// step through each complete object in the Json Array
while (await jsonReader.ReadAsync().ConfigureAwait(false) &&
jsonReader.TokenType != JsonToken.EndArray)
ProcessCollection(jsonReader);
}
}
' walk the stream
Private Async Function ProcessAsync(jsonReader As JsonReader) As Task
If jsonReader.TokenType <> JsonToken.PropertyName Then
Return
End If
' process properties for data that we want
If jsonReader.GetString() = "categoryTreeVersion" Then
' get the value
Await jsonReader.ReadAsync().ConfigureAwait(False)
Dim version = jsonReader.GetString()
Console.WriteLine($"Version: {If(version, "no value")}")
ElseIf jsonReader.GetString() = "categoryTreeId" Then
' get the value
Await jsonReader.ReadAsync().ConfigureAwait(False)
Dim id = jsonReader.GetString()
Console.WriteLine($"Id: {If(id, "no value")}")
ElseIf jsonReader.GetString() = "categoryAspects" Then
' move to the start of the array
Await jsonReader.ReadAsync().ConfigureAwait(False)
'step through each complete object in the Json Array
While Await jsonReader.ReadAsync().ConfigureAwait(False) AndAlso
jsonReader.TokenType <> JsonToken.EndArray
ProcessCollection(jsonReader)
End While
End If
End Function
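Note: The processing code above is not limited to the root node; it searches the whole JSON graph and processes only the nodes it recognizes.
To store each object, we use the same code as in the earlier simple collection example: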
private void ProcessCollection(JsonReader jsonReader)
{
CategoryAspect? categoryAspect =
_serializer!.Deserialize<CategoryAspect>(jsonReader);
// process and store the data model
_count++;
}
Private Sub ProcessCollection(jsonReader As JsonReader)
Dim categoryAspect = _serializer.Deserialize(Of CategoryAspect)(jsonReader)
' process and store the data model
_count += 1
End Sub
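Note: To see the code running, see the prototype \ local \ ComplexData \ NewtonsoftEbay VB/C# projects.
Working with Compressed JSON Data Files
Zip files are great for compressing text-based JSON files, especially very large ones. We can stream-read both the zip file and the compressed JSON files inside it.
The code to read the data is the same as above; we only need to add code to open the zip file and read its entries. Here, I assume that every file is of the correct type: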
using ZipArchive zipArchive = new(File.OpenRead(_filename));
foreach (ZipArchiveEntry zipArchiveEntry in zipArchive.Entries)
{
Console.WriteLine($"Processing: {_file} > {zipArchiveEntry.FullName}");
await using Stream stream = zipArchiveEntry.Open();
// set up the stream readers
using TextReader textReader = new StreamReader(stream);
using JsonReader jsonReader = new JsonTextReader(textReader);
// do work here...
}
Using zipArchive = New ZipArchive(File.OpenRead(_fileHelper.Resolve(_file)))
For Each zipArchiveEntry As ZipArchiveEntry In zipArchive.Entries
Console.WriteLine($"Processing: {_file} > {zipArchiveEntry.FullName}")
Using stream As Stream = zipArchiveEntry.Open()
' set up the stream readers
Using textReader As TextReader = New StreamReader(stream)
Using jsonReader = New JsonTextReader(textReader)
' do work here
End Using
End Using
End Using
Next
End Using
If you need to find a specific file, just check the name of each ZipArchive entry:
using ZipArchive zipArchive = new(File.OpenRead(_filename));
foreach (ZipArchiveEntry zipArchiveEntry in zipArchive.Entries)
{
if (zipArchiveEntry.Name == "file_name_goes_here")
{
Console.WriteLine($"Processing: {_file} > {zipArchiveEntry.FullName}");
// set up the stream readers
await using Stream stream = zipArchiveEntry.Open();
using TextReader textReader = new StreamReader(stream);
using JsonReader jsonReader = new JsonTextReader(textReader);
// do work here...
}
}
Using zipArchive = New ZipArchive(File.OpenRead(_fileHelper.Resolve(_file)))
For Each zipArchiveEntry As ZipArchiveEntry In zipArchive.Entries
If zipArchiveEntry.Name = "file_name_goes_here" Then
Console.WriteLine($"Processing: {_file} > {zipArchiveEntry.FullName}")
Using stream As Stream = zipArchiveEntry.Open()
' set up the stream readers
Using textReader As TextReader = New StreamReader(stream)
Using jsonReader = New JsonTextReader(textReader)
' do work here
End Using
End Using
End Using
End If
Next
End Using
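As an alternative to checking each entry's Name in a loop, ZipArchive.GetEntry looks an entry up by its full name (the path inside the archive, matched exactly). A minimal sketch, assuming a single JSON entry in an in-memory archive so it can run without the sample data; ZipJson and its methods are hypothetical names.

```csharp
using System.IO;
using System.IO.Compression;
using System.Text;

public static class ZipJson
{
    // Builds an in-memory zip holding one JSON entry (a stand-in for a .zip on disk).
    public static MemoryStream CreateZip(string entryName, string json)
    {
        var buffer = new MemoryStream();
        using (var archive = new ZipArchive(buffer, ZipArchiveMode.Create, leaveOpen: true))
        {
            ZipArchiveEntry entry = archive.CreateEntry(entryName);
            // UTF-8 without a byte order mark
            using var writer = new StreamWriter(entry.Open(), new UTF8Encoding(false));
            writer.Write(json);
        }
        buffer.Position = 0;
        return buffer;
    }

    // Looks up one entry by its full name instead of looping over Entries.
    // Returns null when the archive has no entry with that name.
    public static string? ReadEntry(Stream zipStream, string entryFullName)
    {
        using var archive = new ZipArchive(zipStream, ZipArchiveMode.Read);
        ZipArchiveEntry? entry = archive.GetEntry(entryFullName);
        if (entry is null) return null;

        // entry.Open() is a forward-only decompression stream; in the samples it is
        // handed to StreamReader + JsonTextReader rather than read into a string
        using var reader = new StreamReader(entry.Open());
        return reader.ReadToEnd();
    }
}
```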
File Stream Example
Putting it all together, we end up with something like this:
using System.IO.Compression;
using Common.Helpers;
using Ebay.NewtonSoft.Json.Models;
using Newtonsoft.Json;
internal class Program
{
#region Fields
private readonly IFilePathHelper _fileHelper =
new FilePathHelper("Resources");
private JsonSerializer? _serializer;
private JsonSerializerSettings? _jsonSerializerSettings;
private int _count;
private readonly string _file = "EBay CategoryAspects.zip";
#endregion
#region Methods
private static async Task Main()
=> await new Program().Execute().ConfigureAwait(false);
private async Task Execute()
{
Console.WriteLine($"Reading {this._file}");
this._jsonSerializerSettings = new();
this._serializer = JsonSerializer.Create(this._jsonSerializerSettings);
using ZipArchive zipArchive =
new(File.OpenRead(this._fileHelper.Resolve(this._file)));
foreach (ZipArchiveEntry zipArchiveEntry in zipArchive.Entries)
{
Console.WriteLine
($"Processing: {this._file} > {zipArchiveEntry.FullName}");
await using Stream stream = zipArchiveEntry.Open();
// set up the stream readers
using TextReader textReader = new StreamReader(stream);
using JsonReader jsonReader = new JsonTextReader(textReader);
await using (stream.ConfigureAwait(false))
{
// move to the start of the array and read the stream
while (await jsonReader.ReadAsync().ConfigureAwait(false))
await this.ProcessAsync(jsonReader).ConfigureAwait(false);
}
}
// report results
Console.WriteLine($"CategoryAspects: {this._count:N0}");
Console.WriteLine("Finished");
}
private async Task ProcessAsync(JsonReader jsonReader)
{
if (jsonReader.TokenType != JsonToken.PropertyName) return;
// process properties for data that we want
if (jsonReader.GetString() == "categoryTreeVersion")
{
// get the value
await jsonReader.ReadAsync().ConfigureAwait(false);
string? version = jsonReader.GetString();
Console.WriteLine($"Version: {version ?? "no value"}");
}
else if (jsonReader.GetString() == "categoryTreeId")
{
// get the value
await jsonReader.ReadAsync().ConfigureAwait(false);
string? id = jsonReader.GetString();
Console.WriteLine($"Id: {id ?? "no value"}");
}
else if (jsonReader.GetString() == "categoryAspects")
{
// move to the start of the array
await jsonReader.ReadAsync().ConfigureAwait(false);
// step through each complete object in the Json Array
while (await jsonReader.ReadAsync().ConfigureAwait(false) &&
jsonReader.TokenType != JsonToken.EndArray)
this.ProcessCollection(jsonReader);
}
}
private void ProcessCollection(JsonReader jsonReader)
{
CategoryAspect? categoryAspect =
this._serializer!.Deserialize<CategoryAspect>(jsonReader);
// process and store the data model
this._count++;
}
#endregion
}
Imports System.IO
Imports System.IO.Compression
Imports Common.Helpers
Imports Newtonsoft.Json
Imports NewtonSoftZippedEbay.Models
' Mock_Json_Files
Module Program
#Region "Fields"
Private ReadOnly _fileHelper As FilePathHelper =
New FilePathHelper("Resources")
Private _serializer As JsonSerializer
Private _jsonSerializerSettings As JsonSerializerSettings
Private _count As Integer
Private ReadOnly _file As String = "EBay CategoryAspects.zip"
#End Region
#Region "Methods"
Sub Main(args As String())
Console.WriteLine($"Reading {_file}")
ExecuteAsync(args).GetAwaiter.GetResult()
Console.WriteLine("Finished")
End Sub
Private Async Function ExecuteAsync(args As String()) As Task
_jsonSerializerSettings = New JsonSerializerSettings()
_serializer = JsonSerializer.Create(_jsonSerializerSettings)
Using zipArchive = New ZipArchive(File.OpenRead(_fileHelper.Resolve(_file)))
For Each zipArchiveEntry As ZipArchiveEntry In zipArchive.Entries
Console.WriteLine($"Processing: {_file} > {zipArchiveEntry.FullName}")
Using stream As Stream = zipArchiveEntry.Open()
' set up the stream readers
Using textReader As TextReader = New StreamReader(stream)
Using jsonReader = New JsonTextReader(textReader)
' move to the start of the array and read the stream
While Await jsonReader.ReadAsync().ConfigureAwait(False)
Await ProcessAsync(jsonReader).ConfigureAwait(False)
End While
End Using
End Using
End Using
Next
End Using
' report results
Console.WriteLine($"CategoryAspects: {_count:N0}")
End Function
Private Async Function ProcessAsync(jsonReader As JsonReader) As Task
If jsonReader.TokenType <> JsonToken.PropertyName Then
Return
End If
' process properties for data that we want
If jsonReader.GetString() = "categoryTreeVersion" Then
' get the value
Await jsonReader.ReadAsync().ConfigureAwait(False)
Dim version = jsonReader.GetString()
Console.WriteLine($"Version: {If(version, "no value")}")
ElseIf jsonReader.GetString() = "categoryTreeId" Then
' get the value
Await jsonReader.ReadAsync().ConfigureAwait(False)
Dim id = jsonReader.GetString()
Console.WriteLine($"Id: {If(id, "no value")}")
ElseIf jsonReader.GetString() = "categoryAspects" Then
' move to the start of the array
Await jsonReader.ReadAsync().ConfigureAwait(False)
'step through each complete object in the Json Array
While Await jsonReader.ReadAsync().ConfigureAwait(False) AndAlso
jsonReader.TokenType <> JsonToken.EndArray
ProcessCollection(jsonReader)
End While
End If
End Function
Private Sub ProcessCollection(jsonReader As JsonReader)
Dim categoryAspect = _serializer.Deserialize(Of CategoryAspect)(jsonReader)
' process and store the data model
_count += 1
End Sub
#End Region
End Module
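Note: To see the code running, see the prototype \ local \ ComplexZippedData \ NewtonSoftZippedEbay VB/C# projects. There is also a Contacts version, plus Web API versions for both Contacts and Ebay.
Note: At the time of writing (DotNet 6.0), if you are streaming from a Web API, DotNet will download the entire Zip file *before* you can stream-read it, because ZipArchive needs a seekable stream and copies a non-seekable one into memory first. The code and sample projects above will therefore load the entire zip file into memory.
If you are working with very large zip files, you need to stream the download to a cached file on disk first, and only then open the archive for streaming. This keeps memory usage to a minimum. If you want to cache to a file and are not sure how to download a stream to a file, look at the DowloadService in the GetDataFiles project to see how it is done.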
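A minimal sketch of caching to disk before opening the archive, assuming a temporary file is acceptable. ZipCache and its methods are hypothetical names; this is not the DowloadService from GetDataFiles. The source can be any forward-only stream, for example the result of HttpClient.GetStreamAsync.

```csharp
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ZipCache
{
    // Copies a forward-only source to a temporary file, so the archive can later be
    // opened from a seekable file instead of being buffered in memory.
    // The caller deletes the file when done.
    public static async Task<string> CacheToTempFileAsync(
        Stream source, CancellationToken cancellationToken = default)
    {
        string path = Path.GetTempFileName();
        await using (FileStream file = File.Create(path))
        {
            await source.CopyToAsync(file, cancellationToken).ConfigureAwait(false);
        }
        return path;
    }

    // Opens the cached archive from disk and lists the entries it contains.
    public static List<string> ListEntries(string zipPath)
    {
        using ZipArchive archive = ZipFile.OpenRead(zipPath);
        return archive.Entries.Select(entry => entry.FullName).ToList();
    }
}
```

In the remote samples, the file returned by CacheToTempFileAsync would be opened with File.OpenRead and passed to new ZipArchive(...) exactly as in the file stream example above.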
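Sample Projects
If you want to see the above code in action, download the projects and run the samples in the *Prototypes\Local* or *Prototypes\Remote* folders. There are four samples in each folder:
- File System: NewtonsoftContacts and NewtonsoftZippedContacts for Simple JSON Collection Object deserialization; NewtonsoftEbay and NewtonSoftZippedEbay for Complex JSON Objects with selective deserialization.
- Web API: NewtonsoftContacts.Remote and NewtonsoftZippedContacts.Remote for Simple JSON Collection Object deserialization; NewtonsoftEbay.Remote and NewtonSoftZippedEbay.Remote for Complex JSON Objects with selective deserialization.
Note: For the Web API sample projects, you need to run the JsonSamples.Host Web project before running the .Remote sample projects.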
Streaming with System.Text.Json
Out of the box, System.Text.Json supports streaming. It looks like this:
using System.Text.Json;
// open the _file as a stream
await using FileStream stream = File.OpenRead(filename);
// Deserialize the stream
List<Contact> contacts = await JsonSerializer.DeserializeAsync<List<Contact>>(stream);
Imports System.Text.Json
Using stream As FileStream = File.OpenRead(filename)
Dim contacts = Await JsonSerializer.DeserializeAsync(Of List(Of Contact))(stream)
End Using
The downside is that the whole file ends up in memory, deserialized as a single List<Contact>, before you can work with any of it. In that respect, it is the same as doing the following:
using System.Text.Json;
// open and load the whole file into memory
string rawJson = await File.ReadAllTextAsync(filename);
List<Contact>? contacts = JsonSerializer.Deserialize<List<Contact>>(rawJson);
Imports System.Text.Json
Dim rawJson As String = Await File.ReadAllTextAsync(filename)
Dim contacts = JsonSerializer.Deserialize(Of List(Of Contact))(rawJson)
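For a Simple JSON Collection Object, like the example above, System.Text.Json does support deserializing one object at a time, which avoids loading the entire file into memory. Here is an example: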
using System.Text.Json;
// open the _file as a stream
await using FileStream stream = File.OpenRead(filename);
await foreach
(Contact? contact in JsonSerializer.DeserializeAsyncEnumerable<Contact>(stream))
{
// do work here...
}
Imports System.Text.Json
_options = New JsonSerializerOptions()
' open the _file as a stream
Using stream As Stream = File.OpenRead(_fileHelper.Resolve(_file))
Console.WriteLine($"Processing: {_file}")
' VB version of C#'s "await foreach(..)"
Dim iterator = JsonSerializer.DeserializeAsyncEnumerable(Of Contact) _
(stream, _options).GetAsyncEnumerator()
Do While Await iterator.MoveNextAsync()
Dim item = iterator.Current
Process(item)
Loop
Await iterator.DisposeAsync()
End Using
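Deserializing one object at a time also makes it possible to stop early, either by leaving the loop or through the CancellationToken parameter of DeserializeAsyncEnumerable. A minimal sketch; ContactStream, TakeAsync, and this trimmed Contact model are hypothetical. Leaving the await foreach disposes the enumerator, so no further data is read from the stream, although elements already in the current read buffer may have been parsed.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;

public sealed class Contact
{
    [JsonPropertyName("id")] public int Id { get; set; }
    [JsonPropertyName("first_name")] public string? FirstName { get; set; }
}

public static class ContactStream
{
    // Pulls contacts from a JSON array one element at a time and stops after `limit`.
    public static async Task<List<Contact>> TakeAsync(
        Stream utf8Json, int limit, CancellationToken cancellationToken = default)
    {
        var taken = new List<Contact>();
        if (limit <= 0) return taken;

        await foreach (Contact? contact in JsonSerializer.DeserializeAsyncEnumerable<Contact>(
                           utf8Json, cancellationToken: cancellationToken))
        {
            if (contact is null) continue; // a JSON null element in the array
            taken.Add(contact);
            if (taken.Count >= limit) break; // stop pulling from the stream
        }
        return taken;
    }
}
```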
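Note: As you can see from the example above, VB does not have C#'s await foreach asynchronous loop, so we need to step through the asynchronous collection manually: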
Dim iterator = <method_goes_here>.GetAsyncEnumerator()
Do While Await iterator.MoveNextAsync()
Dim item = iterator.Current
' process item here
Loop
Await iterator.DisposeAsync()
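The downside of the above is that it does not work for Complex JSON Objects with selective deserialization. There is no support for this out of the box, so we have to write it ourselves with a custom Stream Reader. I have created one, and we will explore it next.
File Stream Example
Putting it all together, we end up with something like this: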
using Common.Helpers;
using System.Text.Json;
using Contacts.System.Text.Json.Models;
internal class Program
{
#region Fields
private readonly IFilePathHelper _fileHelper =
new FilePathHelper("Resources");
private JsonSerializerOptions? _jsonSerializerOptions;
private int _count;
private readonly string _file = "Mock_Contacts1.json";
#endregion
#region Methods
private static async Task Main()
=> await new Program().Execute().ConfigureAwait(false);
private async Task Execute()
{
Console.WriteLine($"Reading {this._file}");
this._jsonSerializerOptions = new();
// open the _file as a stream
await using FileStream stream =
File.OpenRead(this._fileHelper.Resolve(this._file));
Console.WriteLine($"Processing: {this._file}");
// deserialize the stream an object at a time...
await foreach (Contact? item in
JsonSerializer.DeserializeAsyncEnumerable<Contact>
(stream, this._jsonSerializerOptions))
Process(item);
// report results
Console.WriteLine($"Contacts: {this._count:N0}");
Console.WriteLine("Finished");
}
private void Process(Contact? item)
{
// process and store the data model
this._count++;
}
#endregion
}
Imports System.IO
Imports System.Text.Json
Imports System.Text.Json.Stream
Imports Common.Helpers
Imports SystemTextJsonContact.Models
Module Program
#Region "Fields"
Private ReadOnly _fileHelper As FilePathHelper =
New FilePathHelper("Resources")
Private _options As JsonSerializerOptions
Private _count As Integer
Private ReadOnly _file As String = "Mock_Contacts1.json"
#End Region
#Region "Methods"
Sub Main(args As String())
Console.WriteLine($"Reading {_file}")
MainAsync(args).GetAwaiter.GetResult()
Console.WriteLine("Finished")
End Sub
Private Async Function MainAsync(args As String()) As Task
_options = New JsonSerializerOptions()
' open the _file as a stream
Using stream As Stream = File.OpenRead(_fileHelper.Resolve(_file))
Console.WriteLine($"Processing: {_file}")
' VB version of C#'s "await foreach(..)"
Dim iterator = JsonSerializer.DeserializeAsyncEnumerable(Of Contact) _
(stream, _options).GetAsyncEnumerator()
While Await iterator.MoveNextAsync()
Dim item = iterator.Current
Process(item)
End While
Await iterator.DisposeAsync()
End Using
' report results
Console.WriteLine($"Contacts: {_count:N0}")
End Function
Private Sub Process(item As Contact)
' process and store the data model
_count += 1
End Sub
#End Region
End Module
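Note: To see the code running, see the prototype \ local \ SimpleData \ SystemTextJsonContact VB/C# projects. For the Web API version, see the prototype \ remote \ SimpleData \ SystemTextJsonContact VB/C# projects.
Part 2: Custom Utf8JsonAsyncStreamReader
The goal is a Stream Reader that works like Newtonsoft.Json.JsonTextReader with minimal code changes. In the next section, "Writing the Custom Stream Reader", I go into detail on how it is implemented.
How to Use the New Utf8JsonAsyncStreamReader
System.Text.Json is a from-scratch replacement for Newtonsoft.Json that works directly with UTF-8 bytes, so the benefit is that the solution needs only one line of code: the TextReader and JsonTextReader are combined into a single class. It works with any stream type, so using it is as simple as: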
using Utf8JsonAsyncStreamReader jsonReader = new Utf8JsonAsyncStreamReader(stream);
// do work here...
Using jsonReader As Utf8JsonAsyncStreamReader = New Utf8JsonAsyncStreamReader(stream)
' do work here...
End Using
注意:Utf8JsonAsyncStreamReader
是 TextReader
和 JsonTextReader
的直接替代品,因此与上面的 NewtonSoft
示例代码完全相同。
编写自定义流读取器
注意:我不会在这里倾倒所有代码,只倾倒重要的部分。自定义 Stream Readers 的完整代码可以在项目 libaries \ System.Text.Json \ System.Text.Json.Stream
中找到。所有代码都经过充分注释,解释了其工作原理。
我对这个主题进行了大量研究,因为如果别人已经找到了解决方案,我不想做这项工作。
有一个是*同步*的:mtosh(原始解决方案)- StackOverflow,然后是evil-dr-nick - Github。注意:我在可下载代码中包含了更新版本,修复了一些小问题并对代码进行了现代化。
我们需要一个异步解决方案。目前还没有……直到现在!
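After several attempts, I came up with the following solution using the System.IO.Pipelines API. Why not use Span<T> or Memory<T> like mtosh's synchronous solution above? Because in .NET 6.0 the PipeReader class only supports ReadAtLeastAsync, which does not guarantee the exact number of bytes needed to work with ReadOnlySequenceSegment<T>.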
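To see what `ReadAtLeastAsync` gives you on its own, here is a minimal sketch (not the author's reader) that pulls a stream through a `PipeReader` in chunks of at least `minimumSize` bytes and reports the total. The class and method names are made up for illustration, and it assumes a .NET 6+ project with access to System.IO.Pipelines (for a plain console app that may mean adding the System.IO.Pipelines NuGet package). Unlike a JSON reader, which would only advance past complete tokens, this loop marks every byte as consumed.

```csharp
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not part of the article's library
public static class PipeReaderSketch
{
    // Reads a stream through a PipeReader in chunks of at least minimumSize bytes
    // and returns how many bytes were seen in total
    public static async Task<long> CountBytesAsync(Stream stream, int minimumSize = 16 * 1024,
        CancellationToken cancellationToken = default)
    {
        PipeReader reader = PipeReader.Create(stream);
        long total = 0;
        try
        {
            while (true)
            {
                // waits until at least minimumSize bytes are buffered, or the stream ends
                ReadResult result = await reader.ReadAtLeastAsync(minimumSize, cancellationToken)
                                                .ConfigureAwait(false);
                ReadOnlySequence<byte> buffer = result.Buffer;

                // a JSON reader would parse tokens here; this sketch only counts bytes
                total += buffer.Length;

                // report everything in this buffer as consumed
                reader.AdvanceTo(buffer.End);

                if (result.IsCompleted) break;
            }
        }
        finally
        {
            await reader.CompleteAsync().ConfigureAwait(false);
        }
        return total;
    }
}
```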
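The benefit of processing the stream with a PipeReader is that it manages the stream handling and returns a ReadOnlySequence<T> struct, giving us fast, raw access to the bytes.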
// move the start of the buffer past what has already been consumed
if (this._bytesConsumed > 0) this._reader.AdvanceTo
(this._buffer.GetPosition(this._bytesConsumed));
// top up the buffer stream
ReadResult readResult = await this._reader
.ReadAtLeastAsync(this._bufferSize, cancellationToken)
.ConfigureAwait(false);
// reset to new stream buffer segment
this._bytesConsumed = 0;
this._buffer = readResult.Buffer;
this._endOfStream = readResult.IsCompleted;
// check for any issues
if (this._buffer.Length - this._bytesConsumed > 0 &&
!this.JsonReader(this._endOfStream))
throw new Exception("Invalid Json or incomplete token or buffer undersized");
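This code lives in the ValueTask<bool> ReadAsync method. When we are ready to deserialize an identified object, we buffer the bytes into a MemoryStream. Buffering is managed in the ReadAsync method with the _isBuffering flag: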
// store stream buffer/chunk if we are wanting to Deserialize the Json object
if (this._isBuffering)
{
this.WriteToBufferStream();
// reset the buffer start tracking
this._bufferingStartIndex = 0;
}
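Writing to the buffer is done through a PipeWriter. After some testing, writing manually turned out to be faster than using the built-in Write, because of the boxing it requires. The built-in approach looks like this: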
this._writer!.Write(this._buffer.Slice(this._bufferingStartIndex,
this._bytesConsumed - this._bufferingStartIndex).ToArray());
I did not inherit from PipeWriter to write my own custom Write method; since it is only needed in one place, I simply do it inline (no boxing):
private void WriteToBufferStream()
{
// get number of bytes to transfer
int bytes = this._bytesConsumed - this._bufferingStartIndex;
// store
this._buffer.Slice(this._bufferingStartIndex, bytes).CopyTo
(this._writer!.GetSpan(bytes));
// manually advance buffer pointer
this._writer.Advance(bytes);
}
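The same `GetSpan`/`CopyTo`/`Advance` idea can be tried in isolation. This sketch copies a slice of a `ReadOnlySequence<byte>` into a `MemoryStream` through a `PipeWriter`, then rewinds the stream so it is ready for deserialization. `BufferCopySketch` is a hypothetical name. Passing `leaveOpen: true` is my assumption about how the temporary stream should be handled: as far as I know, completing a stream-backed writer without it also disposes the underlying stream, which would make the later `Seek` fail.

```csharp
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not part of the article's library
public static class BufferCopySketch
{
    // Copies sequence[start .. start + count) into a MemoryStream via a PipeWriter,
    // then rewinds the stream so a deserializer can read it from the beginning
    public static async Task<MemoryStream> CopySliceAsync(ReadOnlySequence<byte> sequence, int start, int count)
    {
        var memory = new MemoryStream();
        PipeWriter writer = PipeWriter.Create(memory, new StreamPipeWriterOptions(leaveOpen: true));

        // write straight into the writer's own buffer: no intermediate array
        sequence.Slice(start, count).CopyTo(writer.GetSpan(count));
        writer.Advance(count);

        // push buffered bytes to the stream, then signal that writing is finished
        await writer.FlushAsync().ConfigureAwait(false);
        await writer.CompleteAsync().ConfigureAwait(false);

        memory.Seek(0, SeekOrigin.Begin);
        return memory;
    }
}
```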
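For the DeserializeAsync method, we need to walk the stream to find the end of the object; if we don't, JsonSerializer.DeserializeAsync will throw an error. Walking the stream is simply a matter of tracking the depth of the graph until we find the end of the object or array: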
// walk the json object tree until we have the complete json object
while (!cancellationToken.IsCancellationRequested)
{
if (this.TokenType is JsonTokenType.StartObject or JsonTokenType.StartArray)
depth++;
else if (this.TokenType is JsonTokenType.EndObject or JsonTokenType.EndArray)
depth--;
if (depth == 0)
break;
await this.ReadAsync(cancellationToken).ConfigureAwait(false);
}
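`Utf8JsonReader` is a `ref struct`, so it cannot be held across an `await`; that is why the article needs a custom async reader. The depth-counting idea itself is easy to demonstrate synchronously on an in-memory buffer, though. The hypothetical `DepthWalker` below cuts each element object out of a top-level JSON array by incrementing the depth on `StartObject`/`StartArray` and decrementing it on `EndObject`/`EndArray`, using `TokenStartIndex` and `BytesConsumed` to find each element's byte range:

```csharp
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

// Hypothetical helper for illustration; not part of the article's library
public static class DepthWalker
{
    // Splits a top-level JSON array into the raw text of each element object
    public static List<string> SplitArrayElements(byte[] utf8Json)
    {
        var results = new List<string>();
        var reader = new Utf8JsonReader(utf8Json);
        int depth = 0;
        long start = -1;

        while (reader.Read())
        {
            switch (reader.TokenType)
            {
                case JsonTokenType.StartObject:
                case JsonTokenType.StartArray:
                    depth++;
                    // depth 2 is an element object directly inside the outer array
                    if (depth == 2 && reader.TokenType == JsonTokenType.StartObject)
                        start = reader.TokenStartIndex;
                    break;

                case JsonTokenType.EndObject:
                case JsonTokenType.EndArray:
                    // closing token of the element: the object is complete
                    if (depth == 2 && start >= 0)
                    {
                        long end = reader.BytesConsumed; // just past the closing brace
                        results.Add(Encoding.UTF8.GetString(utf8Json, (int)start, (int)(end - start)));
                        start = -1;
                    }
                    depth--;
                    break;
            }
        }
        return results;
    }
}
```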
Once we have the complete object graph, we can clean up:
// remaining bytes to be buffered (overflow)
this.WriteToBufferStream();
// flush all writes
await this._writer!.CompleteAsync().ConfigureAwait(false);
// operation cancelled remotely
if (cancellationToken.IsCancellationRequested)
return false;
// Point to beginning of the memory stream
stream.Seek(0, SeekOrigin.Begin);
// success
return true;
We move the stream pointer back to the start of the stream, ready for deserialization with the JsonSerializer.DeserializeAsync method:
// fill temp stream with json object
if (!await this.GetJsonObjectAsync(stream, cancellationToken).ConfigureAwait(false))
return default;
// deserialize object from temp stream
TResult? result = await JsonSerializer
.DeserializeAsync<TResult>(stream, cancellationToken: cancellationToken)
.ConfigureAwait(false);
// we are done buffering
this._isBuffering = false;
See the samples for the complete code and how it works.
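Stripped of the reader plumbing, the final step is simply "rewind, then deserialize". A minimal sketch, assuming the object's bytes have already been written to a `MemoryStream` and using a hypothetical `Contact` class with web-style (camelCase, case-insensitive) options:

```csharp
using System.IO;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, trimmed-down model for illustration
public sealed class Contact
{
    public int Id { get; set; }
    public string? FirstName { get; set; }
}

// Hypothetical helper for illustration; not part of the article's library
public static class BufferedObjectSketch
{
    // Deserializes one buffered JSON object. The position must be rewound first;
    // otherwise the serializer starts at the end of the data and finds no JSON to read.
    public static async Task<Contact?> DeserializeBufferedAsync(MemoryStream buffered,
        CancellationToken cancellationToken = default)
    {
        buffered.Seek(0, SeekOrigin.Begin);
        return await JsonSerializer
            .DeserializeAsync<Contact>(buffered, new JsonSerializerOptions(JsonSerializerDefaults.Web), cancellationToken)
            .ConfigureAwait(false);
    }
}
```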
Part 3: Libraries to simplify working with large JSON object streams
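The next section covers the libraries bundled with this article and the downloadable code. They are based on a project I worked on and on the need to migrate from Newtonsoft.Json to System.Text.Json. The libraries are not required for working with streams, but they help encapsulate the process and reduce repetitive code.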
Key design goals
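- Interchangeable between Newtonsoft.Json and System.Text.Json, File System and Web API, and JSON objects and zipped JSON objects, with nearly identical interfaces so that switching between implementations is seamless
- Work with File System and Web API streams
- Work with single or multiple raw JSON files, or zipped raw JSON files, of any size
- Abstract away all of the implementation for opening and processing JSON objects; you only write the identification and processing code
- Asynchronous operation, including fail-fast error handling
- Support for cancelling mid-stream: CancellationToken
- Minimal memory footprint: process objects as they are deserialized, without keeping them all in memory
- High performance: as close to raw performance as possible
- DI/IOC support: not dependent on any IOC container system
- ILogger support: not tied to any particular logger
- Custom data buffer size configuration: the default buffer size used by Newtonsoft is 1K (1,024 bytes) and by System.Text.Json 16K (16,384 bytes)
- Testability and benchmarking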
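The mid-stream cancellation goal can be illustrated without the library. In this sketch (hypothetical names, not the library's API) the consumer checks the token between items, so a cancel request stops processing after the current item rather than after the whole stream. The `onItem` callback returning `false` stands in for a user pressing cancel.

```csharp
using System;
using System.IO;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not part of the article's library
public static class CancellableStreamSketch
{
    // Processes array elements one at a time until the stream ends or cancellation is requested
    public static async Task<int> ProcessUntilCancelledAsync(Stream stream, CancellationTokenSource cts,
        Func<int, bool> onItem)
    {
        int processed = 0;
        try
        {
            await foreach (int value in JsonSerializer
                               .DeserializeAsyncEnumerable<int>(stream, cancellationToken: cts.Token)
                               .ConfigureAwait(false))
            {
                cts.Token.ThrowIfCancellationRequested(); // stop between items
                processed++;
                if (!onItem(value)) cts.Cancel();         // simulated user cancellation
            }
        }
        catch (OperationCanceledException)
        {
            // cancellation is an expected outcome here, not an error
        }
        return processed;
    }
}
```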
How to use: Newtonsoft.Json - Simple JSON collection objects
For simple JSON collection objects, we implement the JsonFileStreamObjectDeserializer<TConfiguration> base class:
public class ContactFileStreamDeserializer
: JsonFileStreamObjectDeserializer<IContactFilesConfiguration>
{
// code goes here
}
Public Class ContactFileStreamDeserializer
Inherits JsonFileStreamObjectDeserializer(Of IContactFilesConfiguration)
' code goes here...
End Class
Then we implement the ProcessAsync method:
protected override async Task ProcessAsync
(JsonReader jsonReader, CancellationToken cancellationToken)
{
if (this.BatchSize > 1)
await this.DeserializeAsync<Contact>
(jsonReader, this.BatchProcessAsync, cancellationToken)
.ConfigureAwait(false);
else
await this.DeserializeAsync<Contact>
(jsonReader, this.ItemProcessAsync, cancellationToken)
.ConfigureAwait(false);
}
Protected Overrides Async Function ProcessAsync(
jsonReader As JsonReader, cancellationToken As CancellationToken) As Task
If BatchSize > 1 Then
Await DeserializeAsync(Of Contact)(
jsonReader, AddressOf BatchProcessAsync, cancellationToken)
Else
Await DeserializeAsync(Of Contact)(
jsonReader, AddressOf ItemProcessAsync, cancellationToken)
End If
End Function
As you can see, both batch and single-object processing are built in. See the properties table below for how to configure them.
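The library's `BatchProcessAsync`/`ItemProcessAsync` internals are not shown here, so as an illustration only, this is one way items arriving one at a time could be grouped into batches of `BatchSize`. `BatchingSketch.BatchAsync` is a hypothetical helper built on `IAsyncEnumerable<T>`; the last batch may be smaller than the requested size.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not part of the article's library
public static class BatchingSketch
{
    // Groups a stream of items into lists of at most batchSize items
    public static async IAsyncEnumerable<List<T>> BatchAsync<T>(IAsyncEnumerable<T> source, int batchSize,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        if (batchSize < 1) throw new ArgumentOutOfRangeException(nameof(batchSize));

        var batch = new List<T>(batchSize);
        await foreach (T item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            batch.Add(item);
            if (batch.Count == batchSize)
            {
                yield return batch;
                batch = new List<T>(batchSize); // hand out the full batch, start a fresh one
            }
        }

        // the remainder, if any, forms a final smaller batch
        if (batch.Count > 0) yield return batch;
    }
}
```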
The ProcessAsync code above works for both raw and zipped files; which one is handled depends on the inherited base class:
- Raw JSON files: JsonFileStreamObjectDeserializer<TConfiguration>
- Zipped JSON files: JsonZipFileStreamObjectDeserializer<TZipConfiguration, TConfiguration>
The same applies to the Web API:
- Raw JSON files: JsonHttpStreamObjectDeserializer<TConfiguration>
- Zipped JSON files: JsonZipHttpStreamObjectDeserializer<TZipConfiguration, TConfiguration>
Using the class implemented above:
IFilePathHelper fileHelper = new FilePathHelper(DataFolder);
IContactFilesConfiguration config = new ContactFilesConfiguration();
CancellationTokenSource cts = new();
var deserializer = new ContactFileStreamDeserializer(fileHelper, config)
{
FileId = "MOCK1",
FileAction = DeserializeActionType.Single,
FailIfFileNotFound = true,
CancellationTokenSource = cts
};
await deserializer.ProcessAsync().ConfigureAwait(false);
Dim fileHelper = New FilePathHelper(DataFolder)
Dim config = New ContactFilesConfiguration()
Dim cts As New CancellationTokenSource()
Dim deserializer = New ContactFileStreamDeserializer(fileHelper, config) With
{
.FileId = "MOCK1",
.FileAction = DeserializeActionType.Single,
.FailIfFileNotFound = True,
.CancellationTokenSource = cts
}
await deserializer.ProcessAsync().ConfigureAwait(False)
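A configuration class holds the location of the data. It can contain the names of all the raw JSON files in the project, accessed by key ID, or only the files that need to be processed: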
public class ContactFilesConfiguration : IContactFilesConfiguration
{
#region Constructors
public ContactFilesConfiguration()
{
this.Paths = new Dictionary<string, string>
{
["MOCK1"] = "Mock_Contacts1.json",
["MOCK2"] = "Mock_Contacts2.json",
["MOCK3"] = "Mock_Contacts3.json",
};
}
#endregion
#region Properties
public IDictionary<string, string> Paths { get; }
#endregion
}
Public Class ContactFilesConfiguration
Implements IContactFilesConfiguration
#Region "Constructors"
Public Sub New()
Paths = New Dictionary(Of String, String) From {
{"MOCK1", "Mock_Contacts1.json"},
{"MOCK2", "Mock_Contacts2.json"},
{"MOCK3", "Mock_Contacts3.json"}
}
End Sub
#End Region
#Region "Properties"
Public ReadOnly Property Paths As IDictionary(Of String, String) _
Implements Configuration.IDataConfiguration.Paths
#End Region
End Class
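If you have multiple files of the same type to process, set the FileAction property to DeserializeActionType.Multiple and the base class will automatically iterate through the files in the configuration.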
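How `Single` versus `Multiple` and `FailIfFileNotFound` might combine can be sketched in a few lines. This is a hypothetical helper, not the base class's code: it resolves either the one file named by a key or every file in the configuration dictionary, and for a missing file it either throws or skips it.

```csharp
using System.Collections.Generic;
using System.IO;

// Hypothetical helper for illustration; not part of the article's library
public static class FileActionSketch
{
    // Resolves one configured file (by key) or all of them, mimicking the Single/Multiple idea
    public static List<string> ResolvePaths(IDictionary<string, string> paths, string dataFolder,
        string? fileId, bool multiple, bool failIfFileNotFound)
    {
        IEnumerable<string> names;
        if (multiple)
        {
            names = paths.Values;
        }
        else
        {
            if (fileId is null || !paths.TryGetValue(fileId, out string? name))
                throw new KeyNotFoundException($"File key '{fileId}' is not configured");
            names = new[] { name };
        }

        var resolved = new List<string>();
        foreach (string fileName in names)
        {
            string fullPath = Path.Combine(dataFolder, fileName);
            if (File.Exists(fullPath))
                resolved.Add(fullPath);
            else if (failIfFileNotFound)
                throw new FileNotFoundException("Configured JSON file not found", fullPath);
            // otherwise the missing file is skipped silently
        }
        return resolved;
    }
}
```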
The same goes for processing zipped raw JSON files. There is a separate configuration class, similar to the one above:
public class ContactZipFilesConfiguration : IContactZipFilesConfiguration
{
#region Constructors
public ContactZipFilesConfiguration()
{
this.Paths = new Dictionary<string, string>
{
["MOCK_ZIP"] = "Mock_Json_Files.zip",
};
}
#endregion
#region Properties
public IDictionary<string, string> Paths { get; }
#endregion
}
Public Class ContactZipFilesConfiguration
Implements IContactZipFilesConfiguration
#Region "Constructors"
Public Sub New()
Paths = New Dictionary(Of String, String) From {
{"MOCK_ZIP", "Mock_Json_Files.zip"}
}
End Sub
#End Region
#Region "Properties"
Public ReadOnly Property Paths As IDictionary(Of String, String) _
Implements Configuration.IDataConfiguration.Paths
#End Region
End Class
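The zipped variants add one step: each archive entry has to be opened as its own stream. A minimal sketch using `System.IO.Compression.ZipArchive`, which may differ from how the library's zip base classes are written. `ZipJsonSketch` is a hypothetical name; it simply counts the top-level array elements of every `.json` entry, decompressing on the fly rather than extracting to disk.

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not part of the article's library
public static class ZipJsonSketch
{
    // Streams every *.json entry in a zip archive, element by element
    public static async Task<int> CountElementsAsync(Stream zipStream, CancellationToken cancellationToken = default)
    {
        int count = 0;
        using var archive = new ZipArchive(zipStream, ZipArchiveMode.Read, leaveOpen: true);
        foreach (ZipArchiveEntry entry in archive.Entries)
        {
            if (!entry.FullName.EndsWith(".json", StringComparison.OrdinalIgnoreCase)) continue;

            await using Stream entryStream = entry.Open(); // decompresses as it is read
            await foreach (JsonElement element in JsonSerializer
                               .DeserializeAsyncEnumerable<JsonElement>(entryStream, cancellationToken: cancellationToken)
                               .ConfigureAwait(false))
            {
                count++; // real code would deserialize into a model and process it here
            }
        }
        return count;
    }
}
```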
Note: To see the code running without dependency injection, see the applications \ local \ SimpleData \ NewtonsoftContacts, applications \ local \ SimpleZippedData \ NewtonsoftZippedContacts, applications \ remote \ SimpleData \ NewtonsoftContacts and applications \ remote \ SimpleZippedData \ NewtonsoftZippedContacts VB/C# projects. For use with dependency injection, see the applications \ local \ SimpleData \ NewtonsoftContactsDI, applications \ local \ SimpleZippedData \ NewtonsoftZippedContactsDI, applications \ remote \ SimpleData \ NewtonsoftContactsDI and applications \ remote \ SimpleZippedData \ NewtonsoftZippedContactsDI VB/C# projects. Code shared by the non-DI and DI projects is in the Shared subfolder. You can also see the code in use in the UnitTest and Benchmark VB/C# projects.
How to use: Newtonsoft.Json - Complex JSON objects
For complex JSON objects, we implement the JsonFileStreamPropertyDeserializer<TConfiguration> base class:
public class EbayCategoryAspectFileStreamDeserializer
: JsonFileStreamPropertyDeserializer<IEbayCategoryAspectFilesConfiguration>
{
// code goes here
}
Public Class EbayCategoryAspectFileStreamDeserializer
Inherits JsonFileStreamPropertyDeserializer(Of IEbayCategoryAspectFilesConfiguration)
' code goes here
End Class
Then we implement the ProcessAsync method:
protected override async Task ProcessAsync
(JsonReader jsonReader, CancellationToken cancellationToken)
{
// process properties for data that we want
switch (jsonReader.GetString())
{
case "categoryAspects":
if (BatchSize > 1)
await DeserializeAsync<CategoryAspect>
(jsonReader, BatchProcessAsync, cancellationToken)
.ConfigureAwait(false);
else
await DeserializeAsync<CategoryAspect>
(jsonReader, ItemProcessAsync, cancellationToken)
.ConfigureAwait(false);
break;
case "categoryTreeVersion":
{
// get the value
await jsonReader.ReadAsync(cancellationToken).ConfigureAwait(false);
string? version = jsonReader.GetString();
Logger?.Emit(LogLevel.Information, $"Version: {version ?? "no value"}");
break;
}
case "categoryTreeId":
{
// get the value
await jsonReader.ReadAsync(cancellationToken).ConfigureAwait(false);
string? id = jsonReader.GetString();
Logger?.Emit(LogLevel.Information, $"Id: {id ?? "no value"}");
break;
}
}
}
Protected Overrides Async Function ProcessAsync(
jsonReader As JsonReader, cancellationToken As CancellationToken) As Task
' process properties for data that we want
Select Case jsonReader.GetString()
Case "categoryAspects"
Await jsonReader.ReadAsync().ConfigureAwait(False)
If BatchSize > 1 Then
Await DeserializeAsync(Of CategoryAspect)(jsonReader,
AddressOf BatchProcessAsync, cancellationToken).
ConfigureAwait(False)
Else
Await DeserializeAsync(Of CategoryAspect)(jsonReader,
AddressOf ItemProcessAsync, cancellationToken).
ConfigureAwait(False)
End If
Case "categoryTreeVersion"
' get the value
Await jsonReader.ReadAsync().ConfigureAwait(False)
Dim version = jsonReader.GetString()
_logger.Emit(LogLevel.Information, $"Version: {If(version, "no value")}")
Case "categoryTreeId"
' get the value
Await jsonReader.ReadAsync().ConfigureAwait(False)
Dim Id = jsonReader.GetString()
_logger.Emit(LogLevel.Information, $"Id: {If(Id, "no value")}")
End Select
End Function
Using the class implemented above:
IFilePathHelper fileHelper = new FilePathHelper(DataFolder);
IEbayCategoryAspectFilesConfiguration config =
new EbayCategoryAspectFilesConfiguration();
CancellationTokenSource cts = new();
var deserializer = new EbayCategoryAspectFileStreamDeserializer(fileHelper, config)
{
MarketplaceId = "EBAY_US",
FileAction = DeserializeActionType.Single,
FailIfFileNotFound = true,
CancellationTokenSource = cts
};
await deserializer.ProcessAsync().ConfigureAwait(false);
Dim fileHelper = New FilePathHelper(DataFolder)
Dim config = New EbayCategoryAspectFilesConfiguration()
Dim cts As New CancellationTokenSource()
Dim deserializer =
New EbayCategoryAspectFileStreamDeserializer(fileHelper, config) With
{
.MarketplaceId = "EBAY_US",
.FileAction = DeserializeActionType.Single,
.FailIfFileNotFound = True,
.CancellationTokenSource = cts
}
await deserializer.ProcessAsync().ConfigureAwait(False)
Note: To see the code running without dependency injection, see the applications \ local \ SimpleData \ NewtonsoftEbay, applications \ local \ SimpleZippedData \ NewtonsoftZippedEbay, applications \ remote \ SimpleData \ NewtonsoftEbay and applications \ remote \ SimpleZippedData \ NewtonsoftZippedEbay VB/C# projects. For use with dependency injection, see the applications \ local \ SimpleData \ NewtonsoftEbayDI, applications \ local \ SimpleZippedData \ NewtonsoftZippedEbayDI, applications \ remote \ SimpleData \ NewtonsoftEbayDI and applications \ remote \ SimpleZippedData \ NewtonsoftZippedEbayDI VB/C# projects. Code shared by the non-DI and DI projects is in the Shared subfolder. You can also see the code in use in the UnitTest and Benchmark VB/C# projects.
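The property-name switch used in the complex-object ProcessAsync can also be tried synchronously over an in-memory buffer with `Utf8JsonReader`, which is a handy way to check the logic before wiring it to a stream. In this sketch the `CategoryAspect` class (with a single `Name` property), the `PropertyDispatchSketch` name and the JSON shape are assumptions modelled on the eBay example. `JsonSerializer.Deserialize<T>(ref Utf8JsonReader, ...)` reads one array element at a time and leaves the reader on that element's closing token.

```csharp
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical model; the real eBay CategoryAspect type is much richer
public sealed class CategoryAspect
{
    public string? Name { get; set; }
}

// Hypothetical helper for illustration; not part of the article's library
public static class PropertyDispatchSketch
{
    private static readonly JsonSerializerOptions Options = new(JsonSerializerDefaults.Web);

    // Walks the root object's properties and reacts only to the ones of interest
    public static (string? Version, List<CategoryAspect> Aspects) Parse(byte[] utf8Json)
    {
        string? version = null;
        var aspects = new List<CategoryAspect>();
        var reader = new Utf8JsonReader(utf8Json);

        while (reader.Read())
        {
            // only property names that belong directly to the root object
            if (reader.TokenType != JsonTokenType.PropertyName || reader.CurrentDepth != 1)
                continue;

            switch (reader.GetString())
            {
                case "categoryTreeVersion":
                    reader.Read(); // move to the value
                    version = reader.GetString();
                    break;

                case "categoryAspects":
                    reader.Read(); // move to the value, expected to be StartArray
                    if (reader.TokenType != JsonTokenType.StartArray) break;
                    // one element at a time; each call leaves the reader on the element's EndObject
                    while (reader.Read() && reader.TokenType == JsonTokenType.StartObject)
                    {
                        CategoryAspect? aspect = JsonSerializer.Deserialize<CategoryAspect>(ref reader, Options);
                        if (aspect is not null) aspects.Add(aspect);
                    }
                    break;
            }
        }
        return (version, aspects);
    }
}
```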
How to use: System.Text.Json - Simple JSON collection objects
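For simple JSON collection objects, System.Text.Json has a new deserialization method for enumerating a streamed collection of JSON objects, DeserializeAsyncEnumerable. Because of this, the base class implementation exposes a Stream object rather than a StreamReader object. Implementing the JsonFileStreamObjectDeserializer<TConfiguration> base class is the same as before: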
public class ContactFileStreamDeserializer
: JsonFileStreamObjectDeserializer<IContactFilesConfiguration>
{
// code goes here
}
Public Class ContactFileStreamDeserializer
Inherits JsonFileStreamObjectDeserializer(Of IContactFilesConfiguration)
' code goes here...
End Class
However, the way we implement the ProcessAsync method changes:
protected override async Task ProcessAsync
(Stream stream, CancellationToken cancellationToken)
{
if (this.BatchSize > 1)
await this.DeserializeAsync<Contact>
(stream, this.BatchProcessAsync, cancellationToken)
.ConfigureAwait(false);
else
await this.DeserializeAsync<Contact>
(stream, this.ItemProcessAsync, cancellationToken)
.ConfigureAwait(false);
}
Protected Overrides Async Function ProcessAsync(
stream As Stream, cancellationToken As CancellationToken) As Task
If BatchSize > 1 Then
Await DeserializeAsync(Of Contact)(stream,
AddressOf BatchProcessAsync, cancellationToken).
ConfigureAwait(False)
Else
Await DeserializeAsync(Of Contact)(stream,
AddressOf ItemProcessAsync, cancellationToken).
ConfigureAwait(False)
End If
End Function
Using the above implementation is the same as with Newtonsoft.Json:
IFilePathHelper fileHelper = new FilePathHelper(DataFolder);
IContactFilesConfiguration config = new ContactFilesConfiguration();
CancellationTokenSource cts = new();
var deserializer = new ContactFileStreamDeserializer(fileHelper, config)
{
FileId = "MOCK1",
FileAction = DeserializeActionType.Single,
FailIfFileNotFound = true,
CancellationTokenSource = cts
};
await deserializer.ProcessAsync().ConfigureAwait(false);
Dim fileHelper = New FilePathHelper(DataFolder)
Dim config = New ContactFilesConfiguration()
Dim cts As New CancellationTokenSource()
Dim deserializer = New ContactFileStreamDeserializer(fileHelper, config) With
{
.FileId = "MOCK1",
.FileAction = DeserializeActionType.Single,
.FailIfFileNotFound = True,
.CancellationTokenSource = cts
}
await deserializer.ProcessAsync().ConfigureAwait(False)
Note: To see the code running without dependency injection, see the applications \ local \ SimpleData \ SystemTextJsonContacts, applications \ local \ SimpleZippedData \ SystemTextJsonZippedContacts, applications \ remote \ SimpleData \ SystemTextJsonContacts and applications \ remote \ SimpleZippedData \ SystemTextJsonZippedContacts VB/C# projects. For use with dependency injection, see the applications \ local \ SimpleData \ SystemTextJsonContactsDI, applications \ local \ SimpleZippedData \ SystemTextJsonZippedContactsDI, applications \ remote \ SimpleData \ SystemTextJsonContactsDI and applications \ remote \ SimpleZippedData \ SystemTextJsonZippedContactsDI VB/C# projects. Code shared by the non-DI and DI projects is in the Shared subfolder. You can also see the code in use in the UnitTest and Benchmark VB/C# projects.
How to use: System.Text.Json - Complex JSON objects
For complex JSON objects, we use the custom Utf8JsonAsyncStreamReader class to read and process the stream. Implementing the JsonFileStreamPropertyDeserializer<TConfiguration> base class:
public class EbayCategoryAspectFileStreamDeserializer
: JsonFileStreamPropertyDeserializer<IEbayCategoryAspectFilesConfiguration>
{
// code goes here
}
Public Class EbayCategoryAspectFileStreamDeserializer
Inherits JsonFileStreamPropertyDeserializer(Of IEbayCategoryAspectFilesConfiguration)
' code goes here...
End Class
Then we implement the ProcessAsync method:
protected override async Task ProcessAsync
(Utf8JsonAsyncStreamReader jsonReader, CancellationToken cancellationToken)
{
// process properties for data that we want
switch (jsonReader.GetString())
{
case "categoryAspects":
if (BatchSize > 1)
await DeserializeAsync<CategoryAspect>
(jsonReader, BatchProcessAsync, cancellationToken)
.ConfigureAwait(false);
else
await DeserializeAsync<CategoryAspect>
(jsonReader, ItemProcessAsync, cancellationToken)
.ConfigureAwait(false);
break;
case "categoryTreeVersion":
{
// get the value
await jsonReader.ReadAsync(cancellationToken).ConfigureAwait(false);
string? version = jsonReader.GetString();
Logger?.Emit(LogLevel.Information, $"Version: {version ?? "no value"}");
break;
}
case "categoryTreeId":
{
// get the value
await jsonReader.ReadAsync(cancellationToken).ConfigureAwait(false);
string? id = jsonReader.GetString();
Logger?.Emit(LogLevel.Information, $"Id: {id ?? "no value"}");
break;
}
}
}
Protected Overrides Async Function ProcessAsync(
jsonReader As Utf8JsonAsyncStreamReader,
cancellationToken As CancellationToken) As Task
' process properties for data that we want
Select Case jsonReader.GetString()
Case "categoryAspects"
Await jsonReader.ReadAsync().ConfigureAwait(False)
If BatchSize > 1 Then
Await DeserializeAsync(Of CategoryAspect)(jsonReader,
AddressOf BatchProcessAsync, cancellationToken).
ConfigureAwait(false)
Else
Await DeserializeAsync(Of CategoryAspect)(jsonReader,
AddressOf ItemProcessAsync, cancellationToken).
ConfigureAwait(false)
End If
Case "categoryTreeVersion"
' get the value
Await jsonReader.ReadAsync().ConfigureAwait(False)
Dim version = jsonReader.GetString()
_logger.Emit(LogLevel.Information, $"Version: {If(version, "no value")}")
Case "categoryTreeId"
' get the value
Await jsonReader.ReadAsync().ConfigureAwait(False)
Dim Id = jsonReader.GetString()
_logger.Emit(LogLevel.Information, $"Id: {If(Id, "no value")}")
End Select
End Function
Using the class implemented above is the same as with NewtonSoft:
IFilePathHelper fileHelper = new FilePathHelper(DataFolder);
IEbayCategoryAspectFilesConfiguration config =
new EbayCategoryAspectFilesConfiguration();
CancellationTokenSource cts = new();
var deserializer = new EbayCategoryAspectFileStreamDeserializer(fileHelper, config)
{
MarketplaceId = "EBAY_US",
FileAction = DeserializeActionType.Single,
FailIfFileNotFound = true,
CancellationTokenSource = cts
};
await deserializer.ProcessAsync().ConfigureAwait(false);
Dim fileHelper = New FilePathHelper(DataFolder)
Dim config = New EbayCategoryAspectFilesConfiguration()
Dim cts As New CancellationTokenSource()
Dim deserializer = New EbayCategoryAspectFileStreamDeserializer(fileHelper, config) With
{
.MarketplaceId = "EBAY_US",
.FileAction = DeserializeActionType.Single,
.FailIfFileNotFound = True,
.CancellationTokenSource = cts
}
await deserializer.ProcessAsync().ConfigureAwait(False)
Note: To see the code running without dependency injection, see the applications \ local \ SimpleData \ SystemTextJsonEbay, applications \ local \ SimpleZippedData \ SystemTextJsonZippedEbay, applications \ remote \ SimpleData \ SystemTextJsonEbay and applications \ remote \ SimpleZippedData \ SystemTextJsonZippedEbay VB/C# projects. For use with dependency injection, see the applications \ local \ SimpleData \ SystemTextJsonEbayDI, applications \ local \ SimpleZippedData \ SystemTextJsonZippedEbayDI, applications \ remote \ SimpleData \ SystemTextJsonEbayDI and applications \ remote \ SimpleZippedData \ SystemTextJsonZippedEbayDI VB/C# projects. Code shared by the non-DI and DI projects is in the Shared subfolder. You can also see the code in use in the UnitTest and Benchmark VB/C# projects.
Library implementation
The base classes are designed to work with both Newtonsoft.Json and System.Text.Json, so they are split into three parts:
- Common base implementation: Common.Json > JsonStreamDeserializer class
- Newtonsoft.Json base implementation: Common.NewtonSoft.Json > JsonStreamPropertyDeserializer and JsonZipStreamPropertyDeserializer base common classes. Then there are separate base classes for the File System and Web API implementations:
  - File System
    - Simple: JsonFileStreamObjectDeserializer, JsonZipFileStreamObjectDeserializer
    - Complex: JsonFileStreamPropertyDeserializer, JsonZipFileStreamPropertyDeserializer
  - Web API
    - Simple: JsonHttpStreamObjectDeserializer, JsonZipHttpStreamObjectDeserializer
    - Complex: JsonHttpStreamPropertyDeserializer, JsonZipHttpStreamPropertyDeserializer
- System.Text.Json base implementation: Common.System.Text.Json > JsonStreamPropertyDeserializer, JsonZipStreamPropertyDeserializer, JsonStreamObjectDeserializer and JsonZipStreamObjectDeserializer base common classes. Then there are separate base classes for the File System and Web API implementations:
  - File System
    - Simple: JsonFileStreamObjectDeserializer, JsonZipFileStreamObjectDeserializer
    - Complex: JsonFileStreamPropertyDeserializer, JsonZipFileStreamPropertyDeserializer
  - Web API
    - Simple: JsonHttpStreamObjectDeserializer, JsonZipHttpStreamObjectDeserializer
    - Complex: JsonHttpStreamPropertyDeserializer, JsonZipHttpStreamPropertyDeserializer
System.Text.Json has two additional Object common base classes. This is due to differences between Newtonsoft.Json and System.Text.Json.
If you use the libraries for your own purposes, the required projects are:
- Newtonsoft.Json: Common.Json and Common.NewtonSoft.Json
- System.Text.Json: Common.Json and Common.System.Text.Json (Common.SystemText.Json for VB, because of a compiler name conflict)
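Configuration properties
Property | Description | Default |
FileId | Lookup key in the configuration file | not set |
ZipFileId | Lookup key in the zip configuration file | not set |
FileAction | Single or multiple configuration file entries | Single |
ZipFileAction | Single or multiple lookup keys in the zip configuration file | Single |
BatchSize | Number of objects processed at a time | 1 |
BufferSize | Number of bytes read from the stream and processed at a time | 8,192 |
FailIfFileNotFound | Throw an exception (true) or fail silently (false) when a file is not found | true |
CancellationTokenSource | (optional) | default |
JsonSerializerSettings | Newtonsoft only | default |
JsonSerializerOptions | System.Text.Json only | default |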
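For System.Text.Json, the closest built-in setting to a buffer size that I'm aware of is `JsonSerializerOptions.DefaultBufferSize`; whether the library maps its BufferSize property onto it is not shown here. This sketch (hypothetical names) streams an array with a deliberately tiny buffer to show that the buffer size changes how the stream is read, not the deserialized result:

```csharp
using System.Collections.Generic;
using System.IO;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not part of the article's library
public static class BufferSizeSketch
{
    // Streams a JSON array of integers using a caller-chosen read buffer size;
    // smaller buffers mean more, smaller reads but the same values
    public static async Task<List<int>> ReadAllAsync(Stream stream, int bufferSize)
    {
        var options = new JsonSerializerOptions { DefaultBufferSize = bufferSize };
        var values = new List<int>();
        await foreach (int value in JsonSerializer.DeserializeAsyncEnumerable<int>(stream, options))
            values.Add(value);
        return values;
    }
}
```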
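I won't go through the code for these classes, as there is a lot of it and this article is already too long. I suggest reviewing the code instead.
What I will point out is how I handle the decision logic for choosing the processing method.
I use a dictionary keyed on FileAction.
Common base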
// Generate an ActionDelegate key
protected string GetActionKey(DeserializeActionType fileType)
=> $"{fileType}";
protected abstract Dictionary<string, Func<Task>> ActionDelegatesFactory();
// Execute an ActionDelegate based on configuration settings
protected virtual async ValueTask ExecuteActionDelegateAsync(string key)
{
Dictionary<string, Func<Task>> ActionDelegates = this.ActionDelegatesFactory();
if (!ActionDelegates.ContainsKey(key))
{
KeyNotFoundException exception = new($"The '{this.FileAction}' Action was not found!");
this.Logger?.Emit(LogLevel.Error, "Invalid Action!", exception);
throw exception;
}
await ActionDelegates[key]().ConfigureAwait(false);
}
' Generate an ActionDelegate key
Protected Function GetActionKey(fileType As DeserializeActionType) As String
Return $"{fileType}"
End Function
' abstract members/variables are not allowed
Protected MustOverride Function ActionDelegatesFactory() _
As Dictionary(Of String, Func(Of Task))
' Execute an ActionDelegate based on configuration settings
Protected Overridable Async Function _
ExecuteActionDelegateAsync(key As String) As Task
Dim ActionDelegates = ActionDelegatesFactory()
If Not ActionDelegates.ContainsKey(key) Then
Dim exception = New KeyNotFoundException($"The '{Me.FileAction}' Action was not found!")
Me._logger.Emit(LogLevel.Error, "Invalid Action!", exception)
Throw exception
End If
Await ActionDelegates(key)().ConfigureAwait(False)
End Function
Common.<lib> file/Web API base
// Main Entry Point
public override async ValueTask ProcessAsync()
=> await this.ExecuteActionDelegateAsync(this.GetActionKey(this.FileAction))
.ConfigureAwait(false);
// Handler for parent class to process json fragments
protected abstract Task ProcessAsync(JsonReader jsonReader,
CancellationToken cancellationToken);
// Map configuration settings to Actions
protected override Dictionary<string, Func<Task>> ActionDelegatesFactory()
=> new()
{
[this.GetActionKey(DeserializeActionType.Single)] = ()
=> this.ProcessActionAsync
(this._configuration!.Paths[this.ConfigurationFileKey!],
this.ProcessAsync,
this.CancellationTokenSource?.Token ?? default),
[this.GetActionKey(DeserializeActionType.Multiple)] = ()
=> this.ProcessActionAsync
(this._configuration!, this.ProcessAsync,
this.CancellationTokenSource?.Token ?? default),
};
' Main Entry Point
Public Overrides Async Function ProcessAsync() As Task
Await Me.ExecuteActionDelegateAsync(
Me.GetActionKey(Me.FileAction)).ConfigureAwait(False)
End Function
' Handler for parent class
Protected MustOverride Overloads Function ProcessAsync(
jsonReader As JsonReader, cancellationToken As CancellationToken) As Task
' Map configuration settings to Actions
Protected Overrides Function ActionDelegatesFactory() _
As Dictionary(Of String, Func(Of Task))
Return New Dictionary(Of String, Func(Of Task)) From {
{Me.GetActionKey(DeserializeActionType.Single), _
AddressOf Me.SingleShimAsync},
{Me.GetActionKey(DeserializeActionType.Multiple), _
AddressOf Me.MultipleShimAsync}
}
End Function
#Region "ActionDelegatesFactory Shims"
Private Async Function SingleShimAsync() As Task
Await Me.ProcessActionAsync(
Me._configuration.Paths(Me.ConfigurationFileKey),
AddressOf Me.ProcessAsync,
If(Me.CancellationTokenSource Is Nothing, _
Nothing, Me.CancellationTokenSource.Token)) _
.ConfigureAwait(False)
End Function
Private Async Function MultipleShimAsync() As Task
Await Me.ProcessActionAsync(
Me._configuration,
AddressOf Me.ProcessAsync,
If(Me.CancellationTokenSource Is Nothing, _
Nothing, Me.CancellationTokenSource.Token)) _
.ConfigureAwait(False)
End Function
#End Region
Common.<lib> zipped file/Web API base
// Main Entry Point
public override async ValueTask ProcessAsync()
=> await this.ExecuteActionDelegateAsync
(this.GetActionKey(this.ZipFileAction, this.FileAction))
.ConfigureAwait(false);
#region Processors
// Generate an ActionDelegate key
private string GetActionKey(DeserializeActionType zipFileType,
DeserializeActionType fileType)
=> $"{zipFileType}{this.GetActionKey(fileType)}";
// Execute an ActionDelegate based on configuration settings
protected override async ValueTask ExecuteActionDelegateAsync(string key)
{
Dictionary<string, Func<Task>> ActionDelegates = this.ActionDelegatesFactory();
if (!ActionDelegates.ContainsKey(key))
{
KeyNotFoundException exception =
new KeyNotFoundException
($"The zip '{this.ZipFileAction}' or file '{this.FileAction}' Action(s) not found!");
this.Logger?.Emit(LogLevel.Error, "Invalid Action!", exception);
throw exception;
}
await ActionDelegates[key]().ConfigureAwait(false);
}
// Map configuration settings to Actions
protected override Dictionary<string, Func<Task>> ActionDelegatesFactory()
=> new()
{
[this.GetActionKey(DeserializeActionType.Single,
DeserializeActionType.Single)] = ()
=> this.ProcessZipActionAsync
(this._zipConfiguration.Paths[this.ConfigurationZipFileKey!],
this._configuration!.Paths[this.ConfigurationFileKey!],
this.ProcessAsync,
this.CancellationTokenSource?.Token ?? default),
[this.GetActionKey(DeserializeActionType.Multiple,
DeserializeActionType.Single)] = ()
=> this.ProcessZipActionAsync
(this._zipConfiguration,
this._configuration!.Paths[this.ConfigurationFileKey!],
this.ProcessAsync, this.CancellationTokenSource?.Token ?? default),
[this.GetActionKey(DeserializeActionType.Single,
DeserializeActionType.Multiple)] = ()
=> ProcessZipActionAsync
(this._zipConfiguration.Paths[this.ConfigurationZipFileKey!],
this._configuration!, this.ProcessAsync,
this.CancellationTokenSource?.Token ?? default),
[this.GetActionKey(DeserializeActionType.Multiple,
DeserializeActionType.Multiple)] = ()
=> this.ProcessZipActionAsync
(this._zipConfiguration, this.ProcessAsync,
this.CancellationTokenSource?.Token ?? default),
};
' Main Entry Point
Public Overrides Async Function ProcessAsync() As Task
Await Me.ExecuteActionDelegateAsync(Me.GetActionKey(
Me.ZipFileAction, Me.FileAction)).ConfigureAwait(False)
End Function
' Generate an ActionDelegate key
Private Shadows Function GetActionKey(zipFileType As DeserializeActionType, _
fileType As DeserializeActionType) As String
Return $"{zipFileType}{MyBase.GetActionKey(fileType)}"
End Function
' Execute an ActionDelegate based on configuration settings
Protected Overrides Async Function ExecuteActionDelegateAsync(key As String) As Task
Dim ActionDelegates = ActionDelegatesFactory()
If Not ActionDelegates.ContainsKey(key) Then
Dim exception = New KeyNotFoundException(
$"The zip '{Me.ZipFileAction}' or file '{Me.FileAction}' Action(s) not found!")
Me._logger.Emit(LogLevel.Error, "Invalid Action!", exception)
Throw exception
End If
Await ActionDelegates(key)().ConfigureAwait(False)
End Function
' Map configuration settings to Actions
Protected Overrides Function ActionDelegatesFactory() _
As Dictionary(Of String, Func(Of Task))
Return New Dictionary(Of String, Func(Of Task)) From {
{Me.GetActionKey(DeserializeActionType.Single, _
DeserializeActionType.Single),
AddressOf Me.SingleSingleShimAsync},
{Me.GetActionKey(DeserializeActionType.Single, _
DeserializeActionType.Multiple),
AddressOf Me.SingleMultipleShimAsync},
{Me.GetActionKey(DeserializeActionType.Multiple, _
DeserializeActionType.Single),
AddressOf Me.MultipleSingleShimAsync},
{Me.GetActionKey(DeserializeActionType.Multiple, _
DeserializeActionType.Multiple),
AddressOf Me.MultipleMultipleShimAsync}
}
End Function
#Region "ActionDelegatesFactory Shims"
' NOTE: VB does not support inline delegates
' with parameters in Dictionaries like C#, only method references,
' so shims are used to wrap method calls with parameters
Private Async Function SingleSingleShimAsync() As Task
Await Me.ProcessZipActionAsync(
Me._zipConfiguration.Paths(Me.ConfigurationZipFileKey),
Me._configuration.Paths(Me.ConfigurationFileKey),
AddressOf Me.ProcessAsync,
If(Me.CancellationTokenSource Is Nothing, Nothing, _
Me.CancellationTokenSource.Token)).
ConfigureAwait(False)
End Function
Private Async Function MultipleSingleShimAsync() As Task
Await Me.ProcessZipActionAsync(
Me._zipConfiguration,
Me._configuration.Paths(Me.ConfigurationFileKey),
AddressOf Me.ProcessAsync,
If(Me.CancellationTokenSource Is Nothing, Nothing, _
Me.CancellationTokenSource.Token)).
ConfigureAwait(False)
End Function
Private Async Function SingleMultipleShimAsync() As Task
Await Me.ProcessZipActionAsync(
Me._zipConfiguration.Paths(Me.ConfigurationZipFileKey),
Me._configuration,
AddressOf Me.ProcessAsync,
If(Me.CancellationTokenSource Is Nothing, Nothing, _
Me.CancellationTokenSource.Token)).
ConfigureAwait(False)
End Function
Private Async Function MultipleMultipleShimAsync() As Task
Await Me.ProcessZipActionAsync(
Me._zipConfiguration,
AddressOf Me.ProcessAsync,
If(Me.CancellationTokenSource Is Nothing, Nothing, _
Me.CancellationTokenSource.Token)).
ConfigureAwait(False)
End Function
#End Region
Note: VB and C# handle delegates differently. In VB, Shim methods are needed to allow parameters to be passed to the method calls.
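The keyed-dictionary dispatch described above reduces to a few lines of C#. This sketch uses a composite "{zip}{file}" key like the zipped base classes. The `ActionDispatchSketch` class and its `Run` placeholder are hypothetical, and the enum only mirrors the article's `DeserializeActionType` so that the example compiles on its own:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Stand-in mirroring the article's enum so the sketch is self-contained
public enum DeserializeActionType { Single, Multiple }

// Hypothetical class for illustration; not part of the article's library
public sealed class ActionDispatchSketch
{
    public List<string> Log { get; } = new();

    // Composite key, e.g. "SingleMultiple" for (zip action, file action)
    private static string Key(DeserializeActionType zip, DeserializeActionType file) => $"{zip}{file}";

    // Map every configuration combination to the work it should run
    private Dictionary<string, Func<Task>> ActionDelegatesFactory() => new()
    {
        [Key(DeserializeActionType.Single, DeserializeActionType.Single)] = () => Run("one zip, one file"),
        [Key(DeserializeActionType.Single, DeserializeActionType.Multiple)] = () => Run("one zip, all files"),
        [Key(DeserializeActionType.Multiple, DeserializeActionType.Single)] = () => Run("all zips, one file"),
        [Key(DeserializeActionType.Multiple, DeserializeActionType.Multiple)] = () => Run("all zips, all files"),
    };

    // Placeholder for the real processing; just records what would have run
    private Task Run(string description)
    {
        Log.Add(description);
        return Task.CompletedTask;
    }

    public async Task ExecuteAsync(DeserializeActionType zip, DeserializeActionType file)
    {
        string key = Key(zip, file);
        if (!ActionDelegatesFactory().TryGetValue(key, out Func<Task>? action))
            throw new KeyNotFoundException($"The '{key}' action was not found!");
        await action().ConfigureAwait(false);
    }
}
```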
Part 4: Unit tests
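The unit tests target only the File System file and zipped-file implementations, covering both simple JSON collection objects (Contacts) and complex JSON objects (Ebay CategoryAspect). The unit tests cover:
- Standard call to completion
- Standard call with simulated cancellation
- Invalid configuration: key not found and file not found
- Single and multiple file processing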
There are also unit tests for:
- The custom Utf8JsonAsyncStreamReader class
- The FilePathHelper class and FileConfiguration
All tests use dependency injection.
The following extension method is used to simulate cancellation:
public static class TaskExtensions
{
// executing async method wrapper to capture the results,
// handle cancellation, and capture any exceptions
public static Task TaskAwaiter(this ValueTask valueTask,
CancellationTokenSource? cancellationTokenSource = default, int delay = 2000)
{
//emulate a user cancellation in x milliseconds
cancellationTokenSource?.CancelAfter(delay);
// get a reference to the running task
Task task = valueTask.AsTask();
while (!task.GetAwaiter().IsCompleted &&
cancellationTokenSource?.IsCancellationRequested != true)
{
// waiting ...
}
// we need to capture the task & a snapshot of the statuses before returning
return task;
}
}
Public Module TaskExtensions
' executing async method wrapper to capture the results,
' handle cancellation, and capture any exceptions
<Extension>
Public Function TaskAwaiter(task As Task, _
Optional cancellationTokenSource As CancellationTokenSource = Nothing, _
Optional delay As Integer = 2000) As Task
' emulate a user cancellation in x milliseconds
cancellationTokenSource?.CancelAfter(delay)
While Not task.GetAwaiter().IsCompleted AndAlso
((cancellationTokenSource Is Nothing) _
OrElse cancellationTokenSource.IsCancellationRequested <> True)
'waiting...
End While
' we need to capture the task & a snapshot of the statuses before returning
Return task
End Function
End Module
If delay = 0 is passed, or no cancellation token source is supplied, cancellation is not performed.
Here is how it is used:
private const int SimulatedDelay = 5; // must be less than actual execution time
[Fact]
void Live_File_Single_Cancellation()
{
CancellationTokenSource cts = new();
Task task = this.Execute(this._liveDeserializer, TestFileKey,
DeserializeActionType.Single, false, cts);
if (cts.Token.IsCancellationRequested)
this._logger.Emit(LogLevel.Warning, "Cancellation was requested");
task.IsCompleted.Should().BeFalse();
cts.IsCancellationRequested.Should().BeTrue();
this._liveDeserializer.CancellationTokenSource!
.IsCancellationRequested.Should().BeTrue();
this._liveDeserializer.CancellationTokenSource!.Should().Be(cts);
}
private Task Execute
(
IContactFileStreamDeserializer deserializer,
string fileKey,
DeserializeActionType fileAction,
bool failIfFileNotFound,
CancellationTokenSource? cts = default,
int delay = SimulatedDelay
)
{
deserializer.FileId = fileKey;
deserializer.FileAction = fileAction;
deserializer.FailIfFileNotFound = failIfFileNotFound;
deserializer.CancellationTokenSource = cts;
// we need to capture the Task
return deserializer.ProcessAsync().TaskAwaiter(cts, delay);
}
Private Const SimulatedDelay As Integer = 5 ' must be less than actual execution time
<Fact>
Sub Live_File_Single_Cancellation()
Dim cts = New CancellationTokenSource()
Dim task As Task = Me.Execute(Me._liveDeserializer, _
TestFileKey, DeserializeActionType.Single, False, cts)
If cts.Token.IsCancellationRequested Then
Me._logger.Emit(LogLevel.Warning, "Cancellation was requested")
End If
task.IsCompleted.Should().BeFalse()
cts.IsCancellationRequested.Should().BeTrue()
Me._liveDeserializer.CancellationTokenSource.IsCancellationRequested.Should().BeTrue()
Me._liveDeserializer.CancellationTokenSource.Should().Be(cts)
End Sub
Function Execute _
(
deserializer As IContactFileStreamDeserializer,
fileKey As String,
fileAction As DeserializeActionType,
failIfFileNotFound As Boolean,
Optional cts As CancellationTokenSource = Nothing,
Optional delay As Integer = SimulatedDelay
) As Task
deserializer.FileId = fileKey
deserializer.FileAction = fileAction
deserializer.FailIfFileNotFound = failIfFileNotFound
deserializer.CancellationTokenSource = cts
' we need to capture the Task
Return deserializer.ProcessAsync().TaskAwaiter(cts, delay)
End Function
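The TaskAwaiter helper above waits in an empty while loop, so a CPU core stays busy until the task finishes or cancellation is requested. The block below is a minimal sketch of a non-spinning alternative and is not the author's code. The name TaskAwaiterNoSpin is hypothetical, it takes a Task rather than a ValueTask, and Task.Delay stands in for ProcessAsync(). It blocks on Task.WaitAny and pairs the work with a Task.Delay(Timeout.Infinite, token) that completes only when the token is cancelled.

```csharp
#nullable enable
using System;
using System.Threading;
using System.Threading.Tasks;

// Usage 1: simulate a user cancelling a long-running operation after 50 ms.
using var cts = new CancellationTokenSource();
Task work = Task.Delay(TimeSpan.FromSeconds(10), cts.Token); // stands in for ProcessAsync()
Task observed = TaskAwaiterNoSpin(work, cts, delay: 50);
Console.WriteLine($"Cancellation requested: {cts.IsCancellationRequested}");

// Usage 2: no token source, so the helper simply waits for completion.
Task quick = TaskAwaiterNoSpin(Task.Delay(20));
Console.WriteLine($"Completed: {quick.IsCompleted}");

// Hypothetical non-spinning variant of TaskAwaiter: blocks until the task
// finishes or the token is cancelled, without a busy-wait loop.
static Task TaskAwaiterNoSpin(Task task, CancellationTokenSource? cts = null, int delay = 2000)
{
    if (cts is null)
    {
        // No cancellation possible: just wait for the task to complete.
        Task.WaitAny(task);
        return task;
    }

    // Emulate the user cancelling after `delay` ms; 0 disables the simulation.
    if (delay > 0)
        cts.CancelAfter(delay);

    // This task completes (as Canceled) only when the token is cancelled.
    Task cancellationSignal = Task.Delay(Timeout.Infinite, cts.Token);

    // Wake up on whichever finishes first: the work or the cancellation.
    Task.WaitAny(task, cancellationSignal);
    return task;
}
```

Task.WaitAny does not rethrow a faulted task's exception. As with the original loop, the caller can still inspect the returned task's status and exceptions.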
Part 5: Benchmarks
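The benchmarks cover the file system implementations for both C# and VB. They use a simple JSON collection object (Contacts) and a complex JSON object (eBay CategoryAspect), and they compare the default file/stream methods with the custom library streaming methods:
- <Contact/Ebay>_<NewtonSoft/SystemText>_Default: loads the entire file into a string and deserializes it
- <Contact/Ebay>_<NewtonSoft/SystemText>_DefaultStream: loads the entire file into a stream and deserializes it
- Contact_SystemText_DefaultEnumerableStream: the only test that uses DeserializeAsyncEnumerable to stream and deserialize one JSON object at a time
- <Contact/Ebay>_<NewtonSoft/SystemText>_Streaming: the library streams and deserializes one JSON object at a time
- <Contact/Ebay>_<NewtonSoft/SystemText>_StreamingBatch10: the library streams and deserializes JSON objects in batches of 10
- <Contact/Ebay>_<NewtonSoft/SystemText>_StreamingChunk64K: the library streams and deserializes one JSON object at a time, using a 64KB buffer
- <Contact/Ebay>_<NewtonSoft/SystemText>_StreamingBatch10BufferSize64K: the library streams and deserializes JSON objects in batches of 10, using a 64KB buffer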
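The block below is a minimal sketch of the two System.Text.Json baselines, _Default and DefaultEnumerableStream, to make the benchmark names concrete. It is not the benchmark code. The inline JSON array stands in for the Contacts file, and JsonElement replaces the article's Contact model.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

// Tiny stand-in for the Contacts file: a top-level JSON array of objects.
string json = "[{\"id\":1,\"name\":\"Ann\"},{\"id\":2,\"name\":\"Bob\"},{\"id\":3,\"name\":\"Cy\"}]";

// "_Default": the whole payload is held as a string and deserialized at once,
// so every object is in memory before the first one can be used.
List<JsonElement> all = JsonSerializer.Deserialize<List<JsonElement>>(json)!;

// "DefaultEnumerableStream": DeserializeAsyncEnumerable reads the stream in
// buffer-sized chunks and yields each array element as soon as it is complete.
await using Stream stream = new MemoryStream(Encoding.UTF8.GetBytes(json));
int streamedCount = 0;
await foreach (JsonElement contact in JsonSerializer.DeserializeAsyncEnumerable<JsonElement>(stream))
{
    streamedCount++; // process one object here, then let it go
    Console.WriteLine(contact.GetProperty("name").GetString());
}

Console.WriteLine($"Default: {all.Count}, streamed: {streamedCount}");
```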
Test data
Contacts: 500,000 records / 297,675KB
Ebay Category Aspects: 750 records / 68,118KB
Test machine configuration
BenchmarkDotNet=v0.13.2, OS=Windows 11 (10.0.22621.675)
AMD Ryzen 7 3700X, 1 CPU, 16 logical and 8 physical cores
.NET SDK=7.0.100-rc.2.22477.23
[Host] : .NET 6.0.10 (6.0.1022.47605), X64 RyuJIT AVX2
DefaultJob : .NET 6.0.10 (6.0.1022.47605), X64 RyuJIT AVX2
M.2 SSD
C# benchmark results
| Method | Mean | Error | StdDev | Ratio | Rank |
|------------------------------------------------- |--------:|---------:|---------:|------:|-----:|
| Contact_SystemText_StreamingBatch10BufferSize64K | 2.342 s | 0.0059 s | 0.0055 s | 0.51 | 1 |
| Contact_SystemText_StreamingChunk64K | 2.415 s | 0.0183 s | 0.0171 s | 0.53 | 2 |
| Contact_SystemText_Streaming | 2.472 s | 0.0176 s | 0.0156 s | 0.54 | 3 |
| Contact_SystemText_DefaultEnumerableStream | 2.480 s | 0.0056 s | 0.0052 s | 0.54 | 3 |
| Contact_SystemText_StreamingBatch10 | 2.536 s | 0.0049 s | 0.0044 s | 0.56 | 4 |
| Contact_SystemText_Default | 4.002 s | 0.0483 s | 0.0452 s | 0.88 | 5 |
| Contact_NewtonSoft_StreamingChunk64K | 4.451 s | 0.0165 s | 0.0129 s | 0.98 | 6 |
| Contact_NewtonSoft_StreamingBatch10BufferSize64K | 4.484 s | 0.0130 s | 0.0122 s | 0.98 | 6 |
| Contact_NewtonSoft_Streaming | 4.556 s | 0.0132 s | 0.0117 s | 1.00 | 7 |
| Contact_NewtonSoft_StreamingBatch10 | 4.636 s | 0.0908 s | 0.0892 s | 1.02 | 7 |
| Contact_NewtonSoft_DefaultStream | 4.729 s | 0.0194 s | 0.0181 s | 1.04 | 8 |
| Contact_NewtonSoft_Default | 6.268 s | 0.0385 s | 0.0341 s | 1.38 | 9 |

| Method | Mean | Error | StdDev | Ratio | Rank |
|---------------------------------------------- |-----------:|---------:|---------:|------:|-----:|
| Ebay_SystemText_DefaultStream | 729.2 ms | 4.71 ms | 4.18 ms | 0.63 | 1 |
| Ebay_SystemText_Default | 970.3 ms | 8.58 ms | 8.02 ms | 0.83 | 2 |
| Ebay_NewtonSoft_StreamingChunk64K | 1,091.8 ms | 4.40 ms | 3.44 ms | 0.94 | 3 |
| Ebay_NewtonSoft_StreamingBatch10BufferSize64K | 1,094.4 ms | 7.32 ms | 6.11 ms | 0.94 | 3 |
| Ebay_NewtonSoft_StreamingBatch10 | 1,122.8 ms | 8.79 ms | 7.79 ms | 0.96 | 4 |
| Ebay_NewtonSoft_Streaming | 1,164.6 ms | 9.93 ms | 9.29 ms | 1.00 | 5 |
| Ebay_NewtonSoft_DefaultStream | 1,248.4 ms | 16.03 ms | 14.99 ms | 1.07 | 6 |
| Ebay_SystemText_StreamingChunk64K | 1,453.1 ms | 4.81 ms | 4.50 ms | 1.25 | 7 |
| Ebay_SystemText_Streaming | 1,534.9 ms | 5.19 ms | 4.86 ms | 1.32 | 8 |
| Ebay_NewtonSoft_Default | 1,536.8 ms | 18.24 ms | 17.06 ms | 1.32 | 8 |
| Ebay_SystemText_StreamingBatch10BufferSize64K | 1,562.3 ms | 5.77 ms | 5.40 ms | 1.34 | 9 |
| Ebay_SystemText_StreamingBatch10 | 1,642.4 ms | 5.60 ms | 5.24 ms | 1.41 | 10 |
VB benchmark results
| Method | Mean | Error | StdDev | Ratio | Rank |
|------------------------------------------------- |--------:|---------:|---------:|------:|-----:|
| Contact_SystemText_StreamingChunk64K | 2.379 s | 0.0103 s | 0.0097 s | 0.51 | 1 |
| Contact_SystemText_StreamingBatch10BufferSize64K | 2.382 s | 0.0079 s | 0.0070 s | 0.51 | 1 |
| Contact_SystemText_DefaultEnumerableStream | 2.501 s | 0.0065 s | 0.0061 s | 0.54 | 2 |
| Contact_SystemText_Streaming | 2.657 s | 0.0060 s | 0.0057 s | 0.57 | 3 |
| Contact_SystemText_StreamingBatch10 | 2.687 s | 0.0122 s | 0.0114 s | 0.58 | 3 |
| Contact_SystemText_Default | 4.120 s | 0.0422 s | 0.0395 s | 0.88 | 4 |
| Contact_NewtonSoft_StreamingBatch10BufferSize64K | 4.509 s | 0.0251 s | 0.0235 s | 0.97 | 5 |
| Contact_NewtonSoft_StreamingBatch10 | 4.588 s | 0.0321 s | 0.0300 s | 0.99 | 6 |
| Contact_NewtonSoft_StreamingChunk64K | 4.613 s | 0.0309 s | 0.0289 s | 0.99 | 6 |
| Contact_NewtonSoft_Streaming | 4.655 s | 0.0171 s | 0.0160 s | 1.00 | 6 |
| Contact_NewtonSoft_DefaultStream | 5.492 s | 0.0571 s | 0.0534 s | 1.18 | 7 |
| Contact_NewtonSoft_Default | 6.318 s | 0.0654 s | 0.0612 s | 1.36 | 8 |

| Method | Mean | Error | StdDev | Ratio | Rank |
|---------------------------------------------- |-----------:|---------:|---------:|------:|-----:|
| Ebay_SystemText_DefaultStream | 732.3 ms | 6.43 ms | 5.70 ms | 0.67 | 1 |
| Ebay_SystemText_Default | 957.1 ms | 4.78 ms | 4.23 ms | 0.88 | 2 |
| Ebay_NewtonSoft_StreamingBatch10BufferSize64K | 1,064.6 ms | 11.54 ms | 10.80 ms | 0.97 | 3 |
| Ebay_NewtonSoft_StreamingChunk64K | 1,069.2 ms | 6.06 ms | 5.67 ms | 0.98 | 3 |
| Ebay_NewtonSoft_Streaming | 1,092.3 ms | 5.87 ms | 5.50 ms | 1.00 | 4 |
| Ebay_NewtonSoft_StreamingBatch10 | 1,096.1 ms | 3.42 ms | 3.03 ms | 1.00 | 4 |
| Ebay_NewtonSoft_DefaultStream | 1,220.9 ms | 9.47 ms | 8.86 ms | 1.12 | 5 |
| Ebay_SystemText_StreamingChunk64K | 1,489.3 ms | 4.36 ms | 3.86 ms | 1.36 | 6 |
| Ebay_NewtonSoft_Default | 1,499.3 ms | 13.32 ms | 12.46 ms | 1.37 | 6 |
| Ebay_SystemText_StreamingBatch10BufferSize64K | 1,514.3 ms | 3.99 ms | 3.33 ms | 1.39 | 6 |
| Ebay_SystemText_Streaming | 1,579.1 ms | 4.95 ms | 4.39 ms | 1.45 | 7 |
| Ebay_SystemText_StreamingBatch10 | 1,598.5 ms | 3.85 ms | 3.60 ms | 1.46 | 8 |
Comments
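- VB and C# perform essentially the same. For all but one method, the C# and VB results are within about 8% of each other. The exception is Contact_NewtonSoft_DefaultStream (4.729 s in C# versus 5.492 s in VB).
- The Ebay_SystemText_Streaming... methods are noticeably slower than Ebay_NewtonSoft_Streaming (ratios of 1.25 to 1.46 versus 1.00). The reason is that the reader has to read ahead to find the end of each object before it can be deserialized. They are roughly on par with Newtonsoft loading the entire file into a string and deserializing it (Ebay_NewtonSoft_Default, ratios of 1.32 and 1.37).
- For the simple JSON collection object (Contacts), streaming through the library with a 64KB buffer is slightly faster (roughly 3 to 6%) than DeserializeAsyncEnumerable with its default 16KB buffer.
- For complex JSON objects, the custom Utf8JsonAsyncStreamReader for System.Text.Json performs acceptably, but it is slower than System.Text.Json loading or streaming the entire file into memory. The key point, however, is that the custom Utf8JsonAsyncStreamReader has a very small memory footprint, whereas the default System.Text.Json approach has a very high memory requirement.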
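For DeserializeAsyncEnumerable, the equivalent of the library's 64KB buffer setting is JsonSerializerOptions.DefaultBufferSize, which defaults to 16KB. The block below is a minimal sketch assuming .NET 6+ and is not the library's API. The generated payload is illustrative only. Whether a larger buffer helps depends on the data and the underlying stream, so measure with your own files.

```csharp
using System;
using System.IO;
using System.Text;
using System.Text.Json;

// DefaultBufferSize controls how many bytes the serializer reads from the
// stream per chunk; a fresh options instance reports the default (16KB).
var defaults = new JsonSerializerOptions();
var options64K = new JsonSerializerOptions { DefaultBufferSize = 64 * 1024 };

// Stand-in payload: 1,000 small objects in a top-level array.
var sb = new StringBuilder("[");
for (int i = 0; i < 1000; i++)
    sb.Append(i == 0 ? "" : ",").Append("{\"id\":").Append(i).Append('}');
sb.Append(']');

// Stream the array one element at a time using the 64KB buffer.
await using var stream = new MemoryStream(Encoding.UTF8.GetBytes(sb.ToString()));
long idSum = 0;
await foreach (JsonElement item in JsonSerializer.DeserializeAsyncEnumerable<JsonElement>(stream, options64K))
    idSum += item.GetProperty("id").GetInt32();

Console.WriteLine($"Default buffer: {defaults.DefaultBufferSize} bytes, custom: {options64K.DefaultBufferSize} bytes");
Console.WriteLine($"Sum of ids: {idSum}");
```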
Summary
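Working with large data streams is not as hard as it looks. Sample implementations are provided in both C# and VB for processing raw and zipped JSON data via the file system and a Web API.
The file system and Web API streaming code is similar for the Newtonsoft.Json and System.Text.Json APIs, but the performance gains of System.Text.Json make moving to .NET 6.0+ well worthwhile.
System.Text.Json lacks an equivalent of Newtonsoft.Json.JsonTextReader for walking a stream one object at a time, but we can implement our own high-performance custom Utf8JsonAsyncStreamReader.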
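The read-ahead cost noted in the comments comes from having to locate the end of each object before System.Text.Json can deserialize it. The block below is a minimal sketch of that step only. ReadCompleteObjects is a hypothetical helper, not the article's Utf8JsonAsyncStreamReader. It uses Utf8JsonReader with isFinalBlock: false and TrySkip() to report whether a buffered chunk holds a complete object. An async reader built on this idea would keep the unconsumed bytes and prepend them to the next chunk read from the stream.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

// Simulate the first chunk read from a stream: the third object is cut off.
byte[] chunk = Encoding.UTF8.GetBytes("[{\"id\":1},{\"id\":2},{\"id\":3,\"na");

List<int> ids = ReadCompleteObjects(chunk, out long consumed);
Console.WriteLine($"Complete objects: {ids.Count}, bytes consumed: {consumed} of {chunk.Length}");

// Hypothetical helper: returns the ids of every complete object in the buffer
// and how many bytes were used, so the caller can keep the unconsumed tail.
static List<int> ReadCompleteObjects(ReadOnlySpan<byte> buffer, out long bytesUsed)
{
    var found = new List<int>();
    // isFinalBlock: false tells the reader that more data may follow.
    var reader = new Utf8JsonReader(buffer, isFinalBlock: false, state: default);
    bytesUsed = 0;

    while (reader.Read())
    {
        if (reader.TokenType != JsonTokenType.StartObject)
        {
            // e.g. the opening '[' of the array: nothing to deserialize
            bytesUsed = reader.BytesConsumed;
            continue;
        }

        long start = reader.TokenStartIndex;
        // Read ahead to the matching '}'; false means the object is incomplete
        // and the reader is left where it was before the call.
        if (!reader.TrySkip())
            break;

        // The whole object is buffered, so it can now be deserialized.
        ReadOnlySpan<byte> objectBytes = buffer.Slice((int)start, (int)(reader.BytesConsumed - start));
        JsonElement obj = JsonSerializer.Deserialize<JsonElement>(objectBytes);
        found.Add(obj.GetProperty("id").GetInt32());
        bytesUsed = reader.BytesConsumed;
    }
    return found;
}
```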
History
- v1.0 - November 1, 2022 - Initial release (.NET 6.0)
- v1.01 - November 17, 2022 - Updated, and added a download for .NET 7.0
- v1.02 - November 18, 2022 - Changed the list of configuration properties from a text code block to a table