Deserializing Json Streams using Newtonsoft.Json & System.Text.Json with C# & VB

1 November 2022

CPOL

How to deserialize very large simple & complex JSON Streams (.NET 6.0 & 7.0)

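Note: Both downloads contain the same files; only the compression method differs. Once downloaded, see the Getting Started section to set up the data for use.

Working with JSON Series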

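Introduction

This is the third and final part of this series of articles. We will cover deserializing JSON streams using both Newtonsoft.Json and System.Text.Json. The data being streamed can come from the web, from files, or from other sources. We will also look at deserializing JSON files from within a zip file. Lastly, we will wrap the processes in library base classes to simplify the code required, along with unit tests and benchmarks for performance monitoring.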

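What is a Stream?

Microsoft's explanation of a stream:

"A stream is an abstraction of a sequence of bytes, such as a file, an input/output device, an inter-process communication pipe, or a TCP/IP socket. The Stream class and its derived classes provide a generic view of these different types of input and output, and isolate the programmer from the specific details of the operating system and the underlying devices."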

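Why use Streaming?

There are many reasons. The key benefit of working with streams is:

  • We do not have to load the entire data, such as a JSON object, into memory. It is more memory-efficient, and so can improve overall performance

What are the benefits when working with large JSON data?

  • Process objects as they are deserialized, with no need to keep all objects in memory
  • If there is an issue with the JSON data, we can fail fast
  • We can cancel part-way through the stream

For example, one of the sample JSON data files used in this article is approximately 890MB. When loaded into memory and deserialized as a single object, it consumes over 6GB of memory! When deserialized as a stream, object by object, memory use is less than 60MB.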
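
The benefits listed above can be seen in a minimal sketch. It assumes System.Text.Json on .NET 6 or later; the `Contact` record and the `ContactStreamer` class are hypothetical names used only for illustration and are not part of the download. Each object is handled as soon as it is deserialized, and the loop stops early without reading the remaining objects.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical model for illustration only; property names match the JSON keys.
public record Contact(int id, string first_name, string last_name);

public static class ContactStreamer
{
    // Reads contacts one at a time and stops after 'limit' items,
    // so the remaining objects are never deserialized.
    public static async Task<List<string>> ReadFirstNamesAsync(
        Stream stream, int limit, CancellationToken token = default)
    {
        List<string> names = new();

        await foreach (var contact in JsonSerializer
            .DeserializeAsyncEnumerable<Contact>(stream, cancellationToken: token)
            .ConfigureAwait(false))
        {
            if (contact is null) continue;

            // process the object now; it does not need to be kept afterwards
            names.Add(contact.first_name);

            if (names.Count >= limit) break; // stop part-way through the stream
        }

        return names;
    }
}
```

Malformed JSON surfaces as a `JsonException` at the point where the bad data is reached, and a cancelled token ends the loop in the same way as the `break` does here.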

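VB.NET Limitation (System.Text.Json)

As mentioned in the previous article, Working with System.Text.Json in C#, VB.NET does not support Ref Struct, so the Utf8JsonAsyncStreamReader could only be written in C#.

The other exceptions are the JsonSamples.Host project, which provides Web API support for testing remote data, and the GetDataFiles project, which builds the JsonSamples folder.

Apart from this limitation, this article includes code for both C# and VB, plus a solution for each language: C# (79 projects) and VB (79 projects).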

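Code and Data Used

There are a lot of parts to this article. I have written it so that you can pick the parts of information that you require. There are a large number of small, targeted projects that cover the code needed, rather than lumping everything into one fancy monolithic UI application. I trust that this will help with understanding the code and using it yourself.

All code included in the download covers both local (file system) and remote (Web API) streaming, in both C# and VB, for Newtonsoft and System.Text.Json. Benchmarks and unit tests are included for both C# and VB.

The sample data used was either built using Mockaroo or is sandbox data from eBay, not live data. The data files used total approximately 900MB.

The custom Utf8JsonAsyncStreamReader JSON reader for System.Text.Json has been thoroughly tested and is production ready.


Note: It is not an exact replacement for the Newtonsoft JsonReader, as it is for asynchronous streams only.

Streams support both synchronous and asynchronous APIs. The focus of this article is on multitasking/background task operation, so it targets asynchronous techniques exclusively. If you are not familiar with TPL / Async & Await, then please read Asynchronous programming with async and await - Microsoft Learn.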

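The solution structure for both C# and VB is as follows:

  1. Prototypes - The minimum code required. The libraries are built on the code in these prototypes.
  2. Applications - DI and non-DI samples for both File System and Web API (.Remote)
  3. Libraries - Wrappers for the Newtonsoft and System.Text.Json stream APIs + supporting classes and extensions
  4. Unit Tests - For the Newtonsoft and System.Text.Json generic wrapper libraries, the custom Utf8JsonAsyncStreamReader, and the supporting helper methods
  5. Benchmarks - Measure the performance of the default Newtonsoft and System.Text.Json methods + the wrapper libraries

Both Newtonsoft and System.Text.Json have DI (Dependency Injection) and non-DI sample projects. In addition, there are File System and Web API versions of each type. Each application is made up of multiple projects, split into separate class libraries. The application structure used is as follows: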

    Application
     |
     +-- <type>.<JsonLib>
     |    |
     |    + -- Common.Ebay
     |    |     |
     |    |     +-- Common
     |    |
     |    +--- Common.<JsonLib>
     |    |     |
     |    |     +-- Common
     |    |
     |    +--- Ebay.Resources (.zip)
     |
     +-------- <Application>.Shared 

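where:

  • Common contains code common to all projects
  • Common.<JsonLib> is specific to the <JsonLib> being used - Newtonsoft or System.Text.Json. It contains the generic stream deserializer handler code and the custom readers + extension methods
  • Common.Ebay holds the configuration and file lists
  • Ebay.Resources holds the JSON data files; the .zip variant holds the zipped files
  • Ebay.<JsonLib> contains the typed stream deserializer handlers and the complex object models wired up for the specific <JsonLib>
  • Application is the core that manages the processing

Note: There are a number of projects in the solution, approximately 79 for each language, half for Newtonsoft.Json and half for System.Text.Json. There are also large data files. Compiling the complete solution will use a lot of disk space. It is therefore highly recommended to compile only the sample application that you want to run.

As there are many projects for both CSharp and VB, the *obj* and *bin* folders of each project are moved into consolidated *obj* and *bin* folders off the root solution folder for ease of management. If you would like me to write a tip on how this is done, please leave a comment below. Look at the *.csproj* and *.vbproj* files to see how I achieved this.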

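Definitions

There are two types of large JSON files that we can work with. I have defined them as follows:

  1. Simple JSON Collection Object

    A collection of objects of the same type held in a root JSON collection object.

  2. Complex JSON Objects

    A JSON object with one or more properties, where a property is a collection of objects alongside individual properties. The property and/or collection to be deserialized does not have to be in the root of the JSON object.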
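
As a rough illustration of the two definitions, the sketch below looks only at the first JSON token: a root array suggests a simple collection, a root object suggests a complex object. This is a simplifying heuristic rather than anything from the download, and `JsonShape` and `Classify` are hypothetical names. It uses the `Utf8JsonReader` from System.Text.Json on a small prefix of the data, so the file itself never needs to be loaded.

```csharp
using System;
using System.Text.Json;

public static class JsonShape
{
    // Returns "simple" when the root token is an array,
    // "complex" when it is an object, otherwise "unknown".
    public static string Classify(ReadOnlySpan<byte> utf8Prefix)
    {
        // isFinalBlock is false because only the start of the data is supplied
        Utf8JsonReader reader = new(utf8Prefix, isFinalBlock: false, state: default);

        if (!reader.Read())
            return "unknown";

        return reader.TokenType switch
        {
            JsonTokenType.StartArray => "simple",
            JsonTokenType.StartObject => "complex",
            _ => "unknown"
        };
    }
}
```

A caller could pass the first few bytes read from a file or network stream; the result decides which of the two deserialization approaches described in this article applies.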

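Getting Started

Once you have downloaded and unpacked the project solution, you need to run the JsonSamples.Host web project. It is used by both the Prototype/Application projects and the Setup project. Once the JsonSamples.Host web project (DeserializingJsonStreams.Hosting - CSharp.sln solution) is running, run the Setup (GetDataFiles) project to build the required JsonSamples folder. The JsonSamples.Host web project will generate the required zip files and copy the required JSON sample files.

Note: If this process fails, it could be that the hosting port address of the JsonSamples.Host web project has changed. If this is the case, go into the *Config* folder and update the host property in the *appsettings.json* file. This file is used by all applications in the project/solution that require remote access to the Web API server.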

{
    "Host" : "localhost:7215"
}
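
To show how such a setting could be combined with a URL template like the `https://{0}/download/MOCK1` used in the Web API samples, here is a minimal sketch. It is not the `Build()` extension from the download, whose implementation is not shown in this article; `HostSettings` and `BuildUrl` are hypothetical names, and the JSON text is passed in directly rather than read from the *Config* folder.

```csharp
using System;
using System.Text.Json;

public static class HostSettings
{
    // Hypothetical helper: reads the "Host" value from the settings JSON and
    // substitutes it into a template such as "https://{0}/download/MOCK1".
    public static string BuildUrl(string appSettingsJson, string urlTemplate)
    {
        using JsonDocument document = JsonDocument.Parse(appSettingsJson);

        // GetProperty throws KeyNotFoundException if "Host" is absent
        var host = document.RootElement.GetProperty("Host").GetString();

        if (string.IsNullOrWhiteSpace(host))
            throw new InvalidOperationException("The Host setting is empty.");

        return string.Format(urlTemplate, host);
    }
}
```

With the settings shown above, the template resolves to `https://localhost:7215/download/MOCK1`.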

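Part 1: Working with Streams

Streams implement the IDisposable interface. We therefore need to make sure that we release the resources to avoid memory leaks.

Working with Files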

await using (Stream stream = File.OpenRead(filename))
{
    // do work here...
}
Using stream As Stream = File.OpenRead(filename)

    ' do work here...
    
End Using

Working with Web APIs

await using (Stream httpStream = await new HttpClient().GetStreamAsync(this._url)
    .ConfigureAwait(false))
{
    // do work here...
}
Using stream = Await New HttpClient().GetStreamAsync(_url).ConfigureAwait(false)

    ' do work here...
    
End Using

Streaming with Newtonsoft

Working with streams using Newtonsoft is quite simple. Here is a sample snippet for setting up to work with a FileStream:

using TextReader textReader = new StreamReader(stream);
using JsonReader jsonReader = new JsonTextReader(textReader);

// pass the reader and file stream 
await using (stream.ConfigureAwait(false))
{
    // do work here...
}
' set up the stream readers
Using textReader As TextReader = New StreamReader(stream)
    Using jsonReader As JsonReader = New JsonTextReader(textReader)

        ' do work here...
    
    End Using
End Using

Simple JSON Collection Object Deserialization

A typical simple JSON collection is a list of objects in an array:

[
  {
    "id":1,
    "first_name":"Osbert",
    "last_name":"Petcher"
  },
  {
    "id":2,
    "first_name":"Salvador",
    "last_name":"Marmion"
  },
  {
    "id":3,
    "first_name":"Kellen",
    "last_name":"Philbin"
  },
  {
    "id":4,
    "first_name":"Fred",
    "last_name":"Thuillier"
  }
]

As this is a collection of objects, the built-in JsonSerializer will process one object at a time.

Here is the code to deserialize the JSON above from a FileStream using Newtonsoft.Json:

using Newtonsoft.Json;

this._jsonSerializerSettings = new();
this._serializer = JsonSerializer.Create(this._jsonSerializerSettings);

// open the _file as a stream
await using Stream stream = File.OpenRead(filename);

// set up the stream readers
using TextReader textReader = new StreamReader(stream);
using JsonReader jsonReader = new JsonTextReader(textReader);

await using (stream.ConfigureAwait(false))
{
    // move to the start of the array and read the stream
    await jsonReader.ReadAsync().ConfigureAwait(false);
 
    while (await jsonReader.ReadAsync().ConfigureAwait(false) &&
           jsonReader.TokenType != JsonToken.EndArray)
    {
        Contact? contact = this._serializer!.Deserialize<Contact>(jsonReader);
        Process(contact!);
    }
}
Imports Newtonsoft.Json

_jsonSerializerSettings = New JsonSerializerSettings()
_serializer = JsonSerializer.Create(_jsonSerializerSettings)

' open the _file as a stream
Using stream As Stream = File.OpenRead(filename)

    ' set up the stream readers
    Using textReader As TextReader = New StreamReader(stream)
        Using jsonReader As JsonReader = New JsonTextReader(textReader)

            ' move to the start of the array and read the stream
            Await jsonReader.ReadAsync().ConfigureAwait(False)

            While Await jsonReader.ReadAsync().ConfigureAwait(False) AndAlso
                  jsonReader.TokenType <> JsonToken.EndArray

                Dim contact = _serializer.Deserialize(Of Contact)(jsonReader)
                Process(contact)

            End While

        End Using
    End Using

End Using

And the processing:

private void Process(Contact? item)
{
    // process and store the data model
    this._count++;
}
Private Sub Process(item As Contact)

    ' process and store the data model
    _count += 1

End Sub

File Stream Example

Putting it all together, we end up with something like this:

using Common.Helpers;
using Contacts.NewtonSoft.Json.Models;
using Newtonsoft.Json;

internal class Program
{
    #region Fields

    private readonly IFilePathHelper _fileHelper = new FilePathHelper("Resources");

    private JsonSerializer? _serializer;
    private JsonSerializerSettings? _jsonSerializerSettings;

    private int _count;
    private readonly string _file = "Mock_Contacts1.json";

    #endregion

    #region Methods

    private static async Task Main()
        => await new Program().Execute().ConfigureAwait(false);

    private async Task Execute()
    {
        Console.WriteLine($"Reading {this._file}");

        this._jsonSerializerSettings = new();
        this._serializer = JsonSerializer.Create(this._jsonSerializerSettings);

        // open the _file as a stream
        await using FileStream stream = 
              File.OpenRead(this._fileHelper.Resolve(this._file));

        Console.WriteLine($"Processing: {this._file}");

        // set up the stream readers
        using TextReader textReader = new StreamReader(stream);
        using JsonReader jsonReader = new JsonTextReader(textReader);

        await using (stream.ConfigureAwait(false))
        {
            // move to start of first object
            await jsonReader.ReadAsync().ConfigureAwait(false);
            
            while (await jsonReader.ReadAsync().ConfigureAwait(false) &&
                   jsonReader.TokenType != JsonToken.EndArray)
            {
                Contact? contact = this._serializer!.Deserialize<Contact>(jsonReader);
                Process(contact!);
            }
        }

        // report results
        Console.WriteLine($"Contacts: {this._count:N0}");
        Console.WriteLine("Finished");
    }

    private void Process(Contact? item)
    {
        // process and store the data model
        this._count++;
    }

    #endregion
}
Imports System.IO
Imports Common.Helpers
Imports Newtonsoft.Json
Imports NewtonsoftContact.Models

Module Program

#Region "Fields"

    Private ReadOnly _fileHelper As FilePathHelper = New FilePathHelper("Resources")

    Private _serializer As JsonSerializer
    Private _jsonSerializerSettings As JsonSerializerSettings

    Private _count As Integer
    Private ReadOnly _file As String = "Mock_Contacts1.json"

#End Region

#Region "Methods"

    Sub Main(args As String())
        Console.WriteLine($"Reading {_file}")
        ExecuteAsync(args).GetAwaiter.GetResult()
        Console.WriteLine("Finished")
    End Sub

    Private Async Function ExecuteAsync(args As String()) As Task

        _jsonSerializerSettings = New JsonSerializerSettings()
        _serializer = JsonSerializer.Create(_jsonSerializerSettings)

        ' open the _file as a stream
        Using stream As Stream = File.OpenRead(_fileHelper.Resolve(_file))

            ' set up the stream readers
            Using textReader As TextReader = New StreamReader(stream)
                Using jsonReader As JsonReader = New JsonTextReader(textReader)

                    ' move to the start of the array and read the stream
                    Await jsonReader.ReadAsync().ConfigureAwait(False)

                    While Await jsonReader.ReadAsync().ConfigureAwait(False) AndAlso
                          jsonReader.TokenType <> JsonToken.EndArray

                        Dim contact = _serializer.Deserialize(Of Contact)(jsonReader)
                        Process(contact)

                    End While

                End Using
            End Using

        End Using

        ' report results
        Console.WriteLine($"Contacts: {_count:N0}")

    End Function

    Private Sub Process(item As Contact)

        ' process and store the data model
        _count += 1

    End Sub

#End Region

End Module

Note: To see the code running, see the prototype \ local \ SimpleData \ NewtonsoftContact VB/C# projects.

Web API Example

The Web API version is almost identical:

using Common.Settings;
using Contacts.NewtonSoft.Json.Models;
using Newtonsoft.Json;

internal class Program
{
    #region Fields

    private JsonSerializer? _serializer;
    private JsonSerializerSettings? _jsonSerializerSettings;

    private int _count;
    private string _url = "https://{0}/download/MOCK1";

    #endregion

    #region Methods

    private static async Task Main()
        => await new Program().Execute().ConfigureAwait(false);

    private async Task Execute()
    {
        // point to the correct host URL in appsettings.json file
        this._url = this._url.Build();

        // initialize the serializer
        this._jsonSerializerSettings = new();
        this._serializer = JsonSerializer.Create(this._jsonSerializerSettings);

        // open the stream
        Console.WriteLine($"Connecting to stream: {this._url}");

        Stream? stream;

        try
        {
            stream = await new HttpClient().GetStreamAsync(this._url);
        }
        catch (Exception)
        {
            Console.WriteLine($"Failed to open stream {this._url}. " +
                "Please check that the remote server is active.");
            return;
        }

        if (stream is null)
        {
            Console.WriteLine($"Failed to open stream {this._url}");
            return;
        }

        Console.WriteLine($"Processing: {this._url}");

        // set up the stream readers
        using TextReader textReader = new StreamReader(stream);
        using JsonReader jsonReader = new JsonTextReader(textReader);

        await using (stream.ConfigureAwait(false))
        {
            // move to start of first object
            await jsonReader.ReadAsync().ConfigureAwait(false);
            
            while (await jsonReader.ReadAsync().ConfigureAwait(false) &&
                   jsonReader.TokenType != JsonToken.EndArray)
            {
                Contact? contact = this._serializer!.Deserialize<Contact>(jsonReader);
                Process(contact!);
            }
        }

        // manually clean up connection
        stream.Close();
        await stream.DisposeAsync().ConfigureAwait(false);

        // report results
        Console.WriteLine($"Contacts: {this._count:N0}");
        Console.WriteLine("Finished");
    }

    private void Process(Contact? item)
    {
        // process and store the data model
        this._count++;
    }

    #endregion
}
Imports System.IO
Imports System.Net.Http
Imports Common.Settings
Imports Newtonsoft.Json
Imports NewtonsoftContact.Remote.Models

Module Program

#Region "Fields"

    Private _serializer As JsonSerializer
    Private _jsonSerializerSettings As JsonSerializerSettings

    Private _count As Integer
    Private _url As String = "https://{0}/download/MOCK1"

#End Region

#Region "Methods"

    Sub Main(args As String())
        ExecuteAsync(args).GetAwaiter.GetResult()
        Console.WriteLine("Finished")
    End Sub

    Private Async Function ExecuteAsync(args As String()) As Task

        ' point to the correct host URL in appsettings.json file
        _url = _url.Build()

        _jsonSerializerSettings = New JsonSerializerSettings()
        _serializer = JsonSerializer.Create(_jsonSerializerSettings)

        ' open the stream
        Console.WriteLine($"Connecting to stream: {_url}")

        Dim stream As Stream

        Try

            stream = Await New HttpClient().GetStreamAsync(_url)

        Catch ex As Exception

            Console.WriteLine($"Failed to open stream {_url}. " &
                "Please check that the remote server is active.")
            Return

        End Try

        If stream Is Nothing Then
            Console.WriteLine($"Failed to open stream {_url}")
            Return
        End If

        Console.WriteLine($"Processing: {_url}")

        ' set up the stream readers
        Using textReader As TextReader = New StreamReader(stream)
            Using jsonReader As JsonReader = New JsonTextReader(textReader)

                ' move to the start of the array and read the stream
                Await jsonReader.ReadAsync().ConfigureAwait(False)

                While Await jsonReader.ReadAsync().ConfigureAwait(False) AndAlso
                      jsonReader.TokenType <> JsonToken.EndArray

                    Dim contact = _serializer.Deserialize(Of Contact)(jsonReader)
                    Process(contact)

                End While

            End Using
        End Using

        ' manually clean up connection
        stream.Close()
        Await stream.DisposeAsync().ConfigureAwait(False)

        ' report results
        Console.WriteLine($"Contacts: {_count:N0}")

    End Function

    Private Sub Process(item As Contact)

        ' process and store the data model
        _count += 1

    End Sub

#End Region

End Module

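Note: To see the code running, see the prototype \ remote \ SimpleData \ NewtonsoftContact VB/C# projects.

Complex JSON Objects with Selective Deserialization

Complex JSON is made up of individual simple properties, objects, and collections.

Below is a sample of a complex JSON data structure: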

{
  "categoryTreeId": "123",
  "categoryTreeVersion": "1.234a",
  "categoryAspects": [
    {
      "category": {
        "categoryId": "111",
        "categoryName": "Category 1"
      },
      "aspects": [
        {
          "localizedAspectName": "1:Aspect 1"
        },
        {
          "localizedAspectName": "1:Aspect 2"
        },
        {
          "localizedAspectName": "1:Aspect 3"
        }
      ]
    },
    {
      "category": {
        "categoryId": "222",
        "categoryName": "Category 2"
      },
      "aspects": [
        {
          "localizedAspectName": "2:Aspect 1"
        },
        {
          "localizedAspectName": "2:Aspect 2"
        },
        {
          "localizedAspectName": "2:Aspect 3"
        }
      ]
    }
  ]
}

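We are only interested in the "categoryAspects" collection. The method used for the simple JSON collection cannot be used here. If we did, the entire object would be loaded into memory, rather than one CategoryAspect object at a time.

To help understand the code below, here is a definition of manually walking the structure to deserialize each CategoryAspect object:

  • Examine each property
  • When we find the "categoryAspects" property, we can then extract each CategoryAspect object
    • Find the start of the object
    • Walk and store the object graph until we reach the end of the object
    • Deserialize the object graph
    • Repeat until we reach the end of the array object

Here is the code to deserialize the JSON above from a FileStream using Newtonsoft.Json: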

using Newtonsoft.Json;

JsonSerializer _serializer = new();

// open the _file as a stream
await using FileStream stream = File.OpenRead(_filename);

// set up the stream readers
using TextReader textReader = new StreamReader(stream);
using JsonReader jsonReader = new JsonTextReader(textReader);

await using (stream.ConfigureAwait(false))
{
    // move to the start of the array and read the stream
    await jsonReader.ReadAsync().ConfigureAwait(false);
 
    // walk the collection of objects to the end of the collection   
    while (await jsonReader.ReadAsync().ConfigureAwait(false) &&
           jsonReader.TokenType != JsonToken.EndArray)
        await ProcessAsync(jsonReader).ConfigureAwait(false);
}
Imports Newtonsoft.Json

_jsonSerializerSettings = New JsonSerializerSettings()
_serializer = JsonSerializer.Create(_jsonSerializerSettings)

' open the _file as a stream
Using stream As Stream = File.OpenRead(_filename)

    ' set up the stream readers
    Using textReader As TextReader = New StreamReader(stream)
        Using jsonReader As JsonReader = New JsonTextReader(textReader)

            ' move to the start of the array and read the stream
            While Await jsonReader.ReadAsync().ConfigureAwait(False)

                Await ProcessAsync(jsonReader).ConfigureAwait(False)

            End While

        End Using
    End Using

End Using

The code to walk the JSON graph:

// walk the stream
private async Task ProcessAsync(JsonReader jsonReader)
{
    // make sure we are looking at the correct json element
    if (jsonReader.TokenType != JsonToken.PropertyName)
        return;

    // process properties for data that we want
    if (jsonReader.GetString() == "categoryTreeVersion")
    {
        // get the value
        await jsonReader.ReadAsync().ConfigureAwait(false);

        string? version = jsonReader.GetString();
        Console.WriteLine($"Version: {version ?? "no value"}");
    }

    else if (jsonReader.GetString() == "categoryTreeId")
    {
        // get the value
        await jsonReader.ReadAsync().ConfigureAwait(false);

        string? id = jsonReader.GetString();
        Console.WriteLine($"Id:      {id ?? "no value"}");
    }

    else if (jsonReader.GetString() == "categoryAspects")
    {
        // move to the start of the array
        await jsonReader.ReadAsync().ConfigureAwait(false);

        // step through each complete object in the Json Array
        while (await jsonReader.ReadAsync().ConfigureAwait(false) &&
               jsonReader.TokenType != JsonToken.EndArray)
            ProcessCollection(jsonReader);
    }
}
'  walk the stream
Private Async Function ProcessAsync(jsonReader As JsonReader) As Task

    If jsonReader.TokenType <> JsonToken.PropertyName Then
        Return
    End If

    ' process properties for data that we want
    If jsonReader.GetString() = "categoryTreeVersion" Then

        ' get the value
        Await jsonReader.ReadAsync().ConfigureAwait(False)

        Dim version = jsonReader.GetString()
        Console.WriteLine($"Version: {If(version, "no value")}")

    End If

    If jsonReader.GetString() = "categoryTreeId" Then

        ' get the value
        Await jsonReader.ReadAsync().ConfigureAwait(False)

        Dim Id = jsonReader.GetString()
        Console.WriteLine($"Id: {If(Id, "no value")}")

    End If

    If jsonReader.GetString() = "categoryAspects" Then

        ' move to the start of the array
        Await jsonReader.ReadAsync().ConfigureAwait(False)

        'step through each complete object in the Json Array
        While Await jsonReader.ReadAsync().ConfigureAwait(False) AndAlso
              jsonReader.TokenType <> JsonToken.EndArray
            ProcessCollection(jsonReader)
        End While

    End If

End Function

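Note: The above processing code is not limited to the root node; it will search the JSON graph and process only those nodes identified.

To store each object, we use the same code as in the previous simple collection example: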

private void ProcessCollection(JsonReader jsonReader)
{
    CategoryAspect? categoryAspect = 
            _serializer!.Deserialize<CategoryAspect>(jsonReader);

    // process and store the data model
    _count++;
} 
Private Sub ProcessCollection(jsonReader As JsonReader)

    Dim categoryAspect = _serializer.Deserialize(Of CategoryAspect)(jsonReader)

    ' process and store the data model
    _count += 1

End Sub

Note: To see the code running, see the prototype \ local \ ComplexData \ NewtonsoftEbay VB/C# projects.
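
For experimenting without the download, the same walk can be condensed into one self-contained sketch that reads from any TextReader. It relies only on Newtonsoft.Json's public API and reads the property name from `reader.Value`. The `Category` and `CategoryAspect` classes are trimmed-down hypothetical models, and `CategoryAspectWalker` is an illustrative name rather than a class from the article's libraries.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using Newtonsoft.Json;

// Hypothetical, trimmed-down models for illustration only.
public class Category
{
    public string CategoryId { get; set; }
    public string CategoryName { get; set; }
}

public class CategoryAspect
{
    public Category Category { get; set; }
}

public static class CategoryAspectWalker
{
    public static async Task<List<CategoryAspect>> ReadAsync(TextReader source)
    {
        List<CategoryAspect> results = new();
        JsonSerializer serializer = new();

        using JsonTextReader reader = new(source);

        while (await reader.ReadAsync().ConfigureAwait(false))
        {
            // only react to the property that holds the collection
            if (reader.TokenType != JsonToken.PropertyName ||
                (string)reader.Value != "categoryAspects")
                continue;

            // move to the start of the array
            await reader.ReadAsync().ConfigureAwait(false);

            // each read lands on the start of an object, or on the end of the array
            while (await reader.ReadAsync().ConfigureAwait(false) &&
                   reader.TokenType != JsonToken.EndArray)
            {
                // consumes exactly one object, including its nested arrays
                var item = serializer.Deserialize<CategoryAspect>(reader);
                if (item is not null) results.Add(item);
            }
        }

        return results;
    }
}
```

Collecting the items in a list is only for demonstration; with a very large file, each item would be processed and released inside the inner loop, as the article's code does.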

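Working with Zipped JSON Data Files

Zip files are good for compressing text-based JSON files, especially when working with very large files. We can stream read zip files and the compressed JSON files within them.

The code to read the data is the same as above; we only need to add the code to open the zip file and read the entries. Here, I am assuming that each file is of the correct type: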

using ZipArchive zipArchive = new(File.OpenRead(_filename));

foreach (ZipArchiveEntry zipArchiveEntry in zipArchive.Entries)
{
    Console.WriteLine($"Processing: {_file} > {zipArchiveEntry.FullName}");

    await using Stream stream = zipArchiveEntry.Open();

    // set up the stream readers
    using TextReader textReader = new StreamReader(stream);
    using JsonReader jsonReader = new JsonTextReader(textReader);

     // do work here...
 }
Using zipArchive = New ZipArchive(File.OpenRead(_fileHelper.Resolve(_file)))

    For Each zipArchiveEntry As ZipArchiveEntry In zipArchive.Entries

        Console.WriteLine($"Processing: {_file} > {zipArchiveEntry.FullName}")

        Using stream As Stream = zipArchiveEntry.Open()

            ' set up the stream readers
            Using textReader As TextReader = New StreamReader(stream)
                Using jsonReader = New JsonTextReader(textReader)

                    ' do work here

                End Using

            End Using

        End Using

    Next

End Using

If you need to find a specific file, simply check the name of each ZipArchive entry:

using ZipArchive zipArchive = new(File.OpenRead(_filename));

foreach (ZipArchiveEntry zipArchiveEntry in zipArchive.Entries)
{
    if (zipArchiveEntry.Name == "file_name_goes_here")
    {
        Console.WriteLine($"Processing: {_file} > {zipArchiveEntry.FullName}");

        // set up the stream readers
        await using Stream stream = zipArchiveEntry.Open();

        using TextReader textReader = new StreamReader(stream);
        using JsonReader jsonReader = new JsonTextReader(textReader);

        // do work here...
    }
}
Using zipArchive = New ZipArchive(File.OpenRead(_fileHelper.Resolve(_file)))

    For Each zipArchiveEntry As ZipArchiveEntry In zipArchive.Entries

        If zipArchiveEntry.Name = "file_name_goes_here" Then

            Console.WriteLine($"Processing: {_file} > {zipArchiveEntry.FullName}")
    
            Using stream As Stream = zipArchiveEntry.Open()
    
                ' set up the stream readers
                Using textReader As TextReader = New StreamReader(stream)
                    Using jsonReader = New JsonTextReader(textReader)
    
                        ' do work here
    
                    End Using
                End Using

            End Using

        End If

    Next

End Using

File Stream Example

Putting it all together, we end up with something like this:

using System.IO.Compression;
using Common.Helpers;
using Ebay.NewtonSoft.Json.Models;
using Newtonsoft.Json;

internal class Program
{
    #region Fields

    private readonly IFilePathHelper _fileHelper =
                         new FilePathHelper("Resources");

    private JsonSerializer? _serializer;
    private JsonSerializerSettings? _jsonSerializerSettings;

    private int _count;
    private readonly string _file = "EBay CategoryAspects.zip";

    #endregion

    #region Methods

    private static async Task Main()
        => await new Program().Execute().ConfigureAwait(false);

    private async Task Execute()
    {
        Console.WriteLine($"Reading {this._file}");

        this._jsonSerializerSettings = new();
        this._serializer = JsonSerializer.Create(this._jsonSerializerSettings);

        using ZipArchive zipArchive =
            new(File.OpenRead(this._fileHelper.Resolve(this._file)));

        foreach (ZipArchiveEntry zipArchiveEntry in zipArchive.Entries)
        {
            Console.WriteLine
            ($"Processing: {this._file} > {zipArchiveEntry.FullName}");

            await using Stream stream = zipArchiveEntry.Open();

            // set up the stream readers
            using TextReader textReader = new StreamReader(stream);
            using JsonReader jsonReader = new JsonTextReader(textReader);

            await using (stream.ConfigureAwait(false))
            {
                // move to the start of the array and read the stream
                while (await jsonReader.ReadAsync().ConfigureAwait(false))
                    await this.ProcessAsync(jsonReader).ConfigureAwait(false);
            }
        }

        // report results
        Console.WriteLine($"CategoryAspects: {this._count:N0}");
        Console.WriteLine("Finished");
    }

    private async Task ProcessAsync(JsonReader jsonReader)
    {
        if (jsonReader.TokenType != JsonToken.PropertyName) return;

        // process properties for data that we want

        if (jsonReader.GetString() == "categoryTreeVersion")
        {
            // get the value
            await jsonReader.ReadAsync().ConfigureAwait(false);

            string? version = jsonReader.GetString();
            Console.WriteLine($"Version: {version ?? "no value"}");
        }

        else if (jsonReader.GetString() == "categoryTreeId")
        {
            // get the value
            await jsonReader.ReadAsync().ConfigureAwait(false);

            string? id = jsonReader.GetString();
            Console.WriteLine($"Id:      {id ?? "no value"}");
        }

        else if (jsonReader.GetString() == "categoryAspects")
        {
            // move to the start of the array
            await jsonReader.ReadAsync().ConfigureAwait(false);

            // step through each complete object in the Json Array
            while (await jsonReader.ReadAsync().ConfigureAwait(false) &&
                   jsonReader.TokenType != JsonToken.EndArray)
                this.ProcessCollection(jsonReader);
        }
    }

    private void ProcessCollection(JsonReader jsonReader)
    {
        CategoryAspect? categoryAspect = 
        this._serializer!.Deserialize<CategoryAspect>(jsonReader);

        // process and store the data model
        this._count++;
    } 

    #endregion
}
Imports System.IO
Imports System.IO.Compression
Imports Common.Helpers
Imports Newtonsoft.Json
Imports NewtonSoftZippedEbay.Models

' Mock_Json_Files

Module Program

#Region "Fields"

    Private ReadOnly _fileHelper As FilePathHelper =
                         New FilePathHelper("Resources")

    Private _serializer As JsonSerializer
    Private _jsonSerializerSettings As JsonSerializerSettings

    Private _count As Integer
    Private ReadOnly _file As String = "EBay CategoryAspects.zip"

#End Region

#Region "Methods"

    Sub Main(args As String())
        Console.WriteLine($"Reading {_file}")
        ExecuteAsync(args).GetAwaiter.GetResult()
        Console.WriteLine("Finished")
    End Sub

    Private Async Function ExecuteAsync(args As String()) As Task

        _jsonSerializerSettings = New JsonSerializerSettings()
        _serializer = JsonSerializer.Create(_jsonSerializerSettings)

        Using zipArchive = New ZipArchive(File.OpenRead(_fileHelper.Resolve(_file)))

            For Each zipArchiveEntry As ZipArchiveEntry In zipArchive.Entries

                Console.WriteLine($"Processing: {_file} > {zipArchiveEntry.FullName}")

                Using stream As Stream = zipArchiveEntry.Open()

                    ' set up the stream readers
                    Using textReader As TextReader = New StreamReader(stream)
                        Using jsonReader = New JsonTextReader(textReader)

                            ' move to the start of the array and read the stream
                            While Await jsonReader.ReadAsync().ConfigureAwait(False)

                                Await ProcessAsync(jsonReader).ConfigureAwait(False)

                            End While

                        End Using

                    End Using

                End Using

            Next

        End Using

        ' report results
        Console.WriteLine($"CategoryAspects: {_count:N0}")

    End Function

    Private Async Function ProcessAsync(jsonReader As JsonReader) As Task

        If jsonReader.TokenType <> JsonToken.PropertyName Then
            Return
        End If

        ' process properties for data that we want

        If jsonReader.GetString() = "categoryTreeVersion" Then

            ' get the value
            Await jsonReader.ReadAsync().ConfigureAwait(False)

            Dim version = jsonReader.GetString()
            Console.WriteLine($"Version: {If(version, "no value")}")

        End If

        If jsonReader.GetString() = "categoryTreeId" Then

            ' get the value
            Await jsonReader.ReadAsync().ConfigureAwait(False)

            Dim Id = jsonReader.GetString()
            Console.WriteLine($"Id: {If(Id, "no value")}")

        End If

        If jsonReader.GetString() = "categoryAspects" Then

            ' move to the start of the array
            Await jsonReader.ReadAsync().ConfigureAwait(False)

            'step through each complete object in the Json Array
            While Await jsonReader.ReadAsync().ConfigureAwait(False) AndAlso
                  jsonReader.TokenType <> JsonToken.EndArray
                ProcessCollection(jsonReader)
            End While

        End If

    End Function

    Private Sub ProcessCollection(jsonReader As JsonReader)

        Dim categoryAspect = _serializer.Deserialize(Of CategoryAspect)(jsonReader)

        ' process and store the data model
        _count += 1

    End Sub

#End Region

End Module

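Note: To see the code running, see the prototype \ local \ ComplexZippedData \ NewtonSoftZippedEbay VB/C# projects. There is also a Contacts version, and Web API versions for both Contacts and Ebay.

Note: At the time of writing this article (DotNet 6.0), if you are streaming from a Web API, DotNet will download the entire zip file *before* you can stream read it. The above code and the project samples will load the entire zip file into memory.

If you are working with very large zip files, you will need to stream to a cache file on disk before opening the archive for streaming. This will keep memory usage to a minimum. If you want to cache to a file and are not sure how to download a stream to a file, take a look at the DowloadService in the GetDataFiles project to see how it is done.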
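
As a rough sketch of that caching step (not the DowloadService from the download, whose code is not shown here), the source stream can be copied to a temporary file in buffered chunks and the archive then opened from disk. `ZipCache`, `CacheToFileAsync` and `ListEntries` are hypothetical names; in practice the source would be the stream returned by `HttpClient.GetStreamAsync`.

```csharp
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Threading.Tasks;

public static class ZipCache
{
    // Copies the source stream to a temporary file in buffered chunks
    // and returns the path of that file.
    public static async Task<string> CacheToFileAsync(Stream source)
    {
        string path = Path.GetTempFileName();

        await using (FileStream target = File.Create(path))
        {
            await source.CopyToAsync(target).ConfigureAwait(false);
        }

        return path;
    }

    // Opens the cached archive from disk and lists the entries it contains.
    public static List<string> ListEntries(string path)
    {
        using ZipArchive archive = ZipFile.OpenRead(path);
        return archive.Entries.Select(entry => entry.FullName).ToList();
    }
}
```

Once the archive is on disk, each entry can be opened with `ZipArchiveEntry.Open()` and passed to the readers exactly as in the examples above. The caller is responsible for deleting the temporary file when finished.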

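Sample Projects

If you want to see the above code in action, download the project and run the samples in the *Prototypes\Local* or *Prototypes\Remote* folders. There are four samples in each:

  • File System: NewtonsoftContacts and NewtonsoftZippedContacts for Simple JSON Collection Object Deserialization; NewtonsoftEbay and NewtonSoftZippedEbay for Complex JSON Objects with Selective Deserialization
  • Web API: NewtonsoftContacts.Remote and NewtonsoftZippedContacts.Remote for Simple JSON Collection Object Deserialization; NewtonsoftEbay.Remote and NewtonSoftZippedEbay.Remote for Complex JSON Objects with Selective Deserialization

Note: For the Web API sample projects, you need to run the JsonSamples.Host web project before running the .Remote sample projects.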

Streaming with System.Text.Json

Out of the box, System.Text.Json supports streaming. It looks something like this:

using System.Text.Json;

// open the _file as a stream
await using FileStream stream = File.OpenRead(filename);

// Deserialize the stream
List<Contact> contacts = await JsonSerializer.DeserializeAsync<List<Contact>>(stream);
Imports System.Text.Json

Using stream As FileStream = File.OpenRead(filename)

    Dim contacts = Await JsonSerializer.DeserializeAsync(Of List(Of Contact))(stream)

End Using

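The downside is that nothing can be processed until the whole file has been read and the complete list is held in memory. In terms of memory use, this is much the same as doing the following: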

using System.Text.Json;

// open and load the file into memory
string rawJson = await File.ReadAllTextAsync(filename);

List<Contact> contacts = JsonSerializer.Deserialize<List<Contact>>(rawJson);
Imports System.Text.Json

Dim rawJson As String = Await File.ReadAllTextAsync(filename)

Dim contacts = JsonSerializer.Deserialize(Of List(Of Contact))(rawJson)

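For simple JSON collection objects, like the sample above, System.Text.Json does support deserializing one object at a time, which avoids loading the entire file into memory. Here is an example: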

using System.Text.Json;

// open the _file as a stream
await using FileStream stream = File.OpenRead(filename);

await foreach (Contact? contact in
    JsonSerializer.DeserializeAsyncEnumerable<Contact>(stream))
{
    // do work here...
}
Imports System.Text.Json

_options = New JsonSerializerOptions()

' open the _file as a stream
Using stream As Stream = File.OpenRead(_fileHelper.Resolve(_file))

    Console.WriteLine($"Processing: {_file}")

    ' VB version of C#'s "await foreach(..)"
    Dim iterator = JsonSerializer.DeserializeAsyncEnumerable(Of Contact) _
            (stream, _options).GetAsyncEnumerator()

    Do While Await iterator.MoveNextAsync()
        Dim item = iterator.Current
        Process(item)
    Loop

    Await iterator.DisposeAsync()

End Using

Note: As the sample above shows, VB has no equivalent of C#'s await foreach asynchronous loop, so we need to step through the async collection manually:

Dim iterator = <method_goes_here>.GetAsyncEnumerator()

Do While Await iterator.MoveNextAsync()
    Dim item = iterator.Current
    ' process item here
Loop

Await iterator.DisposeAsync()

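The downside of the sample above is that it does not work for complex JSON objects with selective deserialization. There is no support for this out of the box, so we have to write it ourselves with a custom stream reader. I have created one, which we will explore next.

File stream example

Putting it all together, we end up with something like this: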

using Common.Helpers;
using System.Text.Json;
using Contacts.System.Text.Json.Models;

internal class Program
{
    #region Fields

    private readonly IFilePathHelper _fileHelper =
                                   new FilePathHelper("Resources");

    private JsonSerializerOptions? _jsonSerializerOptions;

    private int _count;
    private readonly string _file = "Mock_Contacts1.json";

    #endregion

    #region Methods

    private static async Task Main()
        => await new Program().Execute().ConfigureAwait(false);

    private async Task Execute()
    {
        Console.WriteLine($"Reading {this._file}");

        this._jsonSerializerOptions = new();

        // open the _file as a stream
        await using FileStream stream =
                       File.OpenRead(this._fileHelper.Resolve(this._file));

        Console.WriteLine($"Processing: {this._file}");

        // deserialize the stream an object at a time...
        await foreach (Contact? item in
                JsonSerializer.DeserializeAsyncEnumerable<Contact>
                    (stream, this._jsonSerializerOptions))
            Process(item);

        // report results
        Console.WriteLine($"Contacts: {this._count:N0}");
        Console.WriteLine("Finished");
    }

    private void Process(Contact? item)
    {
        // process and store the data model
        this._count++;
    }

    #endregion
}
Imports System.IO
Imports System.Text.Json
Imports System.Text.Json.Stream
Imports Common.Helpers
Imports SystemTextJsonContact.Models

Module Program

#Region "Fields"

    Private ReadOnly _fileHelper As FilePathHelper =
                         New FilePathHelper("Resources")

    Private _options As JsonSerializerOptions

    Private _count As Integer
    Private ReadOnly _file As String = "Mock_Contacts1.json"

#End Region

#Region "Methods"

    Sub Main(args As String())
        Console.WriteLine($"Reading {_file}")
        MainAsync(args).GetAwaiter.GetResult()
        Console.WriteLine("Finished")
    End Sub

    Private Async Function MainAsync(args As String()) As Task

        _options = New JsonSerializerOptions()

        ' open the _file as a stream
        Using stream As Stream = File.OpenRead(_fileHelper.Resolve(_file))

            Console.WriteLine($"Processing: {_file}")

            ' VB version of C#'s "await foreach(..)"
            Dim iterator = JsonSerializer.DeserializeAsyncEnumerable(Of Contact) _
                    (stream, _options).GetAsyncEnumerator()

            While Await iterator.MoveNextAsync()

                Dim item = iterator.Current
                Process(item)

            End While

            Await iterator.DisposeAsync()

        End Using

        ' report results
        Console.WriteLine($"Contacts: {_count:N0}")

    End Function

    Private Sub Process(item As Contact)

        ' process and store the data model
        _count += 1

    End Sub

#End Region

End Module

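Note: To see the code running, see the prototype \ local \ SimpleData \ SystemTextJsonContact VB/C# project. For the Web API version, see the prototype \ remote \ SimpleData \ SystemTextJsonContact VB/C# project.

Part 2: Custom Utf8JsonAsyncStreamReader

The goal is to write a stream reader that works like the NewtonSoft.Json.JsonTextReader, with minimal changes to existing code. The next section, "Writing a custom stream reader", goes into detail on how it is implemented.

How to use the new Utf8JsonAsyncStreamReader

System.Text.Json is a rewrite of Newtonsoft.Json, so the upside is that the solution is a single line of code: the TextReader and JsonTextReader are combined into a single class. It works with any stream type, so using it is as simple as: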

using Utf8JsonAsyncStreamReader jsonReader = new Utf8JsonAsyncStreamReader(stream);

// do work here...
Using jsonReader As Utf8JsonAsyncStreamReader = New Utf8JsonAsyncStreamReader(stream)

    ' do work here...

End Using

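Note: Utf8JsonAsyncStreamReader is a drop-in replacement for the TextReader and JsonTextReader, so the rest of the code is the same as in the NewtonSoft sample code above.

Writing a custom stream reader

Note: I will not dump all of the code here, only the important parts. The complete code for the custom stream readers can be found in the project libraries \ System.Text.Json \ System.Text.Json.Stream. All of the code is fully commented, explaining how it works.

I did a lot of research on this topic, because I did not want to do the work if someone else had already found a solution.

There is one that is *synchronous*: mtosh (original solution) - StackOverflow, followed by evil-dr-nick - Github. Note: I have included an updated version in the downloadable code that fixes a few minor issues and modernizes the code.

We need an asynchronous solution. There wasn't one... until now!

After several attempts, I came up with the following solution using the System.IO.Pipelines API. Why not use Span<T> or Memory<T>, as mtosh's synchronous solution above does? The .NET 6.0 PipeReader class only supports ReadAtLeastAsync, which does not guarantee the exact number of bytes needed when working with ReadOnlySequenceSegment<T>.

The benefit of using a PipeReader to process the stream is that it manages the handling of the stream and returns a ReadOnlySequence<T> struct. This gives us fast, raw access to the bytes.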

// move the start of the buffer past what has already been consumed
if (this._bytesConsumed > 0) this._reader.AdvanceTo
   (this._buffer.GetPosition(this._bytesConsumed));

// top up the buffer stream
ReadResult readResult = await this._reader
    .ReadAtLeastAsync(this._bufferSize, cancellationToken)
    .ConfigureAwait(false);

// reset to new stream buffer segment
this._bytesConsumed = 0;
this._buffer = readResult.Buffer;
this._endOfStream = readResult.IsCompleted;

// check for any issues
if (this._buffer.Length - this._bytesConsumed > 0 && 
    !this.JsonReader(this._endOfStream))
    throw new Exception("Invalid Json or incomplete token or buffer undersized");
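
For readers new to System.IO.Pipelines, the read/advance cycle that the excerpt above relies on can be shown in isolation. The following is a minimal sketch, not the reader's actual implementation: PipeReaderSketch and CountBytesAsync are hypothetical names, and the loop simply counts bytes where a real reader would parse them. It uses the plain ReadAsync call rather than ReadAtLeastAsync.

```csharp
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, for illustration only.
public static class PipeReaderSketch
{
    // Reads a stream through a PipeReader and returns the total number of
    // bytes seen. A real reader would parse the buffer instead of counting.
    public static async Task<long> CountBytesAsync(
        Stream stream, int bufferSize = 4096,
        CancellationToken cancellationToken = default)
    {
        PipeReader reader = PipeReader.Create(
            stream, new StreamPipeReaderOptions(bufferSize: bufferSize));

        long total = 0;

        try
        {
            while (true)
            {
                // wait for the next chunk of data from the stream
                ReadResult result = await reader
                    .ReadAsync(cancellationToken)
                    .ConfigureAwait(false);

                ReadOnlySequence<byte> buffer = result.Buffer;
                total += buffer.Length;

                // tell the pipe that everything in this buffer was consumed
                reader.AdvanceTo(buffer.End);

                // IsCompleted means the underlying stream has no more data
                if (result.IsCompleted)
                    break;
            }
        }
        finally
        {
            await reader.CompleteAsync().ConfigureAwait(false);
        }

        return total;
    }
}
```

Every read must be followed by a call to AdvanceTo before the next read. The custom reader tracks how many bytes the JSON parser actually consumed and advances only that far, so an incomplete token stays in the buffer for the next read.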

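This code lives in the ValueTask<bool> ReadAsync method. When we are ready to deserialize an identified object, we buffer the bytes into a MemoryStream. Buffering is managed in the ReadAsync method with the '_isBuffering' flag: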

// store stream buffer/chunk if we are wanting to Deserialize the Json object
if (this._isBuffering)
{
    this.WriteToBufferStream();

    // reset the buffer start tracking
    this._bufferingStartIndex = 0;
}

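Writing to the buffer is done with a PipeWriter. After some testing, writing manually proved faster than using the built-in Write, due to boxing requirements. The built-in call would look like this: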

this._writer!.Write(this._buffer.Slice(this._bufferingStartIndex, 
                    this._bytesConsumed - this._bufferingStartIndex).ToArray());

I did not inherit from PipeWriter and write my own custom Write method. Since it is only needed in one place, I simply do it inline (no boxing):

private void WriteToBufferStream()
{
    // get number of bytes to transfer
    int bytes = this._bytesConsumed - this._bufferingStartIndex;

    // store
    this._buffer.Slice(this._bufferingStartIndex, bytes).CopyTo
                      (this._writer!.GetSpan(bytes));

    // manually advance buffer pointer
    this._writer.Advance(bytes);
}
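
The same GetSpan / CopyTo / Advance pattern can be tried on its own. This is a small sketch under stated assumptions: PipeWriterSketch and its two methods are hypothetical names, and the writer is backed by a MemoryStream purely to make the result easy to inspect.

```csharp
using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;

// Hypothetical helper, for illustration only.
public static class PipeWriterSketch
{
    // Copies a (possibly multi-segment) sequence into the writer without
    // first flattening it into a temporary array.
    public static void WriteSequence(PipeWriter writer, ReadOnlySequence<byte> data)
    {
        int bytes = checked((int)data.Length);
        if (bytes == 0) return;

        // ask the writer for at least 'bytes' of writable space
        Span<byte> target = writer.GetSpan(bytes);
        data.CopyTo(target);

        // manually tell the writer how much was written
        writer.Advance(bytes);
    }

    // Writes the sequence to a MemoryStream-backed writer and returns a copy
    // of what arrived in the stream.
    public static async Task<byte[]> CopyToArrayAsync(ReadOnlySequence<byte> data)
    {
        using MemoryStream stream = new MemoryStream();
        PipeWriter writer = PipeWriter.Create(
            stream, new StreamPipeWriterOptions(leaveOpen: true));

        WriteSequence(writer, data);

        // push buffered bytes to the stream, then finish the writer
        await writer.FlushAsync().ConfigureAwait(false);
        await writer.CompleteAsync().ConfigureAwait(false);

        return stream.ToArray();
    }
}
```

Nothing reaches the underlying stream until the writer is flushed or completed, which is why the cleanup code shown later completes the writer before rewinding the stream.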

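For the DeserializeAsync method, we need to walk the stream to find the end of the object. If we do not, the JsonSerializer.DeserializeAsync method will throw an error. Walking the stream is simply a matter of tracking the depth of the graph until we reach the end of the object or array: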

// walk the json object tree until we have the complete json object
while (!cancellationToken.IsCancellationRequested)
{
    if (this.TokenType is JsonTokenType.StartObject or JsonTokenType.StartArray)
        depth++;
    else if (this.TokenType is JsonTokenType.EndObject or JsonTokenType.EndArray)
        depth--;

    if (depth == 0)
        break;

    await this.ReadAsync(cancellationToken).ConfigureAwait(false);
}
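
The depth-tracking idea is easy to try with the built-in synchronous Utf8JsonReader over an in-memory buffer. The sketch below is an illustration of the technique only, not the custom reader's code: DepthWalkSketch and FindEndOfFirstValue are hypothetical names, and the whole input is assumed to already be in memory.

```csharp
using System;
using System.Text.Json;

// Hypothetical helper, for illustration only.
public static class DepthWalkSketch
{
    // Returns the number of bytes that make up the first complete JSON value
    // (object, array, or delimited primitive) at the start of 'utf8Json',
    // or -1 if the buffer ends before that value is complete.
    public static long FindEndOfFirstValue(ReadOnlySpan<byte> utf8Json)
    {
        // isFinalBlock: false makes Read() return false on incomplete data
        // instead of throwing, which mirrors reading from a stream
        Utf8JsonReader reader = new Utf8JsonReader(
            utf8Json, isFinalBlock: false, state: default);

        int depth = 0;

        while (reader.Read())
        {
            if (reader.TokenType is JsonTokenType.StartObject or JsonTokenType.StartArray)
                depth++;
            else if (reader.TokenType is JsonTokenType.EndObject or JsonTokenType.EndArray)
                depth--;

            // back at depth zero: the value that started here is complete
            if (depth == 0)
                return reader.BytesConsumed;
        }

        return -1;
    }
}
```

When the function returns -1, the caller needs to top up the buffer and try again, which is the role ReadAsync plays in the asynchronous loop above.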

Once we have the complete object graph, we can clean up:

// remaining bytes to be buffered (overflow)
this.WriteToBufferStream();

// flush all writes
await this._writer!.CompleteAsync().ConfigureAwait(false);

// operation cancelled remotely
if (cancellationToken.IsCancellationRequested)
    return false;

// Point to beginning of the memory stream
stream.Seek(0, SeekOrigin.Begin);

// success
return true;

We move the stream pointer back to the start of the stream, ready for deserialization with the JsonSerializer.DeserializeAsync method:

// fill temp stream with json object
if (!await this.GetJsonObjectAsync(stream, cancellationToken).ConfigureAwait(false))
    return default;

// deserialize object from temp stream
TResult? result = await JsonSerializer
    .DeserializeAsync<TResult>(stream, cancellationToken: cancellationToken)
    .ConfigureAwait(false);

// we are done buffering
this._isBuffering = false;

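See the samples to view the code and how it works.

Part 3: Libraries to simplify working with large JSON object streams

This next section is about using the libraries bundled with this article and the downloadable code. The libraries are based on projects that I have worked on and the need to move from Newtonsoft.Json to System.Text.Json. They are not required for working with streams, but they wrap the process to reduce repetitive code.

Key design goals

  • Interchangeable between Newtonsoft.Json and System.Text.Json; file system and Web API; JSON objects and zipped JSON objects; with near-identical interfaces, so switching between implementations is seamless
  • Works with the file system and Web API
  • Works with single or multiple raw JSON files, or zipped raw JSON files, of any size
  • Abstracts away all of the implementation for opening and processing JSON objects - you only write the identification and processing code
  • Asynchronous operation, including fail-fast error handling
  • Mid-stream cancellation support - CancellationToken
  • Minimal memory footprint - objects are processed as they are deserialized, with no need to keep all objects in memory
  • High performance - as close to raw performance as possible
  • DI/IoC support - not tied to any IoC container system
  • ILogger support - not tied to any specific logger
  • Custom data buffer size configuration - the default buffer size used by Newtonsoft is 1K (1,024 bytes) and by System.Text.Json is 16K (16,384 bytes)
  • Testability and benchmarking

How to use: Newtonsoft.Json - simple JSON collection objects

For simple JSON collection objects, we implement the JsonFileStreamObjectDeserializer<TConfiguration> base class: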

public class ContactFileStreamDeserializer
    : JsonFileStreamObjectDeserializer<IContactFilesConfiguration>
{
 // code goes here
}
Public Class ContactFileStreamDeserializer
    Inherits JsonFileStreamObjectDeserializer(Of IContactFilesConfiguration)

    ' code goes here...

End Class

Then we implement the ProcessAsync method:

protected override async Task ProcessAsync
(JsonReader jsonReader, CancellationToken cancellationToken)
{
    if (this.BatchSize > 1)
        await this.DeserializeAsync<Contact>
        (jsonReader, this.BatchProcessAsync, cancellationToken)
            .ConfigureAwait(false);
    else
        await this.DeserializeAsync<Contact>
        (jsonReader, this.ItemProcessAsync, cancellationToken)
            .ConfigureAwait(false);
}
Protected Overrides Async Function ProcessAsync(
    jsonReader As JsonReader, cancellationToken As CancellationToken) As Task

    If BatchSize > 1 Then
        Await DeserializeAsync(Of Contact)(
            jsonReader, AddressOf BatchProcessAsync, cancellationToken).
            ConfigureAwait(False)
    Else
        Await DeserializeAsync(Of Contact)(
            jsonReader, AddressOf ItemProcessAsync, cancellationToken).
            ConfigureAwait(False)
    End If

End Function

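As you can see, both batching and single-object support are built in. For more information on how to configure them, see the properties table below.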
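
To give a feel for what batching over a stream of deserialized objects involves, here is a minimal standalone sketch. It is not the library's implementation: BatchSketch and InBatchesAsync are hypothetical names, and the source is assumed to be any IAsyncEnumerable<T>, such as the sequence returned by JsonSerializer.DeserializeAsyncEnumerable.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, for illustration only.
public static class BatchSketch
{
    // Groups items from an async stream into lists of up to 'batchSize'.
    // Only one batch is held in memory at a time.
    public static async IAsyncEnumerable<IReadOnlyList<T>> InBatchesAsync<T>(
        IAsyncEnumerable<T> source, int batchSize,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        if (batchSize < 1)
            throw new ArgumentOutOfRangeException(nameof(batchSize));

        List<T> batch = new List<T>(batchSize);

        await foreach (T item in source
            .WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            batch.Add(item);

            if (batch.Count == batchSize)
            {
                yield return batch;
                batch = new List<T>(batchSize);
            }
        }

        // hand over the final, partially filled batch
        if (batch.Count > 0)
            yield return batch;
    }
}
```

A batch size of 1 degenerates into per-item processing, which is presumably why the base class offers a separate single-item path for that case.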

The ProcessAsync method code above works for both raw and zipped files. The implementation depends on the base class that is inherited:

  1. Raw JSON file: JsonFileStreamObjectDeserializer<TConfiguration>
  2. Zipped JSON file: JsonZipFileStreamObjectDeserializer<TZipConfiguration, TConfiguration>

The same applies to the Web API:

  1. Raw JSON file: JsonHttpStreamObjectDeserializer<TConfiguration>
  2. Zipped JSON file: JsonZipHttpStreamObjectDeserializer<TZipConfiguration, TConfiguration>

Using the implemented class above:

IFilePathHelper fileHelper = new FilePathHelper(DataFolder);
IContactFilesConfiguration config = new ContactFilesConfiguration();
CancellationTokenSource cts = new();

var deserializer = new ContactFileStreamDeserializer(fileHelper, config)
{
    FileId = "MOCK1",
    FileAction = DeserializeActionType.Single,
    FailIfFileNotFound = true,
    CancellationTokenSource = cts
};

await deserializer.ProcessAsync().ConfigureAwait(false);
Dim fileHelper = New FilePathHelper(DataFolder)
Dim config = New ContactFilesConfiguration()
Dim cts As New CancellationTokenSource()

Dim deserializer = New ContactFileStreamDeserializer(fileHelper, config) With
{
    .FileId = "MOCK1",
    .FileAction = DeserializeActionType.Single,
    .FailIfFileNotFound = True,
    .CancellationTokenSource = cts
}

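Await deserializer.ProcessAsync().ConfigureAwait(False)

There is a configuration file that holds the location of the data. This configuration class can hold the names of all of the raw JSON files in a project, accessed by a key ID, or only the files that need to be processed: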

public class ContactFilesConfiguration : IContactFilesConfiguration
{
    #region Constructors

    public ContactFilesConfiguration()
    {
        this.Paths = new Dictionary<string, string>
        {
            ["MOCK1"] = "Mock_Contacts1.json",
            ["MOCK2"] = "Mock_Contacts2.json",
            ["MOCK3"] = "Mock_Contacts3.json",
        };
    }

    #endregion

    #region Properties
    
    public IDictionary<string, string> Paths { get; }

    #endregion
}
Public Class ContactFilesConfiguration
    Implements IContactFilesConfiguration

#Region "Constructors"

    Public Sub New()

        Paths = New Dictionary(Of String, String) From {
                {"MOCK1", "Mock_Contacts1.json"},
                {"MOCK2", "Mock_Contacts2.json"},
                {"MOCK3", "Mock_Contacts3.json"}
            }

    End Sub

#End Region

#Region "Properties"

    Public ReadOnly Property Paths As IDictionary(Of String, String) _
        Implements Configuration.IDataConfiguration.Paths

#End Region

End Class

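If you have multiple files of the same type to process, you can set the FileAction property to DeserializeActionType.Multiple, and the base class will automatically walk through the files in the configuration file.

The same goes for processing zipped raw JSON files. We have a separate configuration file, similar to the one above: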

public class ContactZipFilesConfiguration : IContactZipFilesConfiguration
{
    #region Constructors

    public ContactZipFilesConfiguration()
    {
        this.Paths = new Dictionary<string, string>
        {
            ["MOCK_ZIP"] = "Mock_Json_Files.zip",
        };
    }

    #endregion

    #region Properties
    
    public IDictionary<string, string> Paths { get; }

    #endregion
}
Public Class ContactZipFilesConfiguration
    Implements IContactZipFilesConfiguration

#Region "Constructors"

    Public Sub New()

        Paths = New Dictionary(Of String, String) From {
                {"MOCK_ZIP", "Mock_Json_Files.zip"}
            }

    End Sub

#End Region

#Region "Properties"

    Public ReadOnly Property Paths As IDictionary(Of String, String) _
        Implements Configuration.IDataConfiguration.Paths

#End Region

End Class

Note: To see the code running without dependency injection, see the applications \ local \ SimpleData \ NewtonsoftContacts, applications \ local \ SimpleZippedData \ NewtonsoftZippedContacts, applications \ remote \ SimpleData \ NewtonsoftContacts, and applications \ remote \ SimpleZippedData \ NewtonsoftZippedContacts VB/C# projects. To use it with dependency injection, see the applications \ local \ SimpleData \ NewtonsoftContactsDI, applications \ local \ SimpleZippedData \ NewtonsoftZippedContactsDI, applications \ remote \ SimpleData \ NewtonsoftContactsDI, and applications \ remote \ SimpleZippedData \ NewtonsoftZippedContactsDI VB/C# projects. Code common to the non-DI and DI projects is in the Shared subfolder. You can also see the code in use in the UnitTest and Benchmark VB/C# projects.

How to use: Newtonsoft.Json - complex JSON objects

For complex JSON objects, we need to implement the JsonFileStreamPropertyDeserializer<TConfiguration> base class:

public class EbayCategoryAspectFileStreamDeserializer
    : JsonFileStreamPropertyDeserializer<IEbayCategoryAspectFilesConfiguration>
{
 // code goes here
}
Public Class EbayCategoryAspectFileStreamDeserializer
    Inherits JsonFileStreamPropertyDeserializer(Of IEbayCategoryAspectFilesConfiguration)

    '  code goes here

End Class

Then we implement the ProcessAsync method:

protected override async Task ProcessAsync
(JsonReader jsonReader, CancellationToken cancellationToken)
{
    // process properties for data that we want
    switch (jsonReader.GetString())
    {
        case "categoryAspects":
            if (BatchSize > 1)
                await DeserializeAsync<CategoryAspect>
                (jsonReader, BatchProcessAsync, cancellationToken)
                    .ConfigureAwait(false);
            else
                await DeserializeAsync<CategoryAspect>
                (jsonReader, ItemProcessAsync, cancellationToken)
                    .ConfigureAwait(false);
            break;
        case "categoryTreeVersion":
        {
            // get the value
            await jsonReader.ReadAsync(cancellationToken).ConfigureAwait(false);

            string? version = jsonReader.GetString();
            Logger?.Emit(LogLevel.Information, $"Version: {version ?? "no value"}");
            break;
        }

        case "categoryTreeId":
        {
            // get the value
            await jsonReader.ReadAsync(cancellationToken).ConfigureAwait(false);

            string? id = jsonReader.GetString();
            Logger?.Emit(LogLevel.Information, $"Id: {id ?? "no value"}");
            break;
        }
    }
}
Protected Overrides Async Function ProcessAsync(
    jsonReader As JsonReader, cancellationToken As CancellationToken) As Task

    ' process properties for data that we want
    Select Case jsonReader.GetString()

        Case "categoryAspects"
            Await jsonReader.ReadAsync().ConfigureAwait(False)

            If BatchSize > 1 Then
                Await DeserializeAsync(Of CategoryAspect)(jsonReader,
                    AddressOf BatchProcessAsync, cancellationToken).
                    ConfigureAwait(False)
            Else
                Await DeserializeAsync(Of CategoryAspect)(jsonReader, 
                    AddressOf ItemProcessAsync, cancellationToken).
                    ConfigureAwait(False)
            End If

        Case "categoryTreeVersion"

            ' get the value
            Await jsonReader.ReadAsync().ConfigureAwait(False)

            Dim version = jsonReader.GetString()
            _logger.Emit(LogLevel.Information, $"Version: {If(version, "no value")}")

        Case "categoryTreeId"

            ' get the value
            Await jsonReader.ReadAsync().ConfigureAwait(False)

            Dim Id = jsonReader.GetString()
            _logger.Emit(LogLevel.Information, $"Id: {If(Id, "no value")}")

    End Select

End Function

Using the implemented class above:

IFilePathHelper fileHelper = new FilePathHelper(DataFolder);
IEbayCategoryAspectFilesConfiguration config = 
                   new EbayCategoryAspectFilesConfiguration();
CancellationTokenSource cts = new();

var deserializer = new EbayCategoryAspectFileStreamDeserializer(fileHelper, config)
{
    MarketplaceId = "EBAY_US",
    FileAction = DeserializeActionType.Single,
    FailIfFileNotFound = true,
    CancellationTokenSource = cts
};

await deserializer.ProcessAsync().ConfigureAwait(false);
Dim fileHelper = New FilePathHelper(DataFolder)
Dim config = New EbayCategoryAspectFilesConfiguration()
Dim cts As New CancellationTokenSource()

Dim deserializer = 
    New EbayCategoryAspectFileStreamDeserializer(fileHelper, config) With
{
    .MarketplaceId = "EBAY_US",
    .FileAction = DeserializeActionType.Single,
    .FailIfFileNotFound = True,
    .CancellationTokenSource = cts
}

Await deserializer.ProcessAsync().ConfigureAwait(False)

Note: To see the code running without dependency injection, see the applications \ local \ SimpleData \ NewtonsoftEbay, applications \ local \ SimpleZippedData \ NewtonsoftZippedEbay, applications \ remote \ SimpleData \ NewtonsoftEbay, and applications \ remote \ SimpleZippedData \ NewtonsoftZippedEbay VB/C# projects. To use it with dependency injection, see the applications \ local \ SimpleData \ NewtonsoftEbayDI, applications \ local \ SimpleZippedData \ NewtonsoftZippedEbayDI, applications \ remote \ SimpleData \ NewtonsoftEbayDI, and applications \ remote \ SimpleZippedData \ NewtonsoftZippedEbayDI VB/C# projects. Code common to the non-DI and DI projects is in the Shared subfolder. You can also see the code in use in the UnitTest and Benchmark VB/C# projects.

How to use: System.Text.Json - simple JSON collection objects

For simple JSON collection objects, System.Text.Json has a new deserialization method for enumerating a stream collection of JSON objects, called DeserializeAsyncEnumerable. The base class implementation therefore exposes a Stream object rather than a StreamReader object. Implementing the JsonFileStreamObjectDeserializer<TConfiguration> base class is the same:

public class ContactFileStreamDeserializer
    : JsonFileStreamObjectDeserializer<IContactFilesConfiguration>
{
 // code goes here
}
Public Class ContactFileStreamDeserializer
    Inherits JsonFileStreamObjectDeserializer(Of IContactFilesConfiguration)

    ' code goes here...

End Class

However, how we implement the ProcessAsync method changes:

protected override async Task ProcessAsync
          (Stream stream, CancellationToken cancellationToken)
{
    if (this.BatchSize > 1)
        await this.DeserializeAsync<Contact>
        (stream, this.BatchProcessAsync, cancellationToken)
            .ConfigureAwait(false);
    else
        await this.DeserializeAsync<Contact>
        (stream, this.ItemProcessAsync, cancellationToken)
            .ConfigureAwait(false);
}
Protected Overrides Async Function ProcessAsync(
    stream As Stream, cancellationToken As CancellationToken) As Task

    If BatchSize > 1 Then
        Await DeserializeAsync(Of Contact)(stream,
            AddressOf BatchProcessAsync, cancellationToken).
            ConfigureAwait(False)
    Else
        Await DeserializeAsync(Of Contact)(stream,
            AddressOf ItemProcessAsync, cancellationToken).
            ConfigureAwait(False)
    End If

End Function

Using the implementation above is the same as with Newtonsoft.Json:

IFilePathHelper fileHelper = new FilePathHelper(DataFolder);
IContactFilesConfiguration config = new ContactFilesConfiguration();
CancellationTokenSource cts = new();

var deserializer = new ContactFileStreamDeserializer(fileHelper, config)
{
    FileId = "MOCK1",
    FileAction = DeserializeActionType.Single,
    FailIfFileNotFound = true,
    CancellationTokenSource = cts
};

await deserializer.ProcessAsync().ConfigureAwait(false);
Dim fileHelper = New FilePathHelper(DataFolder)
Dim config = New ContactFilesConfiguration()
Dim cts As New CancellationTokenSource()

Dim deserializer = New ContactFileStreamDeserializer(fileHelper, config) With
{
    .FileId = "MOCK1",
    .FileAction = DeserializeActionType.Single,
    .FailIfFileNotFound = True,
    .CancellationTokenSource = cts
}

Await deserializer.ProcessAsync().ConfigureAwait(False)

Note: To see the code running without dependency injection, see the applications \ local \ SimpleData \ SystemTextJsonContacts, applications \ local \ SimpleZippedData \ SystemTextJsonZippedContacts, applications \ remote \ SimpleData \ SystemTextJsonContacts, and applications \ remote \ SimpleZippedData \ SystemTextJsonZippedContacts VB/C# projects. To use it with dependency injection, see the applications \ local \ SimpleData \ SystemTextJsonContactsDI, applications \ local \ SimpleZippedData \ SystemTextJsonZippedContactsDI, applications \ remote \ SimpleData \ SystemTextJsonContactsDI, and applications \ remote \ SimpleZippedData \ SystemTextJsonZippedContactsDI VB/C# projects. Code common to the non-DI and DI projects is in the *Shared* subfolder. You can also see the code in use in the UnitTest and Benchmark VB/C# projects.

How to use: System.Text.Json - complex JSON objects

For complex JSON objects, we use the custom Utf8JsonAsyncStreamReader class to read and process the stream. The JsonFileStreamPropertyDeserializer<TConfiguration> base class is implemented as follows:

public class EbayCategoryAspectFileStreamDeserializer
    : JsonFileStreamPropertyDeserializer<IEbayCategoryAspectFilesConfiguration>
{
 // code goes here
}
Public Class EbayCategoryAspectFileStreamDeserializer
    Inherits JsonFileStreamPropertyDeserializer(Of IEbayCategoryAspectFilesConfiguration)

    ' code goes here...

End Class

Then we implement the ProcessAsync method:

protected override async Task ProcessAsync
(Utf8JsonAsyncStreamReader jsonReader, CancellationToken cancellationToken)
{
    // process properties for data that we want
    switch (jsonReader.GetString())
    {
        case "categoryAspects":
            if (BatchSize > 1)
                await DeserializeAsync<CategoryAspect>
                      (jsonReader, BatchProcessAsync, cancellationToken)
                    .ConfigureAwait(false);
            else
                await DeserializeAsync<CategoryAspect>
                      (jsonReader, ItemProcessAsync, cancellationToken)
                    .ConfigureAwait(false);
            break;
        case "categoryTreeVersion":
        {
            // get the value
            await jsonReader.ReadAsync(cancellationToken).ConfigureAwait(false);

            string? version = jsonReader.GetString();
            Logger?.Emit(LogLevel.Information, $"Version: {version ?? "no value"}");
            break;
        }

        case "categoryTreeId":
        {
            // get the value
            await jsonReader.ReadAsync(cancellationToken).ConfigureAwait(false);

            string? id = jsonReader.GetString();
            Logger?.Emit(LogLevel.Information, $"Id: {id ?? "no value"}");
            break;
        }
    }
}
Protected Overrides Async Function ProcessAsync(
    jsonReader As Utf8JsonAsyncStreamReader,
    cancellationToken As CancellationToken) As Task

    ' process properties for data that we want
    Select Case jsonReader.GetString()

        Case "categoryAspects"
            Await jsonReader.ReadAsync().ConfigureAwait(False)

            If BatchSize > 1 Then
                Await DeserializeAsync(Of CategoryAspect)(jsonReader,
                    AddressOf BatchProcessAsync, cancellationToken).
                    ConfigureAwait(false)
            Else
                Await DeserializeAsync(Of CategoryAspect)(jsonReader,
                    AddressOf ItemProcessAsync, cancellationToken).
                    ConfigureAwait(false)
            End If

        Case "categoryTreeVersion"

            ' get the value
            Await jsonReader.ReadAsync().ConfigureAwait(False)

            Dim version = jsonReader.GetString()
            _logger.Emit(LogLevel.Information, $"Version: {If(version, "no value")}")

        Case "categoryTreeId"

            ' get the value
            Await jsonReader.ReadAsync().ConfigureAwait(False)

            Dim Id = jsonReader.GetString()
            _logger.Emit(LogLevel.Information, $"Id: {If(Id, "no value")}")

    End Select

End Function

Using the implemented class above is the same as with NewtonSoft:

IFilePathHelper fileHelper = new FilePathHelper(DataFolder);
IEbayCategoryAspectFilesConfiguration config = 
                        new EbayCategoryAspectFilesConfiguration();
CancellationTokenSource cts = new();

var deserializer = new EbayCategoryAspectFileStreamDeserializer(fileHelper, config)
{
    MarketplaceId = "EBAY_US",
    FileAction = DeserializeActionType.Single,
    FailIfFileNotFound = true,
    CancellationTokenSource = cts
};

await deserializer.ProcessAsync().ConfigureAwait(false);
Dim fileHelper = New FilePathHelper(DataFolder)
Dim config = New EbayCategoryAspectFilesConfiguration()
Dim cts As New CancellationTokenSource()

Dim deserializer = New EbayCategoryAspectFileStreamDeserializer(fileHelper, config) With
{
    .MarketplaceId = "EBAY_US",
    .FileAction = DeserializeActionType.Single,
    .FailIfFileNotFound = True,
    .CancellationTokenSource = cts
}

Await deserializer.ProcessAsync().ConfigureAwait(False)

Note: To see the code running without dependency injection, see the applications \ local \ SimpleData \ SystemTextJsonEbay, applications \ local \ SimpleZippedData \ SystemTextJsonZippedEbay, applications \ remote \ SimpleData \ SystemTextJsonEbay, and applications \ remote \ SimpleZippedData \ SystemTextJsonZippedEbay VB/C# projects. To use it with dependency injection, see the applications \ local \ SimpleData \ SystemTextJsonEbayDI, applications \ local \ SimpleZippedData \ SystemTextJsonZippedEbayDI, applications \ remote \ SimpleData \ SystemTextJsonEbayDI, and applications \ remote \ SimpleZippedData \ SystemTextJsonZippedEbayDI VB/C# projects. Code common to the non-DI and DI projects is in the Shared subfolder. You can also see the code in use in the UnitTest and Benchmark VB/C# projects.

Library implementation

The base classes are designed to work with both Newtonsoft.Json and System.Text.Json, so they are split into three parts:

  1. Common base implementation: Common.Json > JsonStreamDeserializer
  2. Newtonsoft.Json base implementation: Common.NewtonSoft.Json > JsonStreamPropertyDeserializer and JsonZipStreamPropertyDeserializer common base classes. There are then separate base classes for the file system and Web API implementations
    • File system
      • Simple: JsonFileStreamObjectDeserializer and JsonZipFileStreamObjectDeserializer
      • Complex: JsonFileStreamPropertyDeserializer and JsonZipFileStreamPropertyDeserializer
    • Web API
      • Simple: JsonHttpStreamObjectDeserializer and JsonZipHttpStreamObjectDeserializer
      • Complex: JsonHttpStreamPropertyDeserializer and JsonZipHttpStreamPropertyDeserializer
  3. System.Text.Json base implementation: Common.System.Text.Json > JsonStreamPropertyDeserializer, JsonZipStreamPropertyDeserializer, JsonStreamObjectDeserializer, and JsonZipStreamObjectDeserializer common base classes. There are then separate base classes for the file system and Web API implementations
    • File system
      • Simple: JsonFileStreamObjectDeserializer and JsonZipFileStreamObjectDeserializer
      • Complex: JsonFileStreamPropertyDeserializer and JsonZipFileStreamPropertyDeserializer
    • Web API
      • Simple: JsonHttpStreamObjectDeserializer and JsonZipHttpStreamObjectDeserializer
      • Complex: JsonHttpStreamPropertyDeserializer and JsonZipHttpStreamPropertyDeserializer

System.Text.Json has two additional Object common base classes. This is due to the differences between Newtonsoft.Json and System.Text.Json.

If you are using the libraries for your own purposes, the projects required are as follows:

  1. Newtonsoft.Json: Common.Json and Common.NewtonSoft.Json
  2. System.Text.Json: Common.Json and Common.System.Text.Json (Common.SystemText.Json for VB, due to compiler name conflicts)
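Configuration properties

Property | Description | Default
FileId | Lookup key in the configuration file | not set
ZipFileId | Lookup key in the zip configuration file | not set
FileAction | Single or multiple configuration file entries | Single
ZipFileAction | Single or multiple lookup keys in the zip configuration file | Single
BatchSize | Number of objects to process at a time | 1
BufferSize | Number of bytes read and processed from the stream | 8,192
FailIfFileNotFound | Fail silently or throw an exception if the file is not found | true
CancellationTokenSource | (optional) | default
JsonSerializerSettings | Newtonsoft only | default
JsonSerializerOptions | System.Text.Json only | default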

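I will not walk through the code for these classes, as there is a lot of it and this article is already long. I recommend looking at the code instead.

What I will point out is how I handle the decision logic for choosing which processing method to run.

I am using a keyed dictionary based on the FileAction:

Common base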

// Generate an ActionDelegate key
protected string GetActionKey(DeserializeActionType fileType)
        => $"{fileType}";

protected abstract Dictionary<string, Func<Task>> ActionDelegatesFactory();

// Execute an ActionDelegate based on configuration settings
protected virtual async ValueTask ExecuteActionDelegateAsync(string key)
{
    Dictionary<string, Func<Task>> ActionDelegates = this.ActionDelegatesFactory();

    if (!ActionDelegates.ContainsKey(key))
    {
        KeyNotFoundException exception =
            new($"The '{this.FileAction}' Action was not found!");
        this.Logger?.Emit(LogLevel.Error, "Invalid Action!", exception);

        throw exception;
    }

    await ActionDelegates[key]().ConfigureAwait(false);
}
' Generate an ActionDelegate key
Protected Function GetActionKey(fileType As DeserializeActionType) As String

    Return $"{fileType}"

End Function

' abstract members/variables are not allowed
Protected MustOverride Function ActionDelegatesFactory() _
                       As Dictionary(Of String, Func(Of Task))

' Execute an ActionDelegate based on configuration settings
Protected Overridable Async Function _
          ExecuteActionDelegateAsync(key As String) As Task

    Dim ActionDelegates = ActionDelegatesFactory()

    If Not ActionDelegates.ContainsKey(key) Then
        Dim exception = New KeyNotFoundException(
                        $"The '{Me.FileAction}' Action was not found!")
        Me._logger.Emit(LogLevel.Error, "Invalid Action!", exception)

        Throw exception
    End If

    Await ActionDelegates(key)().ConfigureAwait(False)

End Function
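
Stripped of the library plumbing, the keyed-dictionary dispatch looks roughly like the standalone sketch below. The names here (ActionKind, ActionDispatcherSketch, RunAsync, Log) are hypothetical stand-ins used only for illustration; ActionKind plays the part of DeserializeActionType.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for DeserializeActionType.
public enum ActionKind { Single, Multiple }

// Hypothetical class, for illustration only.
public sealed class ActionDispatcherSketch
{
    private readonly Dictionary<string, Func<Task>> _actions;

    // records which action ran, so the behaviour can be observed
    public List<string> Log { get; } = new List<string>();

    public ActionDispatcherSketch()
    {
        // map each configuration value to the delegate that handles it
        _actions = new Dictionary<string, Func<Task>>
        {
            [GetKey(ActionKind.Single)] = () => RunAsync("single"),
            [GetKey(ActionKind.Multiple)] = () => RunAsync("multiple"),
        };
    }

    // the key is simply the enum name, for example "Single"
    public static string GetKey(ActionKind kind) => $"{kind}";

    // look up and execute the delegate, failing fast on an unknown key
    public async Task ExecuteAsync(string key)
    {
        if (!_actions.TryGetValue(key, out var action))
            throw new KeyNotFoundException($"The '{key}' action was not found!");

        await action().ConfigureAwait(false);
    }

    private Task RunAsync(string name)
    {
        Log.Add(name);
        return Task.CompletedTask;
    }
}
```

Using string keys means a derived class can compose keys from more than one setting, for example by concatenating a zip action and a file action, without changing the dispatch code.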

Common.<lib> file / Web API base

// Main Entry Point
public override async ValueTask ProcessAsync()
    => await this.ExecuteActionDelegateAsync(this.GetActionKey(this.FileAction))
        .ConfigureAwait(false);

// Handler for parent class to process json fragments
protected abstract Task ProcessAsync(JsonReader jsonReader, 
                                     CancellationToken cancellationToken);

// Map configuration settings to Actions
protected override Dictionary<string, Func<Task>> ActionDelegatesFactory()
    => new()
    {
        [this.GetActionKey(DeserializeActionType.Single)] = ()
            => this.ProcessActionAsync
            (this._configuration!.Paths[this.ConfigurationFileKey!], 
             this.ProcessAsync,
             this.CancellationTokenSource?.Token ?? default),

        [this.GetActionKey(DeserializeActionType.Multiple)] = ()
            => this.ProcessActionAsync
            (this._configuration!, this.ProcessAsync, 
             this.CancellationTokenSource?.Token ?? default),
    };
' Main Entry Point
Public Overrides Async Function ProcessAsync() As Task

    Await Me.ExecuteActionDelegateAsync(
             Me.GetActionKey(Me.FileAction)).ConfigureAwait(False)

End Function

' Handler for parent class
Protected MustOverride Overloads Function ProcessAsync(
    jsonReader As JsonReader, cancellationToken As CancellationToken) As Task

' Map configuration settings to Actions
Protected Overrides Function ActionDelegatesFactory() _
                    As Dictionary(Of String, Func(Of Task))

    Return New Dictionary(Of String, Func(Of Task)) From {
        {Me.GetActionKey(DeserializeActionType.Single), _
                         AddressOf Me.SingleShimAsync},
        {Me.GetActionKey(DeserializeActionType.Multiple), _
                         AddressOf Me.MultipleShimAsync}
    }

End Function

#Region "ActionDelegatesFactory Shims"

Private Async Function SingleShimAsync() As Task

    Await Me.ProcessActionAsync(
            Me._configuration.Paths(Me.ConfigurationFileKey),
            AddressOf Me.ProcessAsync,
            If(Me.CancellationTokenSource Is Nothing, _
               Nothing, Me.CancellationTokenSource.Token)) _
            .ConfigureAwait(False)

End Function

Private Async Function MultipleShimAsync() As Task

    Await Me.ProcessActionAsync(
            Me._configuration,
            AddressOf Me.ProcessAsync,
            If(Me.CancellationTokenSource Is Nothing, _
               Nothing, Me.CancellationTokenSource.Token)) _
            .ConfigureAwait(False)

End Function

#End Region

Common.<lib> Zipped File / Web API Base

// Main Entry Point
public override async ValueTask ProcessAsync()
    => await this.ExecuteActionDelegateAsync
       (this.GetActionKey(this.ZipFileAction, this.FileAction))
        .ConfigureAwait(false);

#region Processors

// Generate an ActionDelegate key
private string GetActionKey(DeserializeActionType zipFileType, 
                            DeserializeActionType fileType)
    => $"{zipFileType}{this.GetActionKey(fileType)}";

// Execute an ActionDelegate based on configuration settings
protected override async ValueTask ExecuteActionDelegateAsync(string key)
{
    Dictionary<string, Func<Task>> ActionDelegates = this.ActionDelegatesFactory();

    if (!ActionDelegates.ContainsKey(key))
    {
        KeyNotFoundException exception =
            new KeyNotFoundException
            ($"The zip '{this.ZipFileAction}' or file '{this.FileAction}' " +
             "Action(s) not found!");
        this.Logger?.Emit(LogLevel.Error, "Invalid Action!", exception);

        throw exception;
    }

    await ActionDelegates[key]().ConfigureAwait(false);
}

// Map configuration settings to Actions
protected override Dictionary<string, Func<Task>> ActionDelegatesFactory()
    => new()
    {
        [this.GetActionKey(DeserializeActionType.Single, 
         DeserializeActionType.Single)] = ()
            => this.ProcessZipActionAsync
                (this._zipConfiguration.Paths[this.ConfigurationZipFileKey!],
                 this._configuration!.Paths[this.ConfigurationFileKey!], 
                 this.ProcessAsync,
                 this.CancellationTokenSource?.Token ?? default),

        [this.GetActionKey(DeserializeActionType.Multiple, 
         DeserializeActionType.Single)] = ()
            => this.ProcessZipActionAsync
                (this._zipConfiguration, 
                 this._configuration!.Paths[this.ConfigurationFileKey!],
                 this.ProcessAsync, this.CancellationTokenSource?.Token ?? default),

        [this.GetActionKey(DeserializeActionType.Single, 
         DeserializeActionType.Multiple)] = ()
            => ProcessZipActionAsync
                (this._zipConfiguration.Paths[this.ConfigurationZipFileKey!],
                 this._configuration!, this.ProcessAsync,
                 this.CancellationTokenSource?.Token ?? default),

        [this.GetActionKey(DeserializeActionType.Multiple, 
                           DeserializeActionType.Multiple)] = ()
            => this.ProcessZipActionAsync
                (this._zipConfiguration, this.ProcessAsync,
                 this.CancellationTokenSource?.Token ?? default),
    };
' Main Entry Point
Public Overrides Async Function ProcessAsync() As Task

    Await Me.ExecuteActionDelegateAsync(
             Me.GetActionKey(Me.ZipFileAction, Me.FileAction)).ConfigureAwait(False)

End Function

' Generate an ActionDelegate key
Private Shadows Function GetActionKey(zipFileType As DeserializeActionType, _
                                      fileType As DeserializeActionType) As String

	Return $"{zipFileType}{MyBase.GetActionKey(fileType)}"

End Function

' Execute an ActionDelegate based on configuration settings
Protected Overrides Async Function ExecuteActionDelegateAsync(key As String) As Task

	Dim ActionDelegates = ActionDelegatesFactory()

	If Not ActionDelegates.ContainsKey(key) Then
		Dim exception = New KeyNotFoundException(
            $"The zip '{Me.ZipFileAction}' or file '{Me.FileAction}' " &
            "Action(s) not found!")
		Me._logger.Emit(LogLevel.Error, "Invalid Action!", exception)

		Throw exception
	End If

	Await ActionDelegates(key)().ConfigureAwait(False)

End Function

' Map configuration settings to Actions
Protected Overrides Function ActionDelegatesFactory() _
          As Dictionary(Of String, Func(Of Task))

	Return New Dictionary(Of String, Func(Of Task)) From {
			{Me.GetActionKey(DeserializeActionType.Single, _
             DeserializeActionType.Single),
			 AddressOf Me.SingleSingleShimAsync},
			{Me.GetActionKey(DeserializeActionType.Single, _
             DeserializeActionType.Multiple),
			 AddressOf Me.SingleMultipleShimAsync},
			{Me.GetActionKey(DeserializeActionType.Multiple, _
             DeserializeActionType.Single),
			 AddressOf Me.MultipleSingleShimAsync},
			{Me.GetActionKey(DeserializeActionType.Multiple, _
             DeserializeActionType.Multiple),
			 AddressOf Me.MultipleMultipleShimAsync}
		}

End Function

#Region "ActionDelegatesFactory Shims"

' NOTE: VB does not support inline delegates 
' with parameters in Dictionaries like C#, only method references, 
' so shims are used to wrap method calls with parameters
Private Async Function SingleSingleShimAsync() As Task

	Await Me.ProcessZipActionAsync(
		Me._zipConfiguration.Paths(Me.ConfigurationZipFileKey),
		Me._configuration.Paths(Me.ConfigurationFileKey),
		AddressOf Me.ProcessAsync,
		If(Me.CancellationTokenSource Is Nothing, Nothing, _
           Me.CancellationTokenSource.Token)).
		ConfigureAwait(False)

End Function

Private Async Function MultipleSingleShimAsync() As Task

	Await Me.ProcessZipActionAsync(
		Me._zipConfiguration,
		Me._configuration.Paths(Me.ConfigurationFileKey),
		AddressOf Me.ProcessAsync,
		If(Me.CancellationTokenSource Is Nothing, Nothing, _
           Me.CancellationTokenSource.Token)).
		ConfigureAwait(False)

End Function

Private Async Function SingleMultipleShimAsync() As Task

	Await Me.ProcessZipActionAsync(
		Me._zipConfiguration.Paths(Me.ConfigurationZipFileKey),
		Me._configuration,
		AddressOf Me.ProcessAsync,
		If(Me.CancellationTokenSource Is Nothing, Nothing, _
           Me.CancellationTokenSource.Token)).
		ConfigureAwait(False)

End Function

Private Async Function MultipleMultipleShimAsync() As Task

	Await Me.ProcessZipActionAsync(
		Me._zipConfiguration,
		AddressOf Me.ProcessAsync,
		If(Me.CancellationTokenSource Is Nothing, Nothing, _
           Me.CancellationTokenSource.Token)).
		ConfigureAwait(False)

End Function

#End Region

Note: VB and C# handle delegates differently. For VB, shim methods are needed so that parameters can be passed to the method calls.
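The key-to-delegate mapping used by the base classes above can be illustrated in isolation. The following is a minimal sketch only, not the library code: `ActionType`, `ActionDispatchSketch`, and the string results are hypothetical stand-ins for the article's `DeserializeActionType`, base class, and processing methods.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for the article's DeserializeActionType
public enum ActionType { Single, Multiple }

public static class ActionDispatchSketch
{
    // Assumed key format: the two enum names concatenated, e.g. "SingleMultiple"
    public static string GetActionKey(ActionType zipType, ActionType fileType)
        => $"{zipType}{fileType}";

    // Map each configuration combination to a parameterless delegate.
    // The lambdas capture whatever arguments the real call would need.
    public static Dictionary<string, Func<Task<string>>> BuildActions()
        => new()
        {
            [GetActionKey(ActionType.Single, ActionType.Single)]
                = () => Task.FromResult("one zip, one file"),
            [GetActionKey(ActionType.Multiple, ActionType.Multiple)]
                = () => Task.FromResult("all zips, all files"),
        };

    // Look up the delegate for the key and await it; fail fast when unmapped
    public static async Task<string> ExecuteAsync(
        Dictionary<string, Func<Task<string>>> actions, string key)
    {
        if (!actions.TryGetValue(key, out var action))
            throw new KeyNotFoundException($"The '{key}' action was not found!");

        return await action().ConfigureAwait(false);
    }
}
```

Because each lambda closes over its own arguments, the dictionary only ever stores parameterless `Func` values, which is the role the named shim methods play in the VB version.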

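Part 4: Unit Tests

Unit tests are implemented only for file system files and zipped files, covering both simple JSON collection objects (Contacts) and complex JSON objects (Ebay CategoryAspect). The unit tests cover:

  • Standard calls that run to completion
  • Standard calls with a simulated cancellation
  • Invalid configuration - key not found and file not found
  • Single-file and multiple-file processing

There are also unit tests for:

  • The custom Utf8JsonAsyncStreamReader
  • The FilePathHelper class and the FileConfiguration

All tests are run using dependency injection.

The following extension was designed to implement the simulated cancellation: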

public static class TaskExtensions
{
    // executing async method wrapper to capture the results, 
    // handle cancellation, and capture any exceptions
    public static Task TaskAwaiter(this ValueTask valueTask, 
    CancellationTokenSource? cancellationTokenSource = default, int delay = 2000)
    {
        //emulate a user cancellation in x milliseconds
        cancellationTokenSource?.CancelAfter(delay);

        // get a reference to the running task
        Task task = valueTask.AsTask();
    
        while (!task.GetAwaiter().IsCompleted && 
               cancellationTokenSource?.IsCancellationRequested != true)
        {
            // waiting ...
        }

        // we need to capture the task & a snapshot of the statuses before returning
        return task;
    }
}
Public Module TaskExtensions

    ' executing async method wrapper to capture the results, 
    ' handle cancellation, and capture any exceptions
    <Extension>
    Public Function TaskAwaiter(task As Task, _
    Optional cancellationTokenSource As CancellationTokenSource = Nothing, _
    Optional delay As Integer = 2000) As Task

        ' emulate a user cancellation in x milliseconds
        cancellationTokenSource?.CancelAfter(delay)

        While Not task.GetAwaiter().IsCompleted AndAlso
              ((cancellationTokenSource Is Nothing) _
               OrElse cancellationTokenSource.IsCancellationRequested <> True)
            'waiting...
        End While

        ' we need to capture the task & a snapshot of the statuses before returning
        Return task

    End Function

End Module

If you pass delay = 0, or no cancellation token, the cancellation will not be executed.

Here is how it is used:

private const int SimulatedDelay = 5; // must be less than actual execution time
 
[Fact]
void Live_File_Single_Cancellation()
{
    CancellationTokenSource cts = new();

    Task task = this.Execute(this._liveDeserializer, TestFileKey, 
                             DeserializeActionType.Single, false, cts);

    if (cts.Token.IsCancellationRequested)
        this._logger.Emit(LogLevel.Warning, "Cancellation was requested");

    task.IsCompleted.Should().BeFalse();

    cts.IsCancellationRequested.Should().BeTrue();
    this._liveDeserializer.CancellationTokenSource!
        .IsCancellationRequested.Should().BeTrue();
    this._liveDeserializer.CancellationTokenSource!.Should().Be(cts);
}

private Task Execute
(
    IContactFileStreamDeserializer deserializer,
    string fileKey,
    DeserializeActionType fileAction,
    bool failIfFileNotFound,
    CancellationTokenSource? cts = default,
    int delay = SimulatedDelay
)
{
    deserializer.FileId = fileKey;
    deserializer.FileAction = fileAction;
    deserializer.FailIfFileNotFound = failIfFileNotFound;
    deserializer.CancellationTokenSource = cts;

    // we need to capture the Task
    return deserializer.ProcessAsync().TaskAwaiter(cts, delay);
}
Private Const SimulatedDelay As Integer = 5 ' must be less than actual execution time

<Fact>
Sub Live_File_Single_Cancellation()

    Dim cts = New CancellationTokenSource()

    Dim task As Task = Me.Execute(Me._liveDeserializer, _
             TestFileKey, DeserializeActionType.Single, False, cts)

    If cts.Token.IsCancellationRequested Then
        Me._logger.Emit(LogLevel.Warning, "Cancellation was requested")
    End If

    task.IsCompleted.Should().BeFalse()

    cts.IsCancellationRequested.Should().BeTrue()
    Me._liveDeserializer.CancellationTokenSource.
        IsCancellationRequested.Should().BeTrue()
    Me._liveDeserializer.CancellationTokenSource.Should().Be(cts)

End Sub

Function Execute _
(
        deserializer As IContactFileStreamDeserializer,
        fileKey As String,
        fileAction As DeserializeActionType,
        failIfFileNotFound As Boolean,
        Optional cts As CancellationTokenSource = Nothing,
        Optional delay As Integer = SimulatedDelay
) As Task

    deserializer.FileId = fileKey
    deserializer.FileAction = fileAction
    deserializer.FailIfFileNotFound = failIfFileNotFound
    deserializer.CancellationTokenSource = cts

    ' we need to capture the Task
    Return deserializer.ProcessAsync().TaskAwaiter(cts, delay)

End Function
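
The TaskAwaiter extension above polls the task in a loop until it completes or cancellation is requested. As a point of comparison, here is a minimal sketch of a non-spinning variant that awaits the work and treats an OperationCanceledException as the cancellation signal. It assumes the work honours the token it is given; `SimulatedCancellationSketch` and `WasCancelledAsync` are hypothetical names, not part of the article's libraries.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SimulatedCancellationSketch
{
    // Hypothetical helper: runs the work, requests cancellation after delayMs,
    // and reports whether the work ended because of that cancellation.
    public static async Task<bool> WasCancelledAsync(
        Func<CancellationToken, Task> work, int delayMs)
    {
        using CancellationTokenSource cts = new();

        // emulate a user cancellation in delayMs milliseconds
        cts.CancelAfter(delayMs);

        try
        {
            await work(cts.Token).ConfigureAwait(false);
            return false; // the work finished before cancellation took effect
        }
        catch (OperationCanceledException)
        {
            return true;  // the work observed the token and stopped early
        }
    }
}
```

Unlike the polling version, this does not hand back the still-running task, so it suits tests that only need to know whether cancellation was observed.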

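Part 5: Benchmarks

The benchmarks are implemented for the file system in both C# and VB, covering simple JSON collection objects (Contacts) and complex JSON objects (Ebay CategoryAspect). They compare the default file/stream methods with the custom library stream methods:

  • <Contact/Ebay>_<NewtonSoft/SystemText>_Default: load the whole file into a string and deserialize
  • <Contact/Ebay>_<NewtonSoft/SystemText>_DefaultStream: load the whole file into a stream and deserialize
  • Contact_SystemText_DefaultEnumerableStream: a one-off test that uses DeserializeAsyncEnumerable to stream and deserialize one JSON object at a time
  • <Contact/Ebay>_<NewtonSoft/SystemText>_Streaming: library streaming, deserializing one JSON object at a time
  • <Contact/Ebay>_<NewtonSoft/SystemText>_StreamingBatch10: library streaming, deserializing batches of 10 JSON objects at a time
  • <Contact/Ebay>_<NewtonSoft/SystemText>_StreamingChunk64K: library streaming, deserializing one JSON object at a time with a 64KB buffer
  • <Contact/Ebay>_<NewtonSoft/SystemText>_StreamingBatch10BufferSize64K: library streaming, deserializing batches of 10 JSON objects at a time with a 64KB buffer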
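
To make the first two cases concrete, here is a minimal sketch of the two default System.Text.Json approaches: deserializing from a string that already holds the whole payload, and deserializing from a stream. This is an assumption about the general shape of such code, not the benchmark code itself; `DefaultApproachesSketch` and the `List<int>` payload are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

public static class DefaultApproachesSketch
{
    // "_Default" style: the whole JSON document is held in a string first
    public static List<int>? FromString(string json)
        => JsonSerializer.Deserialize<List<int>>(json);

    // "_DefaultStream" style: the serializer reads from the stream itself,
    // but still returns the fully materialized collection
    public static async Task<List<int>?> FromStreamAsync(Stream stream)
        => await JsonSerializer.DeserializeAsync<List<int>>(stream)
                               .ConfigureAwait(false);
}
```

In both cases, the complete result is in memory when the call returns, which is what the library streaming variants avoid.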

Test Data

Contacts:               500,000 records / 297,675KB
Ebay Category Aspects:      750 records /  68,118KB 

Test Machine Configuration

BenchmarkDotNet=v0.13.2, OS=Windows 11 (10.0.22621.675)
AMD Ryzen 7 3700X, 1 CPU, 16 logical and 8 physical cores
.NET SDK=7.0.100-rc.2.22477.23
  [Host]     : .NET 6.0.10 (6.0.1022.47605), X64 RyuJIT AVX2
  DefaultJob : .NET 6.0.10 (6.0.1022.47605), X64 RyuJIT AVX2
  M.2 SSD 

C# Benchmark Results

|                                           Method |    Mean |    Error |   StdDev | Ratio | Rank |
|------------------------------------------------- |--------:|---------:|---------:|------:|-----:|
| Contact_SystemText_StreamingBatch10BufferSize64K | 2.342 s | 0.0059 s | 0.0055 s |  0.51 |    1 |
|             Contact_SystemText_StreamingChunk64K | 2.415 s | 0.0183 s | 0.0171 s |  0.53 |    2 |
|                     Contact_SystemText_Streaming | 2.472 s | 0.0176 s | 0.0156 s |  0.54 |    3 |
|       Contact_SystemText_DefaultEnumerableStream | 2.480 s | 0.0056 s | 0.0052 s |  0.54 |    3 |
|              Contact_SystemText_StreamingBatch10 | 2.536 s | 0.0049 s | 0.0044 s |  0.56 |    4 |
|                       Contact_SystemText_Default | 4.002 s | 0.0483 s | 0.0452 s |  0.88 |    5 |
|             Contact_NewtonSoft_StreamingChunk64K | 4.451 s | 0.0165 s | 0.0129 s |  0.98 |    6 |
| Contact_NewtonSoft_StreamingBatch10BufferSize64K | 4.484 s | 0.0130 s | 0.0122 s |  0.98 |    6 |
|                     Contact_NewtonSoft_Streaming | 4.556 s | 0.0132 s | 0.0117 s |  1.00 |    7 |
|              Contact_NewtonSoft_StreamingBatch10 | 4.636 s | 0.0908 s | 0.0892 s |  1.02 |    7 |
|                 Contact_NewtonSoft_DefaultStream | 4.729 s | 0.0194 s | 0.0181 s |  1.04 |    8 |
|                       Contact_NewtonSoft_Default | 6.268 s | 0.0385 s | 0.0341 s |  1.38 |    9 |


|                                        Method |       Mean |    Error |   StdDev | Ratio | Rank |
|---------------------------------------------- |-----------:|---------:|---------:|------:|-----:|
|                 Ebay_SystemText_DefaultStream |   729.2 ms |  4.71 ms |  4.18 ms |  0.63 |    1 |
|                       Ebay_SystemText_Default |   970.3 ms |  8.58 ms |  8.02 ms |  0.83 |    2 |
|             Ebay_NewtonSoft_StreamingChunk64K | 1,091.8 ms |  4.40 ms |  3.44 ms |  0.94 |    3 |
| Ebay_NewtonSoft_StreamingBatch10BufferSize64K | 1,094.4 ms |  7.32 ms |  6.11 ms |  0.94 |    3 |
|              Ebay_NewtonSoft_StreamingBatch10 | 1,122.8 ms |  8.79 ms |  7.79 ms |  0.96 |    4 |
|                     Ebay_NewtonSoft_Streaming | 1,164.6 ms |  9.93 ms |  9.29 ms |  1.00 |    5 |
|                 Ebay_NewtonSoft_DefaultStream | 1,248.4 ms | 16.03 ms | 14.99 ms |  1.07 |    6 |
|             Ebay_SystemText_StreamingChunk64K | 1,453.1 ms |  4.81 ms |  4.50 ms |  1.25 |    7 |
|                     Ebay_SystemText_Streaming | 1,534.9 ms |  5.19 ms |  4.86 ms |  1.32 |    8 |
|                       Ebay_NewtonSoft_Default | 1,536.8 ms | 18.24 ms | 17.06 ms |  1.32 |    8 |
| Ebay_SystemText_StreamingBatch10BufferSize64K | 1,562.3 ms |  5.77 ms |  5.40 ms |  1.34 |    9 |
|              Ebay_SystemText_StreamingBatch10 | 1,642.4 ms |  5.60 ms |  5.24 ms |  1.41 |   10 |

VB Benchmark Results

|                                           Method |    Mean |    Error |   StdDev | Ratio | Rank |
|------------------------------------------------- |--------:|---------:|---------:|------:|-----:|
|             Contact_SystemText_StreamingChunk64K | 2.379 s | 0.0103 s | 0.0097 s |  0.51 |    1 |
| Contact_SystemText_StreamingBatch10BufferSize64K | 2.382 s | 0.0079 s | 0.0070 s |  0.51 |    1 |
|       Contact_SystemText_DefaultEnumerableStream | 2.501 s | 0.0065 s | 0.0061 s |  0.54 |    2 |
|                     Contact_SystemText_Streaming | 2.657 s | 0.0060 s | 0.0057 s |  0.57 |    3 |
|              Contact_SystemText_StreamingBatch10 | 2.687 s | 0.0122 s | 0.0114 s |  0.58 |    3 |
|                       Contact_SystemText_Default | 4.120 s | 0.0422 s | 0.0395 s |  0.88 |    4 |
| Contact_NewtonSoft_StreamingBatch10BufferSize64K | 4.509 s | 0.0251 s | 0.0235 s |  0.97 |    5 |
|              Contact_NewtonSoft_StreamingBatch10 | 4.588 s | 0.0321 s | 0.0300 s |  0.99 |    6 |
|             Contact_NewtonSoft_StreamingChunk64K | 4.613 s | 0.0309 s | 0.0289 s |  0.99 |    6 |
|                     Contact_NewtonSoft_Streaming | 4.655 s | 0.0171 s | 0.0160 s |  1.00 |    6 |
|                 Contact_NewtonSoft_DefaultStream | 5.492 s | 0.0571 s | 0.0534 s |  1.18 |    7 |
|                       Contact_NewtonSoft_Default | 6.318 s | 0.0654 s | 0.0612 s |  1.36 |    8 |


|                                        Method |       Mean |    Error |   StdDev | Ratio | Rank |
|---------------------------------------------- |-----------:|---------:|---------:|------:|-----:|
|                 Ebay_SystemText_DefaultStream |   732.3 ms |  6.43 ms |  5.70 ms |  0.67 |    1 |
|                       Ebay_SystemText_Default |   957.1 ms |  4.78 ms |  4.23 ms |  0.88 |    2 |
| Ebay_NewtonSoft_StreamingBatch10BufferSize64K | 1,064.6 ms | 11.54 ms | 10.80 ms |  0.97 |    3 |
|             Ebay_NewtonSoft_StreamingChunk64K | 1,069.2 ms |  6.06 ms |  5.67 ms |  0.98 |    3 |
|                     Ebay_NewtonSoft_Streaming | 1,092.3 ms |  5.87 ms |  5.50 ms |  1.00 |    4 |
|              Ebay_NewtonSoft_StreamingBatch10 | 1,096.1 ms |  3.42 ms |  3.03 ms |  1.00 |    4 |
|                 Ebay_NewtonSoft_DefaultStream | 1,220.9 ms |  9.47 ms |  8.86 ms |  1.12 |    5 |
|             Ebay_SystemText_StreamingChunk64K | 1,489.3 ms |  4.36 ms |  3.86 ms |  1.36 |    6 |
|                       Ebay_NewtonSoft_Default | 1,499.3 ms | 13.32 ms | 12.46 ms |  1.37 |    6 |
| Ebay_SystemText_StreamingBatch10BufferSize64K | 1,514.3 ms |  3.99 ms |  3.33 ms |  1.39 |    6 |
|                     Ebay_SystemText_Streaming | 1,579.1 ms |  4.95 ms |  4.39 ms |  1.45 |    7 |
|              Ebay_SystemText_StreamingBatch10 | 1,598.5 ms |  3.85 ms |  3.60 ms |  1.46 |    8 |

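Comments

  1. The performance difference between VB and C# is within the margin of error, so performance is essentially the same (about 99%).
  2. Ebay_SystemText_Streaming... is slower than Ebay_NewtonSoft_Streaming (ratios of 1.25 to 1.46 against that baseline) because it has to read ahead to find the end of an object before it can deserialize it, but it remains roughly on par with Newtonsoft loading the whole file into a string and deserializing it (Ebay_NewtonSoft_Default).
  3. For simple JSON collection objects (Contacts), going through the library with a 64KB buffer size is faster than DeserializeAsyncEnumerable with the default buffer size (16KB).
  4. For complex JSON objects, the performance of the custom Utf8JsonAsyncStreamReader for System.Text.Json is acceptable, but it is slower than System.Text.Json loading or streaming the whole file into memory. The key point, however, is that the custom Utf8JsonAsyncStreamReader has a minimal memory footprint, whereas the memory requirements of System.Text.Json are very high.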
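
As an illustration of the buffer size mentioned in comment 3, here is a minimal sketch of calling DeserializeAsyncEnumerable with a non-default buffer through JsonSerializerOptions.DefaultBufferSize. The `ContactSketch` type and `EnumerableStreamSketch` class are hypothetical, and this is not the benchmark code; it only shows where the buffer size is set.

```csharp
using System;
using System.IO;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical, cut-down contact type for illustration only
public sealed class ContactSketch
{
    public int Id { get; set; }
    public string? Name { get; set; }
}

public static class EnumerableStreamSketch
{
    // Streams a root-level JSON array and counts the items, one at a time
    public static async Task<int> CountAsync(Stream stream, int bufferSize = 64 * 1024)
    {
        JsonSerializerOptions options = new()
        {
            // the serializer default is 16KB; 64KB mirrors the "...64K" benchmarks
            DefaultBufferSize = bufferSize
        };

        int count = 0;

        await foreach (ContactSketch? contact in JsonSerializer
            .DeserializeAsyncEnumerable<ContactSketch>(stream, options)
            .ConfigureAwait(false))
        {
            // each item could be processed here and then released
            if (contact is not null)
                count++;
        }

        return count;
    }
}
```

Only one deserialized item needs to be alive at a time inside the loop, so the buffer size becomes the main tuning knob for throughput.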

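Summary

Working with large data streams is not as hard as it looks. Sample implementations are provided in C# and VB for processing raw JSON data and zipped JSON data via the file system and a web API.

Streaming from the file system and from a web API is similar for the Newtonsoft.Json and System.Text.Json APIs, but the performance gain of the System.Text.Json API makes the move to .NET 6.0+ well worth it.

While System.Text.Json lacks an equivalent of Newtonsoft.Json.JsonTextReader for walking a stream one object at a time, we can implement our own performant custom Utf8JsonAsyncStreamReader.

History

  • v1.0 - 1 November 2022 - Initial release (DotNet 6.0)
  • v1.01 - 17 November 2022 - Updated and added downloads for Dot Net 7.0
  • v1.02 - 18 November 2022 - Updated the configuration property lists from text code blocks to tables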