65.9K
CodeProject 正在变化。 阅读更多。
Home

Cinchoo ETL - Parquet 读取器

starIconstarIconstarIconstarIconstarIcon

5.00/5 (1投票)

2020 年 6 月 4 日

CPOL

32分钟阅读

viewsIcon

17200

.NET 的简单 Parquet 读取器

目录

1. 引言

ChoETL 是一个开源的 .NET ETL(提取、转换和加载)框架。它是一个基于代码的库,用于从多种来源提取数据,进行转换,然后加载到 .NET 环境中的您自己的数据仓库中。您可以快速地在数据仓库中获得数据。

Apache Parquet,一个开源的 Hadoop **文件**格式。Parquet 以扁平的列式格式存储嵌套的数据结构。与数据以行式存储的传统方法相比,parquet 在存储和性能方面效率更高。

本文讨论了使用 ChoETL 框架提供的 ChoParquetReader 组件。它是一个简单的实用类,用于将文件/源中的 Parquet 数据提取到对象中。

特点

  • 在底层使用 Parquet.NET 解析器,在几秒钟内解析 Parquet 文件,并且能够处理大文件而没有内存问题。
  • 基于流的解析器可提供极致的性能、低资源使用率以及几乎无限的通用性,可扩展到任何大小的数据文件,甚至数十或数百 GB
  • 基于事件的数据操作和验证允许在批量插入过程中完全控制数据流
  • 公开 IEnumerable 对象列表——通常与 LINQ 查询结合使用,用于投影、聚合和过滤等。
  • 支持延迟读取
  • 支持处理具有特定日期、货币和数字格式的文化文件
  • 读取文件时识别各种日期、货币、枚举、布尔值和数字格式
  • 在写入文件时提供对日期、货币、枚举、布尔值、数字格式的精细控制
  • 详细而强大的错误处理,让您可以快速查找和修复问题。

2. 要求

此框架库使用 C# 编写,基于 .NET 4.5 Framework / .NET core 2.x。

3. “Hello World!” 示例

  • 打开 VS.NET 2013 或更高版本
  • 创建一个示例 VS.NET(.NET Framework 4.5 / .NET core 2.x)控制台应用程序项目。
  • 根据 .NET 环境,通过 包管理器控制台 使用 Nuget 命令安装 **ChoETL**。
    • Install-Package ChoETL.Parquet
  • 使用 ChoETL 命名空间

让我们从查看一个包含 2 个字段的简单 Parquet 文件读取示例开始。

图 3.1 示例 Parquet 数据文件 (emp.parquet)

有许多方法可以以最少的设置开始 Parquet 文件解析。

3.1. 快速加载 - 数据优先方法

这是 **零配置**、快速加载 Parquet 文件的方式。不需要 POCO 对象。下面的示例代码展示了如何加载文件。

列表 3.1.1 使用迭代器加载 Parquet 文件

foreach (dynamic rec in new ChoParquetReader("emp.parquet"))
{
    Console.WriteLine($"Id: {rec.Id}, Name: {rec.Name}");
}

示例 Fiddle:https://dotnetfiddle.net/4dJk4G

列表 3.1.2 使用循环加载 Parquet 文件

var reader = new ChoParquetReader("emp.parquet");
dynamic rec;
 
while ((rec = reader.Read()) != null)
{
    Console.WriteLine($"Id: {rec.Id}, Name: {rec.Name}");
}

示例 Fiddle:https://dotnetfiddle.net/XAtppL

3.2. 代码优先方法

这是另一种 **零配置** 的方法,用于解析和加载 Parquet 文件,并使用 POCO 类。首先,定义一个简单的数据类来匹配底层的 Parquet 文件布局。

清单 3.2.1 简单的 POCO 实体类

public partial class EmployeeRec
{
    public int Id { get; set; }
    public string Name { get; set; } 
}

在上面,该类定义了两个属性,与示例 Parquet 文件模板匹配。

列表 3.2.2 加载 Parquet 文件

foreach (var rec in new ChoParquetReader<EmployeeRec>("emp.parquet"))
{
    Console.WriteLine($"Id: {rec.Id}, Name: {rec.Name}");
}

示例 Fiddle:https://dotnetfiddle.net/00baoy

3.3. 配置优先方法

在此模型中,我们定义了 Parquet 配置,包含所有必需的解析参数以及与底层 Parquet 文件匹配的 Parquet 字段。

列表 3.3.1 定义 Parquet 配置

ChoParquetRecordConfiguration config = new ChoParquetRecordConfiguration();
config.ParquetRecordFieldConfigurations.Add(new ChoParquetRecordFieldConfiguration("Id"));
config.ParquetRecordFieldConfigurations.Add
       (new ChoParquetRecordFieldConfiguration("Name"));

在上面,该类定义了两个属性,与示例 Parquet 文件模板匹配。

列表 3.3.2 不带 POCO 对象加载 Parquet 文件

foreach (dynamic rec in new ChoParquetReader("emp.parquet", config))
{
    Console.WriteLine($"Id: {rec.Id}, Name: {rec.Name}");
}

示例 Fiddle:https://dotnetfiddle.net/V5ts04

列表 3.3.3 带 POCO 对象加载 Parquet 文件

foreach (var rec in new ChoParquetReader<EmployeeRec>("emp.parquet", config))
{
    Console.WriteLine($"Id: {rec.Id}, Name: {rec.Name}");
}

示例 Fiddle:https://dotnetfiddle.net/mwd0EK

3.4. 代码优先声明式配置

这是结合使用 POCO 实体类和声明式装饰的 Parquet 配置参数的方法。Id 是必需字段,Name 是可选值字段,默认值为 “XXXX”。如果 Name 不存在,则取默认值。

清单 3.4.1 定义 POCO 对象

public class EmployeeRec
{
    [ChoParquetRecordField]
    [Required]
    public int Id
    {
        get;
        set;
    }
    [ChoParquetRecordField]
    [DefaultValue("XXXX")]
    public string Name
    {
        get;
        set;
    }
 
    public override string ToString()
    {
        return "{0}. {1}".FormatString(Id, Name);
    }
}

上面的代码说明了如何定义 POCO 对象来承载输入文件中每条记录行(record line)的值。首先,为每个记录字段定义属性,并使用 ChoParquetRecordFieldAttribute 进行限定,以便与 Parquet 记录映射。ParquetPath是一个可选属性。如果未指定,框架会自动发现并从 Parquet 属性加载值。Id 被装饰了 RequiredAttribute,如果值缺失,它将抛出异常。Name 使用 DefaultValueAttribute 赋予了默认值。这意味着,如果 Name Parquet 字段在文件中包含空值,它将被默认设置为“XXXX”值。

它非常简单,可以立即提取 Parquet 数据。

清单 3.4.2 主方法

foreach (var rec in new ChoParquetReader<EmployeeRec>("emp.parquet"))
{
    Console.WriteLine($"Id: {rec.Id}, Name: {rec.Name}");
}

我们首先创建一个 ChoParquetReader 对象的实例。就是这样。所有解析和将 Parquet 数据流加载到对象中的繁重工作都由底层的解析器完成。

默认情况下,ChoParquetReader 在加载 Parquet 文件时会发现并使用默认配置参数。这些可以根据您的需求进行覆盖。以下部分将详细介绍每个配置属性。

4. 读取所有记录

它就像设置与 Parquet 文件结构匹配的 POCO 对象一样简单,您可以将整个文件作为可枚举模式读取。这是延迟执行模式,但在对其进行任何聚合操作时要小心。这将把整个文件的记录加载到内存中。

列表 4.1 读取 Parquet 文件

foreach (var rec in new ChoParquetReader<EmployeeRec>("emp.parquet"))
{
    Console.WriteLine($"Id: {rec.Id}, Name: {rec.Name}");
}

列表 4.2 读取 Parquet 文件流

foreach (var rec in new ChoParquetReader<EmployeeRec>(textReader))
{
    Console.WriteLine($"Id: {rec.Id}, Name: {rec.Name}");
}

此模型使您的代码优雅、简洁、易于阅读和维护。还可以利用 LINQ 扩展方法执行分组、连接、投影、聚合等操作。

清单 4.3 使用 LINQ

var list = (from o in new ChoParquetReader<EmployeeRec>("emp.parquet")
           where o.Name != null && o.Name.StartsWith("R")
           select o).ToArray();
 
foreach (var rec in list)
{
    Console.WriteLine($"Id: {rec.Id}, Name: {rec.Name}");
}

5. 手动读取记录

它就像设置与 Parquet 文件结构匹配的 POCO 对象一样简单,您可以将整个文件作为可枚举模式读取。

列表 5.1 读取 Parquet 文件

var reader = new ChoParquetReader<EmployeeRec>("emp.parquet");
var rec = (object)null;
 
while ((rec = reader.Read()) != null)
{
    Console.WriteLine($"Id: {rec.Id}, Name: {rec.Name}");
}

6. 自定义 Parquet 记录

使用 ChoParquetRecordObjectAttribute,您可以声明式地自定义 POCO 实体对象。

清单 6.1 为每个记录自定义 POCO 对象

[ChoParquetRecordObject]
public class EmployeeRec
{
    [ChoParquetRecordField] 
    public int Id { get; set; }
    [ChoParquetRecordField] 
    [Required]
    [DefaultValue("ZZZ")]
    public string Name { get; set; }
}

