Cinchoo ETL - 基于深度嵌套数组属性拆分大型 JSON 文件
使用 Cinchoo ETL 基于深度嵌套数组属性拆分大型 JSON 文件的技巧
- 下载 Cinchoo ETL 源代码
- 下载 Cinchoo ETL 二进制文件 (.NET Core)
- 下载 Cinchoo ETL 二进制文件 (.NET Framework)
- 工作示例 1 (.NET Fiddle)
1. 引言
ChoETL 是一个用于 .NET 的开源 ETL(提取、转换和加载)框架。 它是一个基于代码的库,用于从多个源提取数据,在 .NET 环境中转换数据并加载到您自己的数据仓库中。 您可以立即在数据仓库中拥有数据。
本文讨论了如何使用 Cinchoo ETL 框架基于深度嵌套的数组属性拆分大型 JSON 文件。 此方法有助于解析大型 JSON 文件,而无需通过将它们拆分为多个较小的文件来处理内存溢出异常。
2. 要求
这个框架库是用 C# 编写的,使用 .NET 4.5 / .NET Core 3.x 框架。
3. 如何使用
3.1 示例数据
让我们首先查看下面的示例 JSON 文件。 假设 JSON 文件很大,在这里,InstrumentData
节点重复出现,并且包含数千个元素。 本练习的目的是生成拆分的文件,每个文件包含所有根节点以及一个 InstrumentData
。
清单 3.1.1。 输入 JSON 文件 (sample.json)
{
"Job": {
"Keys": {
"JobID": "test123",
"DeviceID": "TEST01"
},
"Props": {
"FileType": "Measurements",
"InstrumentDescriptions": [
{
"InstrumentID": "1723007",
"InstrumentType": "Actual1",
"Name": "U",
"DataType": "Double",
"Units": "degC"
},
{
"InstrumentID": "2424009",
"InstrumentType": "Actual2",
"Name": "VG03",
"DataType": "Double",
"Units": "Pa"
}
]
},
"Steps": [
{
"Keys": {
"StepID": "START",
"StepResult": "NormalEnd"
},
"InstrumentData": [
{
"Keys": {
"InstrumentID": "1723007"
},
"Measurements": [
{
"DateTime": "2021-11-16 21:18:37.000",
"Value": 540
},
{
"DateTime": "2021-11-16 21:18:37.100",
"Value": 539
},
{
"DateTime": "2021-11-16 21:18:37.200",
"Value": 540
},
{
"DateTime": "2021-11-16 21:18:37.300",
"Value": 540
},
]
},
{
"Keys": {
"InstrumentID": "2424009"
},
"Measurements": [
{
"DateTime": "2021-11-16 21:18:37.000",
"Value": 1333.22
},
{
"DateTime": "2021-11-16 21:18:37.100",
"Value": 1333.22
},
]
}
]
}
]
}
}
步骤 1:在上面,Job.Keys
和 Job.Props
是所有元素的公共因子,需要包含在所有新的拆分文件中。
步骤 2:由于输入文件带有 Job.Steps[*].InstrumentData[*]
节点,因此需要 2 个级别的解析才能按 InstrumentData
级别拆分文件。
步骤 3:按每个 Steps[*]
节点拆分文件(又名 Steps_0.json、Steps_1.json 等)
步骤 4。 然后,获取每个 StepsFiles 并按每个 InstrumentData[*]
节点级别拆分它们。(又名 InstrumentData_0.json、InstrumentData_1.json 等)
最终预期的拆分文件应如下所示。
清单 3.1.2。 输出拆分 JSON 文件 (InstrumentData_0.json)
{
"Job": {
"Keys": {
"JobID": "test123",
"DeviceID": "TEST01"
},
"Props": {
"FileType": "Measurements",
"InstrumentDescriptions": [
{
"InstrumentID": "1723007",
"InstrumentType": "Actual1",
"Name": "U",
"DataType": "Double",
"Units": "degC"
},
{
"InstrumentID": "2424009",
"InstrumentType": "Actual2",
"Name": "VG03",
"DataType": "Double",
"Units": "Pa"
}
]
},
"Steps": {
"Keys": {
"InstrumentID": "1723007"
},
"Measurements": [
{
"DateTime": "2021-11-16 21:18:37.000",
"Value": 540
},
{
"DateTime": "2021-11-16 21:18:37.100",
"Value": 539
},
{
"DateTime": "2021-11-16 21:18:37.200",
"Value": 540
},
]
}
}
}
首先要做的是安装 ChoETL.JSON/ChoETL.JSON.NETStandard
nuget 包。 为此,请在程序包管理器控制台中运行以下命令。
.NET Framework
Install-Package ChoETL.JSON
.NET Core
Install-Package ChoETL.JSON.NETStandard
现在将 ChoETL
命名空间添加到程序。
using ChoETL;
3.2 拆分操作
由于 JSON 文件的大小很大,因此我们需要考虑以流模型反序列化 InstrumentData
节点,而不是将整个文件加载到内存中,以避免内存压力。
首先,按如下所示的 Steps
节点拆分文件
- 捕获
Job.Keys
节点的值。 - 捕获
Job.Props
节点的值。 - 然后,循环遍历每个
Job.Steps
节点,使用ChoJObjectWriter
(以流方式写入 json 值的实用程序类)生成输出拆分文件(又名 Steps_0.json、Steps_1.json 等) - 最后,该方法返回步骤拆分文件名列表,以供按
InstrumentData
节点拆分。
清单 3.2.1。 按步骤节点拆分
static string[] SplitBySteps(string inputFilePath)
{
List<string> stepsFiles = new List<string>();
dynamic keys = null;
dynamic props = null;
//Capture Keys
using (var r = new ChoJSONReader(inputFilePath).WithJSONPath("$..Job.Keys"))
{
keys = r.FirstOrDefault();
}
//Capture props
using (var r = new ChoJSONReader(inputFilePath).WithJSONPath("$..Job.Props"))
{
props = r.FirstOrDefault();
}
int fileCount = 0;
//Loop thro Steps, write to individual files
using (var r = ChoJSONReader.LoadText(json).WithJSONPath("$..Job.Steps")
.NotifyAfter(1)
.Setup(s => s.RowsLoaded += (o, e) =>
$"Step Nodes loaded: {e.RowsLoaded} <- {DateTime.Now}".Print())
//Callback used to hook up to loader, stream the nodes to file
//(this avoids loading to memory)
.Configure(c => c.CustomJObjectLoader = (sr, s) =>
{
string outFilePath = $"Steps_{fileCount++}.json";
$"Writing to `{outFilePath}` file...".Print();
using (var topJo = new ChoJObjectWriter(outFilePath))
{
topJo.Formatting = Newtonsoft.Json.Formatting.Indented;
using (var jo = new ChoJObjectWriter("Job", topJo))
{
jo.WriteProperty("Keys", keys);
jo.WriteProperty("Props", props);
jo.WriteProperty("Steps", sr);
}
}
//File.ReadAllText(outFilePath).Print();
//"".Print();
stepsFiles.Add(outFilePath);
return ChoJSONObjects.EmptyJObject;
})
)
{
r.Loop();
}
return stepsFiles.ToArray();
}
下一步是一次使用上面生成的步骤拆分文件之一,然后按 InstrumentData
节点拆分它们。 下面的代码展示了如何操作。
接下来,按如下所示的 InstrumentData
节点拆分文件
- 捕获
Job.Keys
节点的值。 - 捕获
Job.Props
节点的值。 - 捕获
Job.Steps.Keys
节点的值。 - 然后,循环遍历每个
Job.Steps.InstrumentData
节点,使用ChoJObjectWriter
生成输出拆分文件(又名 InstrumentData_0.json、InstrumentData_1.json 等)。 - 最后,该方法返回
InstrumentData
拆分文件名列表。
清单 3.2.2。 按 InstrumentData 节点拆分
static string[] SplitByInstrumentData(string stepsFilePath)
{
List<string> instrumentDataFiles = new List<string>();
dynamic keys = null;
dynamic props = null;
dynamic stepsKeys = null;
//Capture Keys
using (var r = new ChoJSONReader(stepsFilePath).WithJSONPath("$..Job.Keys"))
{
keys = r.FirstOrDefault();
}
//Capture props
using (var r = new ChoJSONReader(stepsFilePath).WithJSONPath("$..Job.Props"))
{
props = r.FirstOrDefault();
}
//Capture steps/keys
using (var r = new ChoJSONReader(stepsFilePath).WithJSONPath("$..Job.Steps.Keys"))
{
stepsKeys = r.FirstOrDefault();
}
int fileCount = 0;
//Loop thro InstrumentData, write to individual files
using (var r = ChoJSONReader.LoadText(json).WithJSONPath("$..Job.Steps.InstrumentData")
.NotifyAfter(1)
.Setup(s => s.RowsLoaded += (o, e) => $"InstrumentData Nodes loaded:
{e.RowsLoaded} <- {DateTime.Now}".Print())
//Callback used to hook up to loader, stream the nodes to file
//(this avoids loading to memory)
.Configure(c => c.CustomJObjectLoader = (sr, s) =>
{
string outFilePath = $"InstrumentData_{fileCount++}.json";
$"Writing to `{outFilePath}` file...".Print();
using (var topJo = new ChoJObjectWriter(outFilePath))
{
topJo.Formatting = Newtonsoft.Json.Formatting.Indented;
using (var jo = new ChoJObjectWriter("Job", topJo))
{
jo.WriteProperty("Keys", keys);
jo.WriteProperty("Props", props);
jo.WriteProperty("Steps", sr);
}
}
File.ReadAllText(outFilePath).Print();
"".Print();
instrumentDataFiles.Add(outFilePath);
return ChoJSONObjects.EmptyJObject;
})
)
{
r.Loop();
}
return instrumentDataFiles.ToArray();
}
最后,使用以上两种方法来完成按 InstrumentData
的拆分过程,如下所示。
清单 3.2.3。 Main() 方法
public static void Main()
{
var inputFilePath = "input.json";
foreach (var stepsFilePath in SplitBySteps(inputFilePath))
{
SplitByInstrumentData(stepsFilePath);
File.Delete(stepsFilePath);
}
}
示例 fiddle: https://dotnetfiddle.net/j3Y03m
有关 Cinchoo ETL 的更多信息,请访问其他 CodeProject 文章。
历史
- 2022 年 1 月 12 日:初始版本