65.9K
CodeProject 正在变化。 阅读更多。
Home

Cinchoo ETL - 基于深度嵌套数组属性拆分大型 JSON 文件

starIconstarIconstarIconstarIconstarIcon

5.00/5 (2投票s)

2022年1月12日

CPOL

3分钟阅读

viewsIcon

7027

使用 Cinchoo ETL 基于深度嵌套数组属性拆分大型 JSON 文件的技巧

1. 引言

ChoETL 是一个用于 .NET 的开源 ETL(提取、转换和加载)框架。 它是一个基于代码的库,用于从多个源提取数据,在 .NET 环境中转换数据并加载到您自己的数据仓库中。 您可以立即在数据仓库中拥有数据。

本文讨论了如何使用 Cinchoo ETL 框架基于深度嵌套的数组属性拆分大型 JSON 文件。 此方法有助于解析大型 JSON 文件,而无需通过将它们拆分为多个较小的文件来处理内存溢出异常。

2. 要求

这个框架库是用 C# 编写的,使用 .NET 4.5 / .NET Core 3.x 框架。

3. 如何使用

3.1 示例数据

让我们首先查看下面的示例 JSON 文件。 假设 JSON 文件很大,在这里,InstrumentData 节点重复出现,并且包含数千个元素。 本练习的目的是生成拆分的文件,每个文件包含所有根节点以及一个 InstrumentData

清单 3.1.1。 输入 JSON 文件 (sample.json)
{
  "Job": {
    "Keys": {
      "JobID": "test123",
      "DeviceID": "TEST01"
    },
    "Props": {
      "FileType": "Measurements",
      "InstrumentDescriptions": [
        {
          "InstrumentID": "1723007",
          "InstrumentType": "Actual1",
          "Name": "U",
          "DataType": "Double",
          "Units": "degC"
        },
        {
          "InstrumentID": "2424009",
          "InstrumentType": "Actual2",
          "Name": "VG03",
          "DataType": "Double",
          "Units": "Pa"
        }
      ]
    },
    "Steps": [
      {
        "Keys": {
          "StepID": "START",
          "StepResult": "NormalEnd"
        },
        "InstrumentData": [
          {
            "Keys": {
              "InstrumentID": "1723007"
            },
            "Measurements": [
              {
                "DateTime": "2021-11-16 21:18:37.000",
                "Value": 540
              },
              {
                "DateTime": "2021-11-16 21:18:37.100",
                "Value": 539
              },
              {
                "DateTime": "2021-11-16 21:18:37.200",
                "Value": 540
              },
              {
                "DateTime": "2021-11-16 21:18:37.300",
                "Value": 540
              },
            ]
          },
          {
            "Keys": {
              "InstrumentID": "2424009"
            },
            "Measurements": [
              {
                "DateTime": "2021-11-16 21:18:37.000",
                "Value": 1333.22
              },
              {
                "DateTime": "2021-11-16 21:18:37.100",
                "Value": 1333.22
              },
            ]
          }
        ]
      }
    ]
  }
}

步骤 1:在上面,Job.KeysJob.Props 是所有元素的公共因子,需要包含在所有新的拆分文件中。

步骤 2:由于输入文件带有 Job.Steps[*].InstrumentData[*] 节点,因此需要 2 个级别的解析才能按 InstrumentData 级别拆分文件。

步骤 3:按每个 Steps[*] 节点拆分文件(又名 Steps_0.jsonSteps_1.json 等)

步骤 4。 然后,获取每个 StepsFiles 并按每个 InstrumentData[*] 节点级别拆分它们。(又名 InstrumentData_0.jsonInstrumentData_1.json 等)

最终预期的拆分文件应如下所示。

清单 3.1.2。 输出拆分 JSON 文件 (InstrumentData_0.json)
{
  "Job": {
    "Keys": {
      "JobID": "test123",
      "DeviceID": "TEST01"
    },
    "Props": {
      "FileType": "Measurements",
      "InstrumentDescriptions": [
        {
          "InstrumentID": "1723007",
          "InstrumentType": "Actual1",
          "Name": "U",
          "DataType": "Double",
          "Units": "degC"
        },
        {
          "InstrumentID": "2424009",
          "InstrumentType": "Actual2",
          "Name": "VG03",
          "DataType": "Double",
          "Units": "Pa"
        }
      ]
    },
    "Steps": {
      "Keys": {
        "InstrumentID": "1723007"
      },
      "Measurements": [
        {
          "DateTime": "2021-11-16 21:18:37.000",
          "Value": 540
        },
        {
          "DateTime": "2021-11-16 21:18:37.100",
          "Value": 539
        },
        {
          "DateTime": "2021-11-16 21:18:37.200",
          "Value": 540
        },
      ]
    }
  }
}

首先要做的是安装 ChoETL.JSON/ChoETL.JSON.NETStandard nuget 包。 为此,请在程序包管理器控制台中运行以下命令。

.NET Framework

Install-Package ChoETL.JSON

.NET Core

Install-Package ChoETL.JSON.NETStandard

现在将 ChoETL 命名空间添加到程序。

using ChoETL;

3.2 拆分操作

由于 JSON 文件的大小很大,因此我们需要考虑以流模型反序列化 InstrumentData 节点,而不是将整个文件加载到内存中,以避免内存压力。

首先,按如下所示的 Steps 节点拆分文件

  1. 捕获 Job.Keys 节点的值。
  2. 捕获 Job.Props 节点的值。
  3. 然后,循环遍历每个 Job.Steps 节点,使用 ChoJObjectWriter(以流方式写入 json 值的实用程序类)生成输出拆分文件(又名 Steps_0.jsonSteps_1.json 等)
  4. 最后,该方法返回步骤拆分文件名列表,以供按 InstrumentData 节点拆分。