以下是用于自定义文件上的 Parquet 加载操作的可用属性。

  • CultureName - 用于读取和写入 Parquet 数据的文化名称(例如,en-USen-GB)。
  • Encoding - Parquet 文件的编码。
  • ColumnCountStrict - 此标志指示在读取预期字段丢失时是否应抛出异常。
  • ErrorMode - 此标志指示在读取时是否应抛出异常,并且预期的字段加载失败。这可以每隔属性覆盖。可能的值是
    • IgnoreAndContinue - 忽略错误,跳过记录并继续处理下一条。
    • ReportAndContinue - 如果 POCO 实体是 IChoNotifyRecordRead 类型,则向其报告错误
    • ThrowAndStop - 抛出错误并停止执行
  • IgnoreFieldValueMode - 一个标志,告知读取器在读取时是否应跳过空/null 的记录。这可以每隔属性覆盖。可能的值是
    • Null - 如果记录值为 null,则跳过
    • DBNull - N/A
    • Empty - 如果记录值为空,则跳过
    • WhiteSpace - 如果记录值仅包含空格,则跳过
  • ObjectValidationMode - 一个标志,告知读取器关于记录对象要执行的验证类型。可能的值是
    • Off - 不执行对象验证(默认)
    • MemberLevel - 在加载每个 Parquet 属性的值时执行的验证。
    • ObjectLevel - 在所有属性加载到 POCO 对象后执行验证

7. 自定义 Parquet 字段

对于每个 Parquet 字段,您可以使用 ChoParquetRecordFieldAttribute 在 POCO 实体属性中指定映射。只有当您想使用自定义 ParquetPath 来映射到该字段时,才使用此属性。

列表 7.1 为 Parquet 字段自定义 POCO 对象

public class EmployeeRec
{
    [ChoParquetRecordField]
    public int Id { get; set; }
    [ChoParquetRecordField]
    [Required]
    [DefaultValue("ZZZ")]
    public string Name { get; set; }
}

以下是您可以为每个属性添加自定义的可用成员

  • FieldName - 按名称映射时,您指定要用于该属性的 Parquet 字段的名称。

7.1. 默认值

当 Parquet 值为空或为空白时(通过 IgnoreFieldValueMode 控制),用于并设置到属性的值。

可以使用 System.ComponentModel.DefaultValueAttribute 为任何 POCO 实体属性指定默认值。

7.2. 后备值

Parquet 值未能设置时,用于并设置到属性的值。Fallback 值仅在 ErrorModeIgnoreAndContinueReportAndContinue 时设置。

可以使用 ChoETL.ChoFallbackValueAttribute 为任何 POCO 实体属性指定回退值。

7.3. 类型转换器

大多数基本类型会自动转换并设置到属性。如果 Parquet 字段的值无法自动转换为属性的类型,您可以指定自定义/内置 .NET 转换器来转换该值。这些可以是 IValueConverterTypeConverter 转换器。

有几种方法可以为每个字段指定转换器

  • 声明式方法
  • 配置方法

7.3.1. 声明式方法

此模型仅适用于 POCO 实体对象。如果您有 POCO 类,可以为每个属性指定转换器以执行必要的转换。下面的示例展示了如何操作。

列表 7.3.1.1 指定类型转换器

public class EmployeeRec
{
    [ChoParquetRecordField]
    [ChoTypeConverter(typeof(IntConverter))]
    public int Id { get; set; }
    [ChoParquetRecordField]
    [Required]
    [DefaultValue("ZZZ")]
    public string Name { get; set; }
}

列表 7.3.1.2 IntConverter 实现

public class IntConverter : IValueConverter
{
    public object Convert(object value, Type targetType, 
                          object parameter, CultureInfo culture)
    {
        return value;
    }
 
    public object ConvertBack(object value, Type targetType, 
                              object parameter, CultureInfo culture)
    {
        return value;
    }
}

在上面的示例中,我们定义了自定义 IntConverter 类。并展示了如何使用它与“IdParquet 属性。

7.3.2. 配置式方法

此模型适用于动态和 POCO 实体对象。这使得可以在运行时将转换器附加到每个属性。这优先于 POCO 类上的声明式转换器。

清单 7.3.2.2 指定类型转换器

ChoParquetRecordConfiguration config = new ChoParquetRecordConfiguration();

ChoParquetRecordFieldConfiguration idConfig = 
                new ChoParquetRecordFieldConfiguration("Id");
idConfig.AddConverter(new IntConverter());
config.ParquetRecordFieldConfigurations.Add(idConfig);

config.ParquetRecordFieldConfigurations.Add
              (new ChoParquetRecordFieldConfiguration("Name"));

在上面,我们使用 ChoParquetRecordFieldConfiguration 对象中的 AddConverter 帮助方法构造并附加 IntConverter 到“Id”字段。

同样,如果您想从 ChoParquetRecordFieldConfiguration 对象中删除任何转换器,可以使用 RemoveConverter

7.4. 验证

ChoParquetReader 利用 System.ComponentModel.DataAnnotationsValidation Block 验证属性来为 POCO 实体的各个字段指定验证规则。有关可用 DataAnnotation 验证属性的列表,请参阅 MSDN 站点。

列表 7.4.1 在 POCO 实体中使用验证属性

[ChoParquetRecordObject]
public partial class EmployeeRec
{
    [ChoParquetRecordField(FieldName = "id")]
    [ChoTypeConverter(typeof(IntConverter))]
    [Range(1, int.MaxValue, ErrorMessage = "Id must be > 0.")]
    [ChoFallbackValue(1)]
    public int Id { get; set; }
 
    [ChoParquetRecordField(FieldName = "Name")]
    [Required]
    [DefaultValue("ZZZ")]
    [ChoFallbackValue("XXX")]
    public string Name { get; set; }
}

在上面的示例中,我们为 Id 属性使用了 Range 验证属性。为 Name 属性使用了 Required 验证属性。当 Configuration.ObjectValidationMode 设置为 ChoObjectValidationMode.MemberLevelChoObjectValidationMode.ObjectLevel 时,ChoParquetReader 会在加载时对其进行验证。

有时,您可能希望覆盖 POCO 类自带的已定义声明式验证行为,可以通过配置方法使用 Cinchoo ETL 实现。下面的示例展示了如何覆盖它们。

static void ValidationOverridePOCOTest()
{
    ChoParquetRecordConfiguration config = new ChoParquetRecordConfiguration();
    var idConfig = new ChoParquetRecordFieldConfiguration("Id");
    idConfig.Validators = new ValidationAttribute[] { new RequiredAttribute() };
    config.ParquetRecordFieldConfigurations.Add(idConfig);
    config.ParquetRecordFieldConfigurations.Add
           (new ChoParquetRecordFieldConfiguration("Name"));
 
    using (var parser = new ChoParquetReader<EmployeeRec>("emp.parquet", config))
    {
        object rec;
        while ((rec = parser.Read()) != null)
        {
            Console.WriteLine(rec.ToStringEx());
        }
    }
}

在某些情况下,您可能希望自己控制并在 POCO 实体类中执行手动 **自验证**。这可以通过继承 POCO 对象实现 IChoValidatable 接口来完成。

列表 7.4.2 对 POCO 实体进行手动验证

[ChoParquetRecordObject]
public partial class EmployeeRec : IChoValidatable
{
    [ChoParquetRecordField(FieldName = "id")]
    [ChoTypeConverter(typeof(IntConverter))]
    [Range(1, int.MaxValue, ErrorMessage = "Id must be > 0.")]
    [ChoFallbackValue(1)]
    public int Id { get; set; }
 
    [ChoParquetRecordField(FieldName = "Name")]
    [Required]
    [DefaultValue("ZZZ")]
    [ChoFallbackValue("XXX")]
    public string Name { get; set; }
 
    public bool TryValidate
           (object target, ICollection<ValidationResult> validationResults)
    {
        return true;
    }
 
    public bool TryValidateFor
    (object target, string memberName, ICollection<ValidationResult> validationResults)
    {
        return true;
    }
 
    public void Validate(object target)
    {
    }
 
    public void ValidateFor(object target, string memberName)
    {
    }
}

上面的示例展示了如何在 POCO 对象中实现自定义自验证。

IChoValidatable 接口公开以下方法

  • TryValidate - 验证整个对象,如果所有验证都通过,则返回 true。否则,返回 false
  • Validate - 验证整个对象,如果验证未通过则抛出异常。
  • TryValidateFor - 验证对象的特定属性,如果所有验证都通过则返回 true。否则返回 false
  • ValidateFor - 验证对象的特定属性,如果验证未通过则抛出异常。

10. 回调机制

ChoParquetReader 提供行业标准的 Parquet 解析功能,可满足大多数解析需求。* 如果解析不能满足任何需求,您可以使用 ChoParquetReader 提供的回调机制来处理这种情况。为了参与回调机制,您可以使用以下任一模型:

  • 使用 ChoParquetReader 通过 IChoReader 接口公开的事件处理程序。
  • 继承 POCO 实体对象实现 IChoNotifyRecordRead / IChoNotifyFileRead / IChoNotifyRecordFieldRead 接口。
  • 继承 DataAnnotationMetadataType 类型对象,并实现 IChoNotifyRecordRead / IChoNotifyFileRead / IChoNotifyRecordFieldRead 接口。
  • 继承 IChoNotifyRecordFieldConfigurable / IChoNotifyRecordFieldConfigurable 配置接口

注意:从这些接口方法中引发的任何异常都将被忽略。

IChoReader 公开了以下事件

  • BeginLoad - 在开始加载 Parquet 文件时调用。
  • EndLoad - 在结束加载 Parquet 文件时调用。
  • BeforeRecordLoad - 在加载 Parquet 记录之前引发。
  • AfterRecordLoad - 在加载 Parquet 记录之后引发。
  • RecordLoadError - 在加载 Parquet 记录时出错时引发。
  • BeforeRecordFieldLoad - 在加载 Parquet 字段值之前引发。
  • AfterRecordFieldLoad - 在加载 Parquet 字段值之后引发。
  • RecordFieldLoadError - 在加载 Parquet 字段值时出错时引发。
  • SkipUntil——在 Parquet 解析开始前调用,用于添加自定义逻辑以跳过记录行。
  • DoWhile——在 Parquet 解析期间调用,您可以在其中添加自定义逻辑以停止解析。

IChoNotifyRecordRead 公开了以下方法

  • BeforeRecordLoad - 在加载 Parquet 记录之前引发。
  • AfterRecordLoad - 在加载 Parquet 记录之后引发。
  • RecordLoadError - 在加载 Parquet 记录时出错时引发。

IChoNotifyFileRead 公开了以下方法

  • BeginLoad - 在开始加载 Parquet 文件时调用。
  • EndLoad - 在结束加载 Parquet 文件时调用。
  • SkipUntil——在 Parquet 解析开始前调用,用于添加自定义逻辑以跳过记录行。
  • DoWhile——在 Parquet 解析期间调用,您可以在其中添加自定义逻辑以停止解析。

IChoNotifyRecordFieldRead 公开以下方法

  • BeforeRecordFieldLoad - 在加载 Parquet 字段值之前引发。
  • AfterRecordFieldLoad - 在加载 Parquet 字段值之后引发。
  • RecordFieldLoadError - 在加载 Parquet 字段值时出错时引发。

IChoNotifyRecordConfigurable 公开了以下方法:

  • RecondConfigure - 在 Parquet 记录配置时引发。

IChoNotifyRecordFieldConfigurable 公开了以下方法:

  • RecondFieldConfigure - 在每个 Parquet 记录字段配置时引发。

10.1. 使用 ChoParquetReader 事件

这是订阅回调事件并处理解析 Parquet 文件中异常情况的更直接、最简单的方法。缺点是代码不能像通过实现 IChoNotifyRecordRead 与 POCO 记录对象那样进行重用。

下面的示例展示了如何使用 BeforeRecordLoad 回调方法来跳过以 '%' 字符开头的行。

列表 10.1.1 使用 ChoParquetReader 回调事件

static void IgnoreLineTest()
{
    using (var parser = new ChoParquetReader("emp.parquet"))
    {

        parser.BeforeRecordLoad += (o, e) =>
        {
            if (e.Source != null)
            {
                e.Skip = !((IDictionary<string, object>)e.Source).ContainsKey("Name");
            }
        };
        foreach (var e in parser)
            Console.WriteLine(e.Dump());
    }
}

同样,您也可以在 ChoParquetReader 中使用其他回调方法。

10.2. 实现 IChoNotifyRecordRead 接口

下面的示例展示了如何为直接 POCO 类实现 IChoNotifyRecordRead 接口。

清单 10.2.1 直接 POCO 回调机制实现

[ChoParquetRecordObject]
public partial class EmployeeRec : IChoNotifyRecordRead
{
    [ChoParquetRecordField(FieldName = "Id")]
    [ChoTypeConverter(typeof(IntConverter))]
    [Range(1, int.MaxValue, ErrorMessage = "Id must be > 0.")]
    [ChoFallbackValue(1)]
    public int Id { get; set; }
    
    [ChoParquetRecordField(FieldName = "Name")]
    [Required]
    [DefaultValue("ZZZ")]
    [ChoFallbackValue("XXX")]
    public string Name { get; set; }
 
    public bool AfterRecordLoad(object target, int index, object source)
    {
        throw new NotImplementedException();
    }
 
    public bool BeforeRecordLoad(object target, int index, ref object source)
    {
        throw new NotImplementedException();
    }
 
    public bool RecordLoadError(object target, int index, object source, Exception ex)
    {
        throw new NotImplementedException();
    }
}

下面的示例展示了如何通过在 POCO 类上使用 MetadataTypeAttribute 来附加 Metadata 类。

清单 10.2 基于 MetaDataType 的回调机制实现

[ChoParquetRecordObject]
public class EmployeeRecMeta : IChoNotifyRecordRead
{
    [ChoParquetRecordField(FieldName = "Id")]
    [ChoTypeConverter(typeof(IntConverter))]
    [Range(1, int.MaxValue, ErrorMessage = "Id must be > 0.")]
    [ChoFallbackValue(1)]
    public int Id { get; set; }

    [ChoParquetRecordField(FieldName = "Name")]
    [Required]
    [DefaultValue("ZZZ")]
    [ChoFallbackValue("XXX")]
    public string Name { get; set; }
 
    public bool AfterRecordLoad(object target, int index, object source)
    {
        throw new NotImplementedException();
    }
 
    public bool BeforeRecordLoad(object target, int index, ref object source)
    {
        throw new NotImplementedException();
    }
 
    public bool RecordLoadError(object target, int index, object source, Exception ex)
    {
        throw new NotImplementedException();
    }
} 

[MetadataType(typeof(EmployeeRecMeta))]
public partial class EmployeeRec
{
    public int Id { get; set; }
    public string Name { get; set; }
}

下面的示例展示了如何通过在 POCO 类上使用 ChoMetaDataRefTypeAttribute 来为密封类或第三方 POCO 类附加 Metadata 类。

列表 10.2.3 基于 ChoMetaDataRefType 的回调机制实现

[ChoMetadataRefType(typeof(EmployeeRec))]
[ChoParquetRecordObject]
public class EmployeeRecMeta : IChoNotifyRecordRead
{
    [ChoParquetRecordField(FieldName = "id")]
    [ChoTypeConverter(typeof(IntConverter))]
    [Range(1, int.MaxValue, ErrorMessage = "Id must be > 0.")]
    [ChoFallbackValue(1)]
    public int Id { get; set; }

    [ChoParquetRecordField(FieldName = "Name")]
    [Required]
    [DefaultValue("ZZZ")]
    [ChoFallbackValue("XXX")]
    public string Name { get; set; }
 
    public bool AfterRecordLoad(object target, int index, object source)
    {
        throw new NotImplementedException();
    }
 
    public bool BeforeRecordLoad(object target, int index, ref object source)
    {
        throw new NotImplementedException();
    }
 
    public bool RecordLoadError(object target, int index, object source, Exception ex)
    {
        throw new NotImplementedException();
    }
} 

public partial class EmployeeRec
{
    public int Id { get; set; }    
    public string Name { get; set; }
}

10.3. BeginLoad

此回调在 Parquet 文件加载 **开始** 时调用一次。source 是 Parquet 文件流对象。在这里,您可以检查 stream,返回 true 以继续加载 Parquet。返回 false 以停止解析。

列表 10.1.1 BeginLoad 回调示例

public bool BeginLoad(object source)
{
    StreamReader sr = source as StreamReader;
    return true;
}

10.4. EndLoad

此回调在 Parquet 文件加载 **结束** 时调用一次。source 是 Parquet 文件流对象。在这里,您可以检查流,执行任何需要对流执行的后续步骤。

列表 10.2.1 EndLoad 回调示例

public void EndLoad(object source)
{
    StreamReader sr = source as StreamReader;
}

10.5. BeforeRecordLoad

此回调在加载 Parquet 文件中的每个 Parquet 节点 **之前** 调用。target 是 POCO 记录对象的实例。index 是文件中的 JObject 节点索引。source 是 Parquet 记录对象。在这里,您可以检查对象,并在需要时用新值覆盖它。

提示:如果您想跳过 JObject 的加载,请将 source 设置为 null。

返回 true 继续加载过程,否则返回 false 停止过程。

列表 10.5.1 BeforeRecordLoad 回调示例

public bool BeforeRecordLoad(object target, int index, ref object source)
{
    IDictionary<string, object> obj = source as IDictionary<string, object>;
    return true;
}

10.6. AfterRecordLoad

此回调在加载 Parquet 文件中的每个 JObject 节点 **之后** 调用。target 是 POCO 记录对象的实例。index 是文件中的 JObject 节点索引。source 是 Parquet 记录对象。在这里,您可以对 JObject 行执行任何后续操作。

返回 true 继续加载过程,否则返回 false 停止过程。

列表 10.6.1 AfterRecordLoad 回调示例

public bool AfterRecordLoad(object target, int index, object source)
{
    IDictionary<string, object> obj = source as IDictionary<string, object>;
    return true;
}

10.7. RecordLoadError

在加载 JObject 节点时遇到 **错误** 时调用此回调。target 是 POCO 记录对象的实例。index 是文件中的 JObject 节点索引。sourceJObject 节点。ex 是异常对象。在这里,您可以处理异常。仅当 Configuration.ErrorMode 设置为 ReportAndContinue 时,才调用此方法。

返回 true 继续加载过程,否则返回 false 停止过程。

列表 10.7.1 RecordLoadError 回调示例

public bool RecordLoadError(object target, int index, object source, Exception ex)
{
    IDictionary<string, object> obj = source as IDictionary<string, object>;
    return true;
}

10.8. BeforeRecordFieldLoad

在加载每个 Parquet 记录字段 **之前** 调用此回调。target 是 POCO 记录对象的实例。index 是文件中的 JObject 节点索引。propName 是 Parquet 记录属性名称。value 是 Parquet 字段值。在这里,您可以检查 Parquet 记录属性值并执行任何自定义验证等。

返回 true 继续加载过程,否则返回 false 停止过程。

列表 10.8.1 BeforeRecordFieldLoad 回调示例

public bool BeforeRecordFieldLoad
       (object target, int index, string propName, ref object value)
{
    return true;
}

10.9. AfterRecordFieldLoad

在加载每个 Parquet 记录字段 **之后** 调用此回调。target 是 POCO 记录对象的实例。index 是文件中的 JObject 节点索引。propName 是 Parquet 记录属性名称。value 是 Parquet 字段值。在这里可以执行任何后续字段操作,例如计算其他属性、验证等。

返回 true 继续加载过程,否则返回 false 停止过程。

列表 10.9.1 AfterRecordFieldLoad 回调示例

public bool AfterRecordFieldLoad(object target, int index, string propName, object value)
{
    return true;
}

10.10. RecordLoadFieldError

在加载 Parquet 记录字段值时遇到 **错误** 时调用此回调。target 是 POCO 记录对象的实例。index 是文件中的 JObject 节点索引。propNameParquet 记录属性名称。valueParquet 字段值。ex 是异常对象。在这里,您可以处理异常。仅在 ChoParquetReader 执行以下两个步骤序列之后,才调用此方法。

  • ChoParquetReader 查找每个 Parquet 属性的 FallbackValue。如果存在,则尝试将其值赋给它。
  • 如果 FallbackValue 值不存在,并且 Configuration.ErrorMode 指定为 ReportAndContinue,则将执行此回调。

返回 true 继续加载过程,否则返回 false 停止过程。

列表 10.10.1 RecordFieldLoadError 回调示例

public bool RecordFieldLoadError
       (object target, int index, string propName, object value, Exception ex)
{
    return true;
}

10.11. SkipUntil

在 Parquet 解析开始时调用此回调,并提供自定义逻辑以跳过节点。index 是文件中的 JObject 节点索引。

返回 true 以跳过行,否则返回 false

列表 10.11.1 SkipUntil 回调示例

public bool SkipUntil(long index, object source)
{
    return false;
}

10.12. DoWhile

在 Parquet 解析开始时调用此回调,并提供自定义逻辑以跳过节点。index 是文件中的 JObject 节点索引。

返回 true 以停止解析,否则返回 false

列表 10.12.1 DoWhile 回调示例

public bool DoWhile(long index, object source)
{
    return false;
}

10. 定制

ChoParquetReader 自动检测并加载从 POCO 实体配置的设置。在运行时,您可以在 Parquet 解析之前自定义和调整这些参数。ChoParquetReader 公开了 Configuration 属性,它是 ChoParquetRecordConfiguration 对象。使用此属性,您可以自定义它们。

列表 10.1 运行时自定义 ChoParquetReader

class Program
{
    static void Main(string[] args)
    {
        using (var parser = new ChoParquetReader<EmployeeRec>("emp.parquet"))
        {
            object row = null;
  
            parser.Configuration.ColumnCountStrict = true;
            while ((row = parser.Read()) != null)
                Console.WriteLine(row.ToString());
        }
    }

11. AsDataReader 辅助方法

ChoParquetReader 公开了 AsDataReader 辅助方法,用于在 .NET datareader 对象中检索 Parquet 记录。DataReader 是快速向前的数据流。此 datareader 可用于某些地方,如使用 SqlBulkCopy 将数据批量复制到数据库,加载断开连接的 DataTable 等。

列表 11.1 读取为 DataReader 的示例

static void AsDataReaderTest()
{
    using (var parser = new ChoParquetReader<EmployeeRec>("emp.parquet"))
    {
        IDataReader dr = parser.AsDataReader();
        while (dr.Read())
        {
            Console.WriteLine("Id: {0}, Name: {1}", dr[0], dr[1]);
        }
    }
}

12. AsDataTable 辅助方法

ChoParquetReader 公开了 AsDataTable 辅助方法,用于在 .NET DataTable 对象中检索 Parquet 记录。然后,它可以持久化到磁盘,显示在网格/控件中,或像任何其他对象一样存储在内存中。

列表 12.1 读取为 DataTable 的示例

static void AsDataTableTest()
{
    using (var parser = new ChoParquetReader<EmployeeRec>("emp.parquet"))
    {
        DataTable dt = parser.AsDataTable();
        foreach (DataRow dr in dt.Rows)
        {
            Console.WriteLine("Id: {0}, Name: {1}", dr[0], dr[1]);
        }
    }
}

13. 使用动态对象

到目前为止,本文解释了如何使用 POCO 对象来使用 ChoParquetReaderChoParquetReader 还支持不带 POCO 对象加载 Parquet 文件。它利用 .NET 的动态特性。下面的示例展示了如何在没有 POCO 对象的情况下读取 Parquet 流。

如果您有 Parquet 文件,您可以以最少/零配置解析和加载该文件。

下面的示例展示了这一点

列表 13.1 加载 Parquet 文件

class Program
{
    static void Main(string[] args)
    {
        dynamic row;
        using (var parser = new ChoParquetReader("emp.parquet"))
        {
            while ((row = parser.Read()) != null)
            {
                Console.WriteLine(row.Id);
            }
        }
    }
}

上面的示例会自动发现 Parquet 对象成员并解析文件。

您可以通过手动添加字段配置并将它们传递给 ChoParquetReader 进行文件解析来覆盖自动发现字段的默认行为。

示例展示了如何执行此操作。

列表 13.3 使用配置加载 Parquet 文件

class Program
{
    static void Main(string[] args)
    {
        ChoParquetRecordConfiguration config = new ChoParquetRecordConfiguration();
        config.ParquetRecordFieldConfigurations.Add
                      (new ChoParquetRecordFieldConfiguration("Id"));
        config.ParquetRecordFieldConfigurations.Add
                      (new ChoParquetRecordFieldConfiguration("Name"));

        dynamic row;
        using (var parser = new ChoParquetReader("emp.parquet", config))
        {
            while ((row = parser.Read()) != null)
            {
                Console.WriteLine(row.Name);
            }
        }
    }
}

要完全关闭自动字段发现,您需要将 ChoParquetRecordConfiguration.AutoDiscoverColumns 设置为 false

13.1. DefaultValue

当 Parquet 值为空或为空白时(通过 IgnoreFieldValueMode 控制),用于并设置到属性的值。

可以使用 System.ComponentModel.DefaultValueAttribute 为任何 POCO 实体属性指定默认值。

对于动态对象成员或覆盖声明式 POCO 对象成员的默认值规范,您可以通过配置来实现,如下所示:

ChoParquetRecordConfiguration config = new ChoParquetRecordConfiguration();
config.ParquetRecordFieldConfigurations.Add
       (new ChoParquetRecordFieldConfiguration("Id"));
config.ParquetRecordFieldConfigurations.Add
       (new ChoParquetRecordFieldConfiguration("Name") { DefaultValue = "NoName" })

13.2. ChoFallbackValue

Parquet 值未能设置时,用于并设置到属性的值。Fallback 值仅在 ErrorModeIgnoreAndContinueReportAndContinue 时设置。

可以使用 ChoETL.ChoFallbackValueAttribute 为任何 POCO 实体属性指定回退值。

对于动态对象成员或覆盖声明式 POCO 对象成员的回退值,您可以通过配置来实现,如下所示:

ChoParquetRecordConfiguration config = new ChoParquetRecordConfiguration();
config.ParquetRecordFieldConfigurations.Add
       (new ChoParquetRecordFieldConfiguration("Id"));
config.ParquetRecordFieldConfigurations.Add
       (new ChoParquetRecordFieldConfiguration("Name") { FallbackValue = "Tom" });

13.3. FieldType

在无类型动态对象模型中,读取器读取各个字段值并将它们填充到“string”值的动态对象成员中。如果您想强制类型并在加载过程中进行额外的类型检查,可以通过在字段配置中声明字段类型来实现。

清单 8.5.1 定义 FieldType

ChoParquetRecordConfiguration config = new ChoParquetRecordConfiguration();
config.ParquetRecordFieldConfigurations.Add
       (new ChoParquetRecordFieldConfiguration("Id") { FieldType = typeof(int) });
config.ParquetRecordFieldConfigurations.Add
       (new ChoParquetRecordFieldConfiguration("Name"));

上面的示例展示了如何将“Id”字段的字段类型定义为“int”。这指示 ChoParquetReader 在将值分配给它之前解析并将其转换为整数。这种额外的类型安全可以避免在解析过程中加载到对象中的不正确值。

13.4. 类型转换器

ChoParquetReader 会自动转换大多数基本类型并将它们设置到属性中。如果 Parquet 字段的值无法自动转换为属性的类型,您可以指定自定义/内置 .NET 转换器来转换该值。这些可以是 IValueConverterTypeConverter 转换器。

在动态对象模型中,您可以通过配置指定这些转换器。请参阅下面的示例,了解为 Parquet 字段指定类型转换器的方法。

清单 13.4.1 指定 TypeConverters

ChoParquetRecordConfiguration config = new ChoParquetRecordConfiguration();

ChoParquetRecordFieldConfiguration idConfig = 
                     new ChoParquetRecordFieldConfiguration("Id");
idConfig.AddConverter(new IntConverter());
config.ParquetRecordFieldConfigurations.Add(idConfig);

config.ParquetRecordFieldConfigurations.Add
                    (new ChoParquetRecordFieldConfiguration("Name"));

在上面,我们使用 ChoParquetRecordFieldConfiguration 对象中的 AddConverter 帮助方法构造并附加 IntConverter 到“Id”字段。

同样,如果您想从 ChoParquetRecordFieldConfiguration 对象中删除任何转换器,可以使用 RemoveConverter

13.5. 验证

ChoParquetReader 利用 System.ComponentModel.DataAnnotationsValidation Block 验证属性来为各个 Parquet 字段指定验证规则。有关可用 DataAnnotations 验证属性的列表,请参阅 MSDN 站点。

清单 13.5.1 指定验证

ChoParquetRecordConfiguration config = new ChoParquetRecordConfiguration();

ChoParquetRecordFieldConfiguration idConfig = 
          new ChoParquetRecordFieldConfiguration("Id");
idConfig.Validators = new ValidationAttribute[] { new RangeAttribute(0, 100) };
config.ParquetRecordFieldConfigurations.Add(idConfig);

config.ParquetRecordFieldConfigurations.Add
       (new ChoParquetRecordFieldConfiguration("Name"));

在上面的示例中,我们为 Id 属性使用了 Range 验证属性。当 Configuration.ObjectValidationMode 设置为 ChoObjectValidationMode.MemberLevelChoObjectValidationMode.ObjectLevel 时,ChoParquetReader 会在加载时对其进行验证。

附注:动态对象模型不支持自验证。

14. 处理密封 POCO 对象

如果您已经拥有现有的密封 POCO 对象或对象来自第三方库,我们可以将其与 ChoParquetReader 一起使用。

清单 14.1 现有的已密封 POCO 对象

public sealed class ThirdPartyRec
{
    public int Id
    {
        get;
        set;
    }
    public string Name
    {
        get;
        set;
    }
}

列表 14.2 使用 Parquet 文件

class Program
{
    static void Main(string[] args)
    {
        using (var parser = new ChoParquetReader<ThirdPartyRec>("emp.parquet"))
        {
            object row = null;
 
            while ((row = parser.Read()) != null)
                Console.WriteLine(row.ToString());
        }
    }
}

在这种情况下,ChoParquetReader 会反向发现 Parquet 文件中的 Parquet 字段,并将数据加载到 POCO 对象中。如果 Parquet 文件结构和 POCO 对象匹配,加载将成功,并将所有相应的数据填充到其属性中。如果缺少任何 Parquet 字段的属性,ChoParquetReader 会默默地忽略它们并继续处理其余部分。

您可以通过将 ChoParquetRecordConfiguration.ThrowAndStopOnMissingField 属性设置为 false 来覆盖此行为。在这种情况下,如果缺少 Parquet 字段的属性,ChoParquetReader 将抛出 ChoMissingRecordFieldException 异常。

15. 异常

ChoParquetReader 在不同情况下会抛出不同类型的异常。

  • ChoParserException - Parquet 文件损坏,解析器无法恢复。
  • ChoRecordConfigurationException - 指定了任何无效的配置设置,将引发此异常。
  • ChoMissingRecordFieldException - 缺少 Parquet 字段的属性,将引发此异常。

17. 使用 MetadataType 注解

Cinchoo ETL 与数据注解的 MetadataType 模型配合得更好。这是一种将 MetaData 类附加到数据模型类的方法。在此关联类中,您可以提供数据模型中不存在的附加元数据信息。它的作用是在不修改类的情况下向类添加属性。您可以将这个接受单个参数的属性添加到将具有所有属性的类中。当 POCO 类由自动工具(通过 Entity Framework、MVC 等)自动生成时,这一点很有用。这就是第二个类的作用。您可以在不修改生成文件的情况下添加新内容。此外,这通过将关注点分离到多个类来促进模块化。

有关更多信息,请在 MSDN 中搜索。

清单 17.1 MetadataType 注解用法示例

[MetadataType(typeof(EmployeeRecMeta))]
public class EmployeeRec
{
    public int Id { get; set; }
    public string Name { get; set; }
}

[ChoParquetRecordObject]
public class EmployeeRecMeta : IChoNotifyRecordRead, IChoValidatable
{
    [ChoParquetRecordField(FieldName = "id", 
                           ErrorMode = ChoErrorMode.ReportAndContinue )]
    [ChoTypeConverter(typeof(IntConverter))]
    [Range(1, 1, ErrorMessage = "Id must be > 0.")]
    [ChoFallbackValue(1)]
    public int Id { get; set; }

    [ChoParquetRecordField(FieldName = "Name")]
    [StringLength(1)]
    [DefaultValue("ZZZ")]
    [ChoFallbackValue("XXX")]
    public string Name { get; set; }
 
    public bool AfterRecordLoad(object target, int index, object source)
    {
        throw new NotImplementedException();
    }
 
    public bool BeforeRecordLoad(object target, int index, ref object source)
    {
        throw new NotImplementedException();
    }
 
    public bool RecordLoadError(object target, int index, object source, Exception ex)
    {
        throw new NotImplementedException();
    }
 
    public bool TryValidate
           (object target, ICollection<ValidationResult> validationResults)
    {
        return true;
    }
 
    public bool TryValidateFor
    (object target, string memberName, ICollection<ValidationResult> validationResults)
    {
        return true;
    }
 
    public void Validate(object target)
    {
    }
 
    public void ValidateFor(object target, string memberName)
    {
    }
}

在上面,EmployeeRec 是数据类。仅包含领域特定的属性和操作。将其视为一个非常简单的类。

我们将验证、回调机制、配置等分离到元数据类型类 EmployeeRecMeta 中。

18. 配置选项

如果 POCO 实体类是自动生成的类、通过库公开的类或密封类,则它限制您以声明方式将 Parquet 架构定义附加到它。在这种情况下,您可以选择以下选项之一来指定 Parquet 布局配置:

  • 手动配置
  • 自动映射配置
  • 附加 MetadataType

我将向您展示如何为每种方法配置以下 POCO 实体类。

列表 18.1 密封 POCO 实体类

public sealed class EmployeeRec
{
    public int Id { get; set; }
    public string Name { get; set; }
}

18.1 手动配置

从头开始定义一个全新的配置对象,并将所有必要的 Parquet 字段添加到 ChoParquetConfiguration.ParquetRecordFieldConfigurations 集合属性。此选项为您提供了更大的灵活性来控制 Parquet 解析的配置。但缺点是可能会出错,并且如果 Parquet 文件布局很大,则难以管理。

清单 18.1.1 手动配置

ChoParquetRecordConfiguration config = new ChoParquetRecordConfiguration();
config.ParquetRecordFieldConfigurations.Add
              (new ChoParquetRecordFieldConfiguration("Id"));
config.ParquetRecordFieldConfigurations.Add
              (new ChoParquetRecordFieldConfiguration("Name"));

18.2 自动映射配置

这是一种替代方法,一种不太容易出错的方法,可以为 POCO 实体类自动映射 Parquet 字段。

首先,为 EmployeeRec POCO 实体类定义一个模式类,如下所示:

列表 18.2.1 Auto Map 类

public class EmployeeRecMap
{
    [ChoParquetRecordField(FieldName = "Id")]
    public int Id { get; set; }
 
    [ChoParquetRecordField(FieldName = "Name")]
    public string Name { get; set; } 
}

然后,您可以使用它通过 ChoParquetRecordConfiguration.MapRecordFields 方法自动映射 Parquet 字段。

列表 18.2.2 使用 Auto Map 配置

ChoParquetRecordConfiguration config = new ChoParquetRecordConfiguration();
config.MapRecordFields<EmployeeRecMap>();

foreach (var e in new ChoParquetReader<EmployeeRec>("emp.parquet", config)) 
    Console.WriteLine(e.ToString());

18.3 关联 MetadataType 类

这是另一种附加 MetadataType 类以用于 POCO 实体对象的方法。前一种方法仅处理 Parquet 字段的自动映射。其他配置属性,如属性转换器、解析器参数、默认/回退值等,则不考虑。

此模型通过定义 MetadataType 类并以声明方式指定 Parquet 配置参数来考虑所有内容。当您的 POCO 实体是密封的 **且非部分** 类时,这一点很有用。此外,这是配置 POCO 实体 Parquet 解析的一种有利且不太容易出错的方法。

清单 18.3.1 定义 MetadataType 类

[ChoParquetRecordObject]
public class EmployeeRecMeta : IChoNotifyRecordRead, IChoValidatable
{
    [ChoParquetRecordField
    (FieldName = "Id", ErrorMode = ChoErrorMode.ReportAndContinue )]
    [ChoTypeConverter(typeof(IntConverter))]
    [Range(1, 1, ErrorMessage = "Id must be > 0.")]
    public int Id { get; set; }

    [ChoParquetRecordField(FieldName = "Name")]
    [StringLength(1)]
    [DefaultValue("ZZZ")]
    [ChoFallbackValue("XXX")]
    public string Name { get; set; }
 