清单 3.2.1。 按步骤节点拆分
static string[] SplitBySteps(string inputFilePath)
{
    List<string> stepsFiles = new List<string>();
    
    dynamic keys = null;
    dynamic props = null;
    
    //Capture Keys
    using (var r = new ChoJSONReader(inputFilePath).WithJSONPath("$..Job.Keys"))
    {
        keys = r.FirstOrDefault();
    }
    
    //Capture props
    using (var r = new ChoJSONReader(inputFilePath).WithJSONPath("$..Job.Props"))
    {
        props = r.FirstOrDefault();
    }
    
    int fileCount = 0;
    //Loop thro Steps, write to individual files
    using (var r = ChoJSONReader.LoadText(json).WithJSONPath("$..Job.Steps")
           .NotifyAfter(1)
           .Setup(s => s.RowsLoaded += (o, e) => 
            $"Step Nodes loaded: {e.RowsLoaded} <- {DateTime.Now}".Print())

           //Callback used to hook up to loader, stream the nodes to file 
           //(this avoids loading to memory)
           .Configure(c => c.CustomJObjectLoader = (sr, s) =>
                      {
                          string outFilePath = $"Steps_{fileCount++}.json";
                          $"Writing to `{outFilePath}` file...".Print();

                          using (var topJo = new ChoJObjectWriter(outFilePath))
                          {
                              topJo.Formatting = Newtonsoft.Json.Formatting.Indented;
                              using (var jo = new ChoJObjectWriter("Job", topJo))
                              {
                                  jo.WriteProperty("Keys", keys);
                                  jo.WriteProperty("Props", props);
                                  jo.WriteProperty("Steps", sr);
                              }
                          }

                          //File.ReadAllText(outFilePath).Print();
                          //"".Print();

                          stepsFiles.Add(outFilePath);
                          
                          return ChoJSONObjects.EmptyJObject;
                      })
          )
    {
        r.Loop();
    }
    
    return stepsFiles.ToArray();
}

下一步是一次使用上面生成的步骤拆分文件之一,然后按 InstrumentData 节点拆分它们。 下面的代码展示了如何操作。

接下来,按如下所示的 InstrumentData 节点拆分文件

  1. 捕获 Job.Keys 节点的值。
  2. 捕获 Job.Props 节点的值。
  3. 捕获 Job.Steps.Keys 节点的值。
  4. 然后,循环遍历每个 Job.Steps.InstrumentData 节点,使用 ChoJObjectWriter 生成输出拆分文件(又名 InstrumentData_0.jsonInstrumentData_1.json 等)。
  5. 最后,该方法返回 InstrumentData 拆分文件名列表。
清单 3.2.2。 按 InstrumentData 节点拆分
static string[] SplitByInstrumentData(string stepsFilePath)
{
    List<string> instrumentDataFiles = new List<string>();
    
    dynamic keys = null;
    dynamic props = null;
    dynamic stepsKeys = null;
    
    //Capture Keys
    using (var r = new ChoJSONReader(stepsFilePath).WithJSONPath("$..Job.Keys"))
    {
        keys = r.FirstOrDefault();
    }
    
    //Capture props
    using (var r = new ChoJSONReader(stepsFilePath).WithJSONPath("$..Job.Props"))
    {
        props = r.FirstOrDefault();
    }
            
    //Capture steps/keys
    using (var r = new ChoJSONReader(stepsFilePath).WithJSONPath("$..Job.Steps.Keys"))
    {
        stepsKeys = r.FirstOrDefault();
    }

    int fileCount = 0;
    //Loop thro InstrumentData, write to individual files
    using (var r = ChoJSONReader.LoadText(json).WithJSONPath("$..Job.Steps.InstrumentData")
           .NotifyAfter(1)
           .Setup(s => s.RowsLoaded += (o, e) => $"InstrumentData Nodes loaded: 
                                       {e.RowsLoaded} <- {DateTime.Now}".Print())

           //Callback used to hook up to loader, stream the nodes to file 
           //(this avoids loading to memory)
           .Configure(c => c.CustomJObjectLoader = (sr, s) =>
                      {
                          string outFilePath = $"InstrumentData_{fileCount++}.json";
                          $"Writing to `{outFilePath}` file...".Print();

                          using (var topJo = new ChoJObjectWriter(outFilePath))
                          {
                              topJo.Formatting = Newtonsoft.Json.Formatting.Indented;
                              using (var jo = new ChoJObjectWriter("Job", topJo))
                              {
                                  jo.WriteProperty("Keys", keys);
                                  jo.WriteProperty("Props", props);
                                  jo.WriteProperty("Steps", sr);
                              }
                          }

                          File.ReadAllText(outFilePath).Print();
                          "".Print();

                          instrumentDataFiles.Add(outFilePath);
                          
                          return ChoJSONObjects.EmptyJObject;
                      })
          )
    {
        r.Loop();
    }
    
    return instrumentDataFiles.ToArray();
}

最后,使用以上两种方法来完成按 InstrumentData 的拆分过程,如下所示。

清单 3.2.3。 Main() 方法
public static void Main()
{
    var inputFilePath = "input.json";
    
    foreach (var stepsFilePath in SplitBySteps(inputFilePath))
    {
        SplitByInstrumentData(stepsFilePath);
        File.Delete(stepsFilePath);
    }
}

示例 fiddle: https://dotnetfiddle.net/j3Y03m

有关 Cinchoo ETL 的更多信息,请访问其他 CodeProject 文章。

历史

  • 2022 年 1 月 12 日:初始版本
© . All rights reserved.