    public bool AfterRecordLoad(object target, int index, object source)
    {
        throw new NotImplementedException();
    }
 
    public bool BeforeRecordLoad(object target, int index, ref object source)
    {
        throw new NotImplementedException();
    }
 
    public bool RecordLoadError(object target, int index, object source, Exception ex)
    {
        throw new NotImplementedException();
    }
 
    public bool TryValidate
    (object target, ICollection<ValidationResult> validationResults)
    {
        return true;
    }
 
    public bool TryValidateFor
    (object target, string memberName, ICollection<ValidationResult> validationResults)
    {
        return true;
    }
 
    public void Validate(object target)
    {
    }
 
    public void ValidateFor(object target, string memberName)
    {
    }
}

列表 18.3.2 附加 MetadataType 类

//Attach metadata 
ChoMetadataObjectCache.Default.Attach<EmployeeRec>(new EmployeeRecMeta());

foreach (var e in new ChoParquetReader<EmployeeRec>("emp.parquet")) 
    Console.WriteLine(e.ToString()

19. LoadText 辅助方法

这是一个方便的辅助方法,用于将 Parquet 文本字符串 解析并加载到 object 中。

列表 19.1 使用 LoadText 方法

string txt = @"
[
    {
        "Id": 1,
        "Name": "Jeanette"
    },
    {
        "Id": 2,
        "Name": "Giavani"
    }
]";

foreach (var e in ChoParquetReader.LoadText(txt))
   Console.WriteLine(e.ToStringEx());

20. 高级主题

20.1 覆盖转换器格式规范

Cinchoo ETL 自动解析并将每个 Parquet 字段值无缝转换为相应的 Parquet 字段的基础数据类型。大多数基本的 .NET 类型都会自动处理,无需任何设置。

这是通过 ETL 系统中的两个关键设置实现的

  1. ChoParquetRecordConfiguration.CultureInfo - 代表特定文化的信息,包括文化的名称、书写系统和日历,以及访问特定于文化的对象的权限,这些对象提供有关常见操作(如格式化日期和排序字符串)的信息。默认为 'en-US'。
  2. ChoTypeConverterFormatSpec - 这是一个全局格式说明符类,包含所有内置 .NET 类型的格式说明。

在本节中,我将讨论如何根据解析需求更改每个 .NET 内在数据类型的默认格式规范。

ChoTypeConverterFormatSpec 是一个单例类,其实例通过 'Instance' static 成员公开。它是线程本地的,意味着每个线程都会有一个单独的实例副本。

每个内置类型都有两组格式说明成员,一组用于加载,另一组用于写入值,但 BooleanEnumDataTime 类型除外。这些类型只有一组成员用于加载和写入操作。

通过 ChoTypeConverterFormatSpec 指定每个内置数据类型的格式说明会影响整个系统,即通过设置 ChoTypeConverterFormatSpec.IntNumberStyle = NumberStyles.AllowParentheses,将影响 Parquet 对象的所有整数成员,允许使用括号。如果您想覆盖此行为并控制特定的 Parquet 数据成员以处理其独特的 Parquet 值解析,使其区别于全局系统设置,则可以通过在 Parquet 字段成员级别指定 TypeConverter 来实现。有关更多信息,请参阅 **第 13.4 节**。

NumberStyles(可选)用于从 Parquet 流加载值,而 Format 字符串用于将值写入 Parquet 流。

在本文中,我将简要介绍如何使用 NumberStyles 从流加载 Parquet 数据。这些值是可选的。它决定了解析 Parquet 文件时每种类型允许的样式。系统会自动确定如何从基础 Culture 解析和加载值。在特殊情况下,您可能希望覆盖并按所需方式设置样式,以成功加载文件。有关 NumberStyles 及其值的更多信息,请参阅 MSDN NumberStyles

清单 20.1.1 ChoTypeConverterFormatSpec 成员

public class ChoTypeConverterFormatSpec
{
    public static readonly ThreadLocal<ChoTypeConverterFormatSpec> Instance = 
    new ThreadLocal<ChoTypeConverterFormatSpec>(() => new ChoTypeConverterFormatSpec());
 
    public string DateTimeFormat { get; set; }
    public ChoBooleanFormatSpec BooleanFormat { get; set; }
    public ChoEnumFormatSpec EnumFormat { get; set; }
 
    public NumberStyles? CurrencyNumberStyle { get; set; }
    public string CurrencyFormat { get; set; }
 
    public NumberStyles? BigIntegerNumberStyle { get; set; }
    public string BigIntegerFormat { get; set; }
 
    public NumberStyles? ByteNumberStyle { get; set; }
    public string ByteFormat { get; set; }
 
    public NumberStyles? SByteNumberStyle { get; set; }
    public string SByteFormat { get; set; }
 
    public NumberStyles? DecimalNumberStyle { get; set; }
    public string DecimalFormat { get; set; }
 
    public NumberStyles? DoubleNumberStyle { get; set; }
    public string DoubleFormat { get; set; }
 
    public NumberStyles? FloatNumberStyle { get; set; }
    public string FloatFormat { get; set; }
 
    public string IntFormat { get; set; }
    public NumberStyles? IntNumberStyle { get; set; }
 
    public string UIntFormat { get; set; }
    public NumberStyles? UIntNumberStyle { get; set; }
 
    public NumberStyles? LongNumberStyle { get; set; }
    public string LongFormat { get; set; }
 
    public NumberStyles? ULongNumberStyle { get; set; }
    public string ULongFormat { get; set; }
 
    public NumberStyles? ShortNumberStyle { get; set; }
    public string ShortFormat { get; set; }
 
    public NumberStyles? UShortNumberStyle { get; set; }
    public string UShortFormat { get; set; }
}

下面的示例展示了如何使用 ChoParquetReader 加载包含“se-SE”(瑞典)文化特定数据的 Parquet 数据流。此外,输入源中的“EmployeeNo”值包含括号。为了成功加载,我们需要将 ChoTypeConverterFormatSpec.IntNumberStyle 设置为 NumberStyles.AllowParenthesis

清单 20.1.2 在代码中使用 ChoTypeConverterFormatSpec

static void UsingFormatSpecs()
{
    ChoParquetRecordConfiguration config = new ChoParquetRecordConfiguration();
    config.Culture = new System.Globalization.CultureInfo("se-SE");
    config.ParquetRecordFieldConfigurations.Add
    (new ChoParquetRecordFieldConfiguration("Id") { FieldType = typeof(int) });
    config.ParquetRecordFieldConfigurations.Add
    (new ChoParquetRecordFieldConfiguration("Name"));
    config.ParquetRecordFieldConfigurations.Add
    (new ChoParquetRecordFieldConfiguration("Salary") 
    { FieldType = typeof(ChoCurrency) });
    config.ParquetRecordFieldConfigurations.Add
    (new ChoParquetRecordFieldConfiguration("JoinedDate") 
    { FieldType = typeof(DateTime) });
    config.ParquetRecordFieldConfigurations.Add
    (new ChoParquetRecordFieldConfiguration("EmployeeNo") { FieldType = typeof(int) });
 
    ChoTypeConverterFormatSpec.Instance.IntNumberStyle = NumberStyles.AllowParentheses;
 
    using (var parser = new ChoParquetReader("emp.parquet", config))
    {
        object row = null;
 
        while ((row = parser.Read()) != null)
            Console.WriteLine(row.ToStringEx());
    }
}

20.2 货币支持

Cinchoo ETL 提供了 ChoCurrency 对象来读取和写入 Parquet 文件中的货币值。ChoCurrency 是一个包装类,用于以 decimal 类型保存货币值,并支持在 Parquet 加载期间将其以文本格式序列化。

清单 20.2.1 在动态模型中使用 Currency 成员

static void CurrencyDynamicTest()
{
    ChoParquetRecordConfiguration config = new ChoParquetRecordConfiguration();
    config.ParquetRecordFieldConfigurations.Add
           (new ChoParquetRecordFieldConfiguration("Id"));
    config.ParquetRecordFieldConfigurations.Add
           (new ChoParquetRecordFieldConfiguration("Name"));
    config.ParquetRecordFieldConfigurations.Add
           (new ChoParquetRecordFieldConfiguration("Salary") 
           { FieldType = typeof(ChoCurrency) });
 
    using (var parser = new ChoParquetReader("emp.parquet", config))
    {
        object rec;
        while ((rec = parser.Read()) != null)
        {
            Console.WriteLine(rec.ToStringEx());
        }
    }
}

上面的示例展示了如何使用动态对象模型加载货币值。默认情况下,动态对象的所有成员都视为 string 类型,除非通过 ChoParquetFieldConfiguration.FieldType 显式指定。通过将“SalaryParquet 字段的字段类型指定为 ChoCurrencyChoParquetReader 会将它们加载为货币对象。

附注: 货币值的格式由 ChoParquetReader 通过 ChoRecordConfiguration.Culture 和 ChoTypeConverterFormatSpec.CurrencyNumberStyle 确定。

下面的示例展示了如何在 POCO 实体类中使用 ChoCurrency Parquet 字段。

清单 20.2.2 在 POCO 模型中使用 Currency 成员

public class EmployeeRecWithCurrency
{
    public int Id { get; set; }
    public string Name { get; set; }
    public ChoCurrency Salary { get; set; }
}
 
static void CurrencyTest()
{
    using (var parser = new ChoParquetReader<EmployeeRecWithCurrency>("emp.parquet"))
    {
        object rec;
        while ((rec = parser.Read()) != null)
        {
            Console.WriteLine(rec.ToStringEx());
        }
    }
}

20.3 枚举支持

Cinchoo ETL 会隐式处理 Parquet 文件中 enum 字段值的解析。如果您想精细控制这些值的解析,可以全局指定它们,方法是使用 ChoTypeConverterFormatSpec.EnumFormat。默认值为 ChoEnumFormatSpec.Value

FYI,更改此值将影响整个系统。

可以使用三个可能的值:

  1. ChoEnumFormatSpec.Value - Enum 值用于解析。
  2. ChoEnumFormatSpec.Name - Enum 键名用于解析。
  3. ChoEnumFormatSpec.Description - 如果每个 enum 键都用 DescriptionAttribute 修饰,则使用其值进行解析。

清单 20.3.1 在解析期间指定 Enum 格式规范

public enum EmployeeType
{
    [Description("Full Time Employee")]
    Permanent = 0,
    [Description("Temporary Employee")]
    Temporary = 1,
    [Description("Contract Employee")]
    Contract = 2
}

static void EnumTest()
{
    ChoTypeConverterFormatSpec.Instance.EnumFormat = ChoEnumFormatSpec.Description;
 
    ChoParquetRecordConfiguration config = new ChoParquetRecordConfiguration();
    config.ParquetRecordFieldConfigurations.Add
    (new ChoParquetRecordFieldConfiguration("Id") { FieldType = typeof(int) });
    config.ParquetRecordFieldConfigurations.Add
    (new ChoParquetRecordFieldConfiguration("Name"));
    config.ParquetRecordFieldConfigurations.Add
    (new ChoParquetRecordFieldConfiguration("Salary") 
    { FieldType = typeof(ChoCurrency) });
    config.ParquetRecordFieldConfigurations.Add
    (new ChoParquetRecordFieldConfiguration("JoinedDate") 
    { FieldType = typeof(DateTime) });
    config.ParquetRecordFieldConfigurations.Add
    (new ChoParquetRecordFieldConfiguration("EmployeeType") 
    { FieldType = typeof(EmployeeType) });
 
    ChoTypeConverterFormatSpec.Instance.IntNumberStyle = NumberStyles.AllowParentheses;
 
    using (var parser = new ChoParquetReader("emp.parquet", config))
    {
        object row = null;
 
        while ((row = parser.Read()) != null)
            Console.WriteLine(row.ToStringEx());
    }
}

20.4 布尔值支持

Cinchoo ETL 会隐式处理 Parquet 文件中布尔 Parquet 字段值的解析。如果您想精细控制这些值的解析,可以全局指定它们,方法是使用 ChoTypeConverterFormatSpec.BooleanFormat。默认值为 ChoBooleanFormatSpec.ZeroOrOne

FYI,更改此值将影响整个系统。

有四种可能的值:

  1. ChoBooleanFormatSpec.ZeroOrOne - '0' 表示 false。'1' 表示 true
  2. ChoBooleanFormatSpec.YOrN - 'Y' 表示 true,'N' 表示 false
  3. ChoBooleanFormatSpec.TrueOrFalse - 'True' 表示 true,'False' 表示 false
  4. ChoBooleanFormatSpec.YesOrNo - 'Yes' 表示 true,'No' 表示 false

清单 20.4.1 在解析期间指定布尔值格式规范

static void BoolTest()
{
    ChoTypeConverterFormatSpec.Instance.BooleanFormat = ChoBooleanFormatSpec.ZeroOrOne;
 
    ChoParquetRecordConfiguration config = new ChoParquetRecordConfiguration();
    config.ParquetRecordFieldConfigurations.Add
    (new ChoParquetRecordFieldConfiguration("Id") { FieldType = typeof(int) });
    config.ParquetRecordFieldConfigurations.Add
    (new ChoParquetRecordFieldConfiguration("Name"));
    config.ParquetRecordFieldConfigurations.Add
    (new ChoParquetRecordFieldConfiguration("Salary") 
    { FieldType = typeof(ChoCurrency) });
    config.ParquetRecordFieldConfigurations.Add
    (new ChoParquetRecordFieldConfiguration("JoinedDate") 
    { FieldType = typeof(DateTime) });
    config.ParquetRecordFieldConfigurations.Add
    (new ChoParquetRecordFieldConfiguration("Active") { FieldType = typeof(bool) });
 
    ChoTypeConverterFormatSpec.Instance.IntNumberStyle = NumberStyles.AllowParentheses;
 
    using (var parser = new ChoParquetReader("emp.parquet", config))
    {
        object row = null;
 
        while ((row = parser.Read()) != null)
            Console.WriteLine(row.ToStringEx());
    }
}

20.5 DateTime 支持

Cinchoo ETL 使用系统文化或自定义设置的文化隐式处理 datetime Parquet 字段值的解析。如果您想精细控制这些值的解析,可以全局指定它们,方法是使用 ChoTypeConverterFormatSpec.DateTimeFormat 默认值为 'd'。

FYI,更改此值将影响整个系统。

您可以使用任何有效的 标准自定义 datetime .NET 格式说明符来解析文件中的 datetime Parquet 值。

清单 20.5.1 在解析期间指定 datetime 格式规范

static void DateTimeTest()
{
    ChoTypeConverterFormatSpec.Instance.DateTimeFormat = "MMM dd, yyyy";
 
    ChoParquetRecordConfiguration config = new ChoParquetRecordConfiguration();
    config.ParquetRecordFieldConfigurations.Add
    (new ChoParquetRecordFieldConfiguration("Id") { FieldType = typeof(int) });
    config.ParquetRecordFieldConfigurations.Add
    (new ChoParquetRecordFieldConfiguration("Name"));
    config.ParquetRecordFieldConfigurations.Add
    (new ChoParquetRecordFieldConfiguration("Salary") 
    { FieldType = typeof(ChoCurrency) });
    config.ParquetRecordFieldConfigurations.Add
    (new ChoParquetRecordFieldConfiguration("JoinedDate") 
    { FieldType = typeof(DateTime) });
    config.ParquetRecordFieldConfigurations.Add
    (new ChoParquetRecordFieldConfiguration("Active") { FieldType = typeof(bool) });
 
    ChoTypeConverterFormatSpec.Instance.IntNumberStyle = NumberStyles.AllowParentheses;
 
    using (var parser = new ChoParquetReader("emp.parquet", config))
    {
        object row = null;
 
        while ((row = parser.Read()) != null)
            Console.WriteLine(row.ToStringEx());
    }
}

上面的示例展示了如何解析自定义 datetime Parquet 值。

注意:由于 datetime 值包含 Parquet 分隔符,因此它被用双引号括起来以通过解析。

21. Fluent API

ChoParquetReader 通过 Fluent API 方法公开了一些常用的配置参数。这将使 Parquet 文件解析的编程更快捷。

21.1 WithFields

此 API 方法指定要考虑用于解析和加载的 Parquet 节点(属性或元素)列表。Parquet 节点中的其他字段将被丢弃。

foreach (var e in new ChoParquetReader<EmployeeRec>
    ("emp.parquet").WithFields("Id", "Name"))
    Console.WriteLine(e.ToString());

21.2 WithField

此 API 方法用于添加带有 ParquetPath、数据类型和其他参数的 Parquet 节点。此方法在动态对象模型中有帮助,通过指定每个单独的 Parquet 节点以及适当的 datatype

foreach (var e in new ChoParquetReader<EmployeeRec>("emp.parquet").WithField
    ("Id", fieldType: typeof(int)))
    Console.WriteLine(e.ToString());

21.3 ColumnCountStrict

此 API 方法用于设置 ChoParquetWriter,以便在读取 Parquet 文件之前检查字段数量。

foreach (var e in new ChoParquetReader<EmployeeRec>("emp.parquet").ColumnCountStrict())
    Console.WriteLine(e.ToString());

21.4. NotifyAfter

此 API 方法用于定义在生成通知事件之前要处理的行数。此属性专为说明 Parquet 加载进度的用户界面组件而设计。通知会发送给订阅了 RowsLoaded 事件的订阅者。

static void NotifyAfterTest()
{
    using (var parser = new ChoParquetReader("emp.parquet")
        .NotifyAfter(1000)
        )
    {
        parser.RowsLoaded += (o, e) => Console.WriteLine(e.RowsLoaded); 

        foreach (var rec in parser)
        {
            Console.WriteLine(String.Format("Id: {0}", rec.Id));
            Console.WriteLine(String.Format("Name: {0}", rec.Name));
            Console.WriteLine(String.Format("Salary: {0}", rec.Salary));
        }
    }
}

21.5. Configure

此 API 方法用于配置未通过 Fluent API 公开的所有配置参数。

static void ConfigureTest()
{
    using (var parser = new ChoParquetReader("emp.parquet")
        .Configure(c => c.ErrorMode = ChoErrorMode.ThrowAndStop)
        )
    {
        foreach (var rec in parser)
        {
            Console.WriteLine(String.Format("Id: {0}", rec.Id));
            Console.WriteLine(String.Format("Name: {0}", rec.Name));
            Console.WriteLine(String.Format("Salary: {0}", rec.Salary));
        }
    }
}

21.6. Setup

此 API 方法用于通过 Fluent API 设置读取器的参数/事件。

static void SetupTest()
{
    using (var parser = new ChoParquetReader("emp.parquet")
        .Setup(r => r.BeforeRecordLoad += (o, e) =>
        {
            if (e.Source.CastTo<JObject>().ContainsKey("Name1"))
                e.Skip = true;
        }
        )
    {
        foreach (var rec in parser)
        {
            Console.WriteLine(String.Format("Id: {0}", rec.Id));
            Console.WriteLine(String.Format("Name: {0}", rec.Name));
            Console.WriteLine(String.Format("Salary: {0}", rec.Salary));
        }
    }
}

22. FAQ

22.1. 如何将字符串反序列化为枚举?

ChoParquetReader 会隐式处理 enum 文本到 enum 值的转换。下面的示例展示了如何使用 POCO 对象加载 Parquet

public enum Gender { Male, Female }
public class Employee
{
    public int Age { get; set; }
    public Gender Gender { get; set; }
}

static void EnumTest()
{
    using (var r = new ChoParquetReader<Employee>("emp.parquet"))
    {
        foreach (var rec in r)
            Console.WriteLine(rec.Dump());
    }
}

下面的示例展示了如何使用动态对象模型解析 Parquet(包含 enum 值)。

static void DynamicEnumTest()
{
    using (var r = new ChoParquetReader<Employee>("emp.parquet")
        .WithField("Age")
        .WithField("Gender", fieldType: typeof(Gender))
        )
    {
        foreach (var rec in r)
            Console.WriteLine(rec.Dump());
    }
}

22.2. 将 Parquet 反序列化为动态对象?

ChoParquetReader 在动态对象模型中隐式执行此操作。

static void DynamicEnumTest()
{
    using (dynamic r = new ChoParquetReader("emp.parquet")
        .WithField("Age")
        .WithField("Gender", fieldType: typeof(Gender))
        )
    {
        foreach (var rec in r)
        {
            Console.WriteLine(rec.Age);
            Console.WriteLine(rec.Gender);
        }
    }
}

在上面,解析器加载 .parquet 文件,构造并返回 dynamic 对象。

22.3. 如何将 Parquet 转换为 XML?

Cinchoo ETL 提供了 ChoXmlWriter,用于从对象生成 XML 文件。通过 ChoParquetReaderChoXmlWriter,您可以轻松地将 Parquet 转换为 XML 格式。

static void Parquet2XmlTest()
{
    StringBuilder xml = new StringBuilder();
    using (var r = new ChoParquetReader("emp.parquet"))
    {
        using (var w = new ChoXmlWriter(xml)
            .WithRootName("Emps")
            .WithNodeName("Emp")
            )
            w.Write(r);
    }
    Console.WriteLine(xml.ToString());
}

输出

<Emps>
  <Emp>
    <Id>1</Id>
    <Name>Mark</Name>
  </Emp>
  <Emp>
    <Id>2</Id>
    <Name>Tom</Name>
  </Emp>
</Emps>

22.4. 如何将 Parquet 转换为 CSV?

Cinchoo ETL 提供了 ChoCSVWriter,用于从对象生成 CSV 文件。通过 ChoParquetReaderChoCSVWriter,您可以轻松地将 Parquet 转换为 CSV 格式。

static void Parquet2CSVTest()
{
    StringBuilder csv= new StringBuilder();
    using (var r = new ChoParquetReader("emp.parquet"))
    {
        using (var w = new ChoCSVWriter(csv).WithFirstLineHeader())
            w.Write(r);
    }
    Console.WriteLine(csv.ToString());
}

输出

Id, Name
1, Tom
2, Mark

22.4. 如何将 Parquet 转换为 JSON?

Cinchoo ETL 提供了 ChoJSONWriter,用于从对象生成 JSON 文件。通过 ChoParquetReaderChoJSONWriter,您可以轻松地将 Parquet 转换为 JSON 格式。

static void Parquet2JSONTest()
{
    StringBuilder json = new StringBuilder();
    using (var r = new ChoParquetReader("emp.parquet"))
    {
        using (var w = new ChoJSONWriter(json))
            w.Write(r);
    }
    Console.WriteLine(json.ToString());
}

输出

[
  {
    "Id" : 1,
    "Name" : "Tom"
  },
  {
  "Id" : 2,
  "Name" : "Mark"
  }
]

22.6. 如何加载 Parquet 中的选定节点?

使用 WithField() Fluent API 方法,您可以指定要从 Parquet 源加载的选定字段。

static void SelectiveFieldTest()
{
    using (dynamic r = new ChoParquetReader("emp.parquet")
        .WithField("Age")
        .WithField("Gender", fieldType: typeof(Gender))
        )
    {
        foreach (var rec in r)
        {
            Console.WriteLine(rec.Age);
            Console.WriteLine(rec.Gender);
        }
    }
}

22.7. 如何将 Parquet 转换为 DataTable?

ChoParquetReader 提供了一个小辅助方法,用于将 Parquet 转换为 Datatable,即 AsDataTable()

static void ConvertToDataTableTest()
{
    using (var r = new ChoParquetReader<UserInfo>("emp.parquet"))
    {
        var dt = r.AsDataTable();
    }
}

22.8. 如何将 Parquet 转换为 DataReader?

ChoParquetReader 提供了一个小辅助方法,用于将 Parquet 转换为 DataReader,即 AsDataReader()

static void ConvertToDataTableTest()
{
    using (var r = new ChoParquetReader<UserInfo>("emp.parquet"))
    {
        var dr = r.AsDataReader();
    }
}

22.9. 如何反序列化对象?

此示例将 Parquet 反序列化为对象。

public class Account
{
    public string Email { get; set; }
    public bool Active { get; set; }
    public DateTime CreatedDate { get; set; }
    public IList<string> Roles { get; set; }
}
static void DeserializeObject()
{
    Account account = ChoParquetReader.Deserialize<Account>
                      ("emp.parquet").FirstOrDefault();

    Console.WriteLine(account.Email);
}

22.10. 如何反序列化集合?

此示例将 Parquet 反序列化为集合。

static void DeserializeCollection()
{
    List<EmployeeRec> emps = 
         ChoParquetReader.Deserialize<EmployeeRec>("emp.parquet").ToList();
}

22.11. 如何反序列化字典?

此示例将 Parquet 反序列化为 Dictionary

static void DeserializeDictionary()
{
    Dictionary<string, object> htmlAttributes = 
    ChoParquetReader.Deserialize<Dictionary<string, oject>>
    ("emp.parquet").FirstOrDefault();

    Console.WriteLine(htmlAttributes["Key"]);
    Console.WriteLine(htmlAttributes["Value"]);
}

22.12. 如何从文件中反序列化?

此示例将 Parquet 反序列化为 Dictionary

public class Movie
{
    public string Name { get; set; }
    public int Year { get; set; }
}

static void DeserializeFromFile()
{
    Movie movie1 = ChoParquetReader.Deserialize<Movie>("movie.parquet").FirstOrDefault();
}

22.13. 如何使用自定义工厂反序列化?

此示例使用自定义工厂反序列化 Parquet,以实例化 Person 类型的 Employee 实例。

public class Person
{
    public string FirstName { get; set; }
    public string LastName { get; set; }
    public DateTime BirthDate { get; set; }
}

public class Employee : Person
{
    public string Department { get; set; }
    public string JobTitle { get; set; }
}

static void CustomCreationTest()
{
    ChoActivator.Factory = (type, args) =>
    {
        if (type == typeof(Person))
            return new Employee();
        else
            return null;
    };
    Person person = ChoParquetReader.Deserialize<Person>("emp.parquet").FirstOrDefault();
    Console.WriteLine(person.GetType().Name);
}

22.14. 在使用 ChoParquetReader 进行序列化时,如何指定自定义的 DateTime 格式?

ChoParquetReader 可以使用当前系统文化自动转换 datetime 值。如果 Parquet 包含自定义 datetime 格式的值,您可以设置自定义 datetime 格式以成功解析 Parquet

具有自定义 datetime 格式值的示例 Parquet

{
  'Department': 'Furniture',
  'JobTitle': 'Carpenter',
  'FirstName': 'John',
  'LastName': 'Joinery',
  'BirthDate': '30-12-2003'
}

定义 POCO 类如下以处理自定义 datetime 格式:

public class Employee 
{
    public string Department { get; set; }
    public string JobTitle { get; set; }
    [DisplayFormat(DataFormatString = "dd-MM-yyyy")]
    public DateTime BirthDate { get; set; }
}

public class Employee 
{
    [ChoParquetRecordField]
    public string Department { get; set; }
    [ChoParquetRecordField]
    public string JobTitle { get; set; }
    [ChoParquetRecordField(FormatText = "dd-MM-yyyy")]
    public DateTime BirthDate { get; set; }
}

使用解析器按如下方式加载 Parquet

using (var r = new ChoParquetReader<Employee>("emp.parquet"))
{
    foreach (var rec in r)
        Console.WriteLine(rec.Dump());
}

在动态模型中,您可以按如下方式设置自定义 datetime 格式:

using (var r = new ChoParquetReader("emp.parquet")
    .WithField("Department")
    .WithField("JobTitle")
    .WithField("BirthDate", fieldType: typeof(DateTime), formatText: "dd-MM-yyyy")
    )
{
    foreach (var rec in r)
        Console.WriteLine(rec.Dump());
}

历史

  • 2020 年 6 月 4 日:初始版本
© . All rights reserved.