C#: 以固定批次/块的形式读取大型 CSV 或任何字符分隔值文件






4.27/5 (5投票s)
逐块读取大型 CSV 或任何字符分隔值文件作为 DataTable 和 Entity List。
背景
设想这样一种场景:我们需要在数据库中处理或存储一个大型字符分隔值文件的内容。我有一个大型.CSV文件,包含900万到1200万行,文件大小约为700-800 MB。最初,我尝试了GenericParser
、CsvHelper
以及其他一些方法,但最终要么出现内存不足,要么解决方案非常缓慢。然后我自行编写了一个解析器,将数据分块为DataTable
。几天后,我还需要验证源数据,因此我需要对解析器进行一些修改,以便将数据分块为List<TSource>
。
今天,我将分享两种解决方案:
- 以实体映射的形式读取数据
- 以
DataTable
的形式读取数据
难道没有其他好的库吗?
是的,有一些库。其中大多数将整个文件加载到列表/数据表中,少数库会分块数据,但没有验证功能。如果我们打算在数据处理过程中将数据验证错误和错误行详细信息记录到单独的文件/数据库中,我们将无法做到这一点,或者需要维护两个不同的进程。
我们为什么需要这个?
- 使用
SqlBulkCopy
将大型数据文件内容以DataTable
的形式上传到数据库 - 使用Entity Framework将大型数据文件内容以实体列表
List<TSource>
的形式上传到数据库 - 验证大型文件内容
在这里,我们将更多地关注数据分批或分块处理。
良好实践!!!
- 在这些场景下,请使用
yield return
和IEnumerable<TSource>
。 - 在插入数据库时,我们应该为整个文件维护一个截断(事务)。
- 对于Entity Framework,一次提交小列表并在需要时使用现有连接和事务定期重新初始化DB上下文是一个好习惯。
- 如果存在任何文件处理和错误日志,请进行维护。处理完后,将已处理的文件移至存档位置。
这里我添加了CSV示例,但它也适用于任何字符分隔值文件。
核心
以下是一些将在我们的助手项目中使用的接口。
using System.Collections.Generic;
namespace CsvBatch
{
public interface ICsvLine
{
long LineNo { get; }
string Line { get; }
char Splitter { get; }
List<string> LineValues();
void Set(long lineNo, string line, char splitter);
}
public interface ICsvDataMapper : ICsvLine
{
}
public interface ICsvDataMapper<T> : ICsvDataMapper
{
T Map();
bool Map(out T entity, out List<string> errors);
}
}
此类表示文件的数据行。
using System.Collections.Generic;
namespace CsvBatch
{
public class CsvLine : ICsvLine
{
public long LineNo { get; protected set; }
public string Line { get; protected set; }
public char Splitter { get; protected set; }
public List<string> LineValues()
{
var values = string.IsNullOrEmpty(Line) ?
new List<string>() : new List<string>(Line.Split(Splitter));
return values;
}
public void Set(long lineNo, string line, char splitter)
{
LineNo = lineNo;
Line = line;
Splitter = splitter;
}
}
}
此类表示文件。
using System;
using System.Collections.Generic;
using System.Data;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
namespace CsvBatch
{
public class FileReader
{
public readonly string Path;
public readonly char Splitter;
public readonly bool HasHeaderLine;
private List<string> _header;
private FileReader(char splitter, bool hasHeaderLine)
{
Splitter = splitter;
HasHeaderLine = hasHeaderLine;
}
public FileReader(string path, bool hasHeaderLine, char splitter) :
this(splitter, hasHeaderLine)
{
Path = path;
}
public virtual List<string> Headers()
{
if (_header != null)
{
return _header;
}
if (!HasHeaderLine)
{
_header = new List<string>();
return _header;
}
_header = new List<string>();
using (var reader = new StreamReader(File.OpenRead(Path)))
{
if (HasHeaderLine && !reader.EndOfStream)
{
_header = reader.ReadLine().Split(Splitter).ToList();
}
}
return _header;
}
public virtual IEnumerable<string> Lines()
{
using (var reader = new StreamReader(File.OpenRead(Path)))
{
/*skip header rows*/
if (HasHeaderLine && !reader.EndOfStream)
{
reader.ReadLine(); /*check header: string[] headers =
reader.ReadLine().Split(Splitter);*/
}
/*data rows*/
while (!reader.EndOfStream)
{
var line = reader.ReadLine();
yield return line;
}
}
}
public virtual IEnumerable<CsvLine> Rows()
{
using (var reader = new StreamReader(File.OpenRead(Path)))
{
long lineNo = 0;
/*skip header rows*/
if (HasHeaderLine && !reader.EndOfStream)
{
++lineNo;
reader.ReadLine(); /*check header: string[] headers =
reader.ReadLine().Split(Splitter);*/
}
/*data rows*/
while (!reader.EndOfStream)
{
var line = new CsvLine();
line.Set(++lineNo, reader.ReadLine(), Splitter);
yield return line;
}
}
}
public IEnumerable<List<CsvLine>> Rows(int batchSize)
{
List<CsvLine> list = new List<CsvLine>();
foreach (var row in Rows())
{
list.Add(row);
if (list.Count == batchSize)
{
yield return list;
list = new List<CsvLine>();
}
}
if (list.Count > 0)
{
yield return list;
}
}
}
}
这是一个用于实体映射的映射基类。我们将使用此配置类将数据行转换为预期的实体。在这里,我们可以选择在获取映射实体之前验证数据行。
using System;
using System.Collections.Generic;
using System.Linq;
namespace CsvBatch
{
public abstract class EntityLineMapper<TEntilty> : CsvLine, ICsvDataMapper<TEntilty>
{
public TEntilty Map()
{
TEntilty entity;
List<string> mapErrors;
if (!Map(out entity, out mapErrors))
{
throw new Exception("File to entity map error.");
}
return entity;
}
public bool Map(out TEntilty entity, out List<string> mapErrors)
{
entity = Map(LineValues(), out mapErrors);
return !mapErrors.Any();
}
protected abstract TEntilty Map(List<string> lineValues, out List<string> mapErrors);
}
}
以实体形式读取
我们将使用此类以实体列表的形式读取数据。此类继承自之前共享的FileReader
。
using System;
using System.Collections.Generic;
using System.Data;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
namespace CsvBatch
{
public class ReaderAsEntity : FileReader
{
public ReaderAsEntity(string path, bool hasHeaderLine = true,
char splitter = ',') : base(path, hasHeaderLine, splitter)
{
}
public IEnumerable<List<TEntity>> Rows<TMapper, TEntity>(int batchSize)
where TMapper : ICsvDataMapper<TEntity>, new()
{
TMapper mapper = new TMapper();
foreach (var batch in Rows(batchSize))
{
List<TEntity> batchQueue = new List<TEntity>(batchSize);
foreach (var row in batch)
{
mapper.Set(row.LineNo, row.Line, row.Splitter);
batchQueue.Add(mapper.Map());
}
yield return batchQueue;
}
}
public IEnumerable<List<TMapper>> Rows<TMapper>(int batchSize)
where TMapper : ICsvDataMapper, new()
{
foreach (var batch in Rows(batchSize))
{
List<TMapper> batchQueue = new List<TMapper>(batchSize);
foreach (var row in batch)
{
TMapper mapper = new TMapper();
mapper.Set(row.LineNo, row.Line, row.Splitter);
batchQueue.Add(mapper);
}
yield return batchQueue;
}
}
}
}
以DataTable形式读取
此类将用于以DataTable
列表的形式读取数据。
using System;
using System.Collections.Generic;
using System.Data;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace CsvBatch
{
public class ReaderAsDataTable : FileReader
{
public int? ExtectedColumnNumber { get; protected set; }
public bool IncludeLineNumber { get; protected set; }
public string TableName { get; protected set; }
public string DefaultColumnName { get; protected set; }
public string DefaultLineNumberColumnName { get; protected set; }
public List<string> _expectedHeaders;
private ReaderAsDataTable(string path, bool hasHeaderLine,
char splitter, string tableName, string defaultColumnName,
bool includeLineNumber, string defaultLineNumberColumnName)
: base(path, hasHeaderLine, splitter)
{
IncludeLineNumber = includeLineNumber;
TableName = tableName;
DefaultColumnName = defaultColumnName;
DefaultLineNumberColumnName = defaultLineNumberColumnName;
}
public ReaderAsDataTable(string path, char splitter = ',',
string tableName = "TableName", string defaultColumnName = "ColumnName",
bool includeLineNumber = true,
string defaultLineNumberColumnName = "ColumnRowNumber")
: this(path, true, splitter, tableName, defaultColumnName,
includeLineNumber, defaultLineNumberColumnName)
{
}
public ReaderAsDataTable(string path, int extectedColumnNumber,
char splitter = ',', string tableName = "TableName",
string defaultColumnName = "ColumnName",
bool includeLineNumber = true,
string defaultLineNumberColumnName = "ColumnRowNumber")
: this(path, false, splitter, tableName, defaultColumnName,
includeLineNumber, defaultLineNumberColumnName)
{
ExtectedColumnNumber = extectedColumnNumber;
}
public List<string> ExpectedHeaders()
{
if (_expectedHeaders != null)
{
return _expectedHeaders;
}
List<string> headers = new List<string>();
if (ExtectedColumnNumber != null)
{
for (int i = 1; i <= ExtectedColumnNumber; i++)
{
string columnName = DefaultColumnName + i;
headers.Add(columnName);
}
}
else
{
headers.AddRange(Headers());
}
if (IncludeLineNumber)
{
string columnName = DefaultLineNumberColumnName;
headers.Add(columnName);
}
_expectedHeaders = headers;
return _expectedHeaders;
}
public DataTable DataTable()
{
DataTable table = NewDataTable();
table.BeginLoadData();
foreach (var item in Rows())
{
var row = NewDataRow(table, item);
table.Rows.Add(row);
}
table.EndLoadData();
return table;
}
public IEnumerable<DataTable> DataTables(int batchSize)
{
int tableCount = 0;
foreach (var batch in Rows(batchSize))
{
++tableCount;
DataTable table = NewDataTable(tableCount);
table.BeginLoadData();
foreach (var item in batch)
{
var row = NewDataRow(table, item);
table.Rows.Add(row);
}
table.EndLoadData();
yield return table;
}
}
private DataTable NewDataTable(int tableCount = 0)
{
DataTable table = new DataTable();
table.TableName = tableCount == 0 ? TableName : TableName + tableCount;
foreach (string header in ExpectedHeaders())
{
string columnName = header;
if (table.Columns[columnName] == null)
{
table.Columns.Add(columnName);
}
}
return table;
}
private DataRow NewDataRow(DataTable table, CsvLine item)
{
DataRow row = table.NewRow();
var lineValues = item.LineValues();
if (IncludeLineNumber)
{
lineValues.Add(item.LineNo.ToString());
}
row.ItemArray = lineValues.ToArray();
return row;
}
}
}
使用:以实体形式读取
实体模型和数据映射
模型
要以实体形式读取数据,我们需要一个模型和一个模型映射配置。这是我们的模型类
namespace CsvBatch
{
public class Student
{
public string Name { get; internal set; }
public int Age { get; internal set; }
public int Level { get; internal set; }
}
}
数据映射
让我们通过继承EntityLineMapper<TSource>
类来创建模型映射配置类。基类将强制子类实现一个方法TSource Map(List<string> lineValues, out List<string> mapErrors)
。
List<string> lineValues
包含拆分后的行值List<string> mapErrors
错误验证消息
在这里,我正在验证数据并将错误消息添加到List<string> mapErrors.
。如果数据源始终提供有效数据,则这不是必需的。修剪值是一个好习惯,如果需要,我们可以在这里执行。
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace CsvBatch
{
public class StudentMapper : EntityLineMapper<Student>
{
protected override Student Map(List<string> lineValues, out List<string> mapErrors)
{
Student entity = new Student();
mapErrors = new List<string>();
/*column number validation*/
if (lineValues.Count < 3)
{
mapErrors.Add("Not enough enough");
return entity;
}
/*name column validation*/
entity.Name = lineValues[0];
/*age column validation*/
string ageString = lineValues[1];
int age;
if (int.TryParse(ageString, out age))
{
entity.Age = age;
}
else
{
mapErrors.Add("Age is not a number");
}
/*level column validation*/
string levelString = lineValues[2];
int level;
if (int.TryParse(levelString, out level))
{
entity.Level = level;
}
else
{
mapErrors.Add("Level is not a number");
}
return entity;
}
}
}
无验证读取文件
在此,源文件的所有行都有效。
文件示例
Name,Age,Level
Dan,8,1
Ben,1,1
Henrik,9,1
Dan,8,2
Ben,1,2
Henrik,9,2
Dan,8,3
加载数据
string path = FullPath(@"Student.csv");
bool hasHeader = true; /*use false, if no header in the file*/
char separator = ',';
int batchSize = 3;
IEnumerable<List<Student>> enumerable =
new ReaderAsEntity(path, hasHeader, separator).Rows<StudentMapper, Student>(batchSize);
List<List<Student>> list = enumerable.ToList();
Assert.AreEqual(3, list.Count);
Assert.AreEqual(3, list[0].Count);
Assert.AreEqual(3, list[1].Count);
Assert.AreEqual(1, list[2].Count);
带验证读取文件
在此,只有源文件的最后一行是有效的。
文件示例
Name,Age,Level
Ben,,1
Henrik,1,
Dan,8,2
加载数据
用法几乎与之前相同,但该函数将输出验证错误消息。
Student entity
行到实体模型List<string> errors
验证错误消息,默认情况下,其计数应为零
string path = FullPath(@"Student_Validation.csv");
bool hasHeader = true; /*use false, if no header in the file*/
char separator = ',';
int batchSize = 10;
var reader = new ReaderAsEntity(path, hasHeader, separator);
Student entity; List<string> errors;
foreach (var list in reader.Rows<StudentMapper>(batchSize))
{
foreach (var item in list)
{
if (item.Map(out entity, out errors))
{
/*no validation error in the line*/
/*use entity*/
}
else
{
/*validation error in the line*/
/*check errors*/
}
}
}
使用:以DataTable形式读取
目前,在以DataTable
形式读取文件数据时,我们没有像实体映射那样的验证选项。我将很快添加验证选项。
所有列的数据类型都将是string
。将添加一个名为ColumnRowNumber
的附加列到DataTable
,以确定数据行号,这可以在构造函数中配置。如果我们查看ReaderAsDataTable
类的构造函数,我们将看到一些默认参数。请根据需要切换构造函数或向默认参数传递值。
带标题读取文件
文件示例
Name,Age,Level
Dan,8,1
Ben,1,1
Henrik,9,1
Dan,8,2
Ben,1,2
Henrik,9,2
Dan,8,3
加载数据
string path = FullPath(@"Student.csv");
int batchSize = 3; /*3 row each datatable*/
char separator = ',';
List<DataTable> dataTables =
new ReaderAsDataTable(path, separator).DataTables(batchSize).ToList();
Assert.AreEqual(3, dataTables.Count);
无标题读取文件
文件示例
Dan,8,1
Ben,1,1
Henrik,9,1
Dan,8,2
Ben,1,2
Henrik,9,2
Dan,8,3
加载数据
string path = FullPath(@"Student_No_Header.csv");
int batchSize = 3; /*3 row each datatable*/
char separator = ',';
int numberOfColumnsInFile = 3;
List<DataTable> dataTables = new ReaderAsDataTable
(path, numberOfColumnsInFile, separator).DataTables(batchSize).ToList();
Assert.AreEqual(3, dataTables.Count);
由于文件中没有标题,列名将自动生成,例如
ColumnName1, ColumnName2, ColumnName3 ...
奖金:将IEnumerable/List拆分为批次
此扩展方法会将任何IEnumerable<TSource>
对象拆分为大小均匀的块。
IEnumerable 助手
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace CsvBatch
{
public static class ListExtensions
{
public static IEnumerable<List<TSource>> Batch<TSource>
(this IEnumerable<TSource> sourceList, int size)
{
List<TSource> batchList = new List<TSource>(size);
foreach (TSource obj in sourceList)
{
batchList.Add(obj);
if (batchList.Count == size)
{
yield return batchList;
batchList = new List<TSource>();
}
}
if (batchList.Count > 0)
{
yield return batchList;
}
}
}
}
使用助手
List<Student> source = new List<Student>()
{
/*b1*/
new Student() { Name = "Dan", Age = 10, Level = 1},
new Student() { Name = "Ben", Age = 11, Level = 1},
/*b2*/
new Student() { Name = "Dan", Age = 10, Level = 2},
new Student() { Name = "Ben", Age = 11, Level = 2},
/*b3*/
new Student() { Name = "Dan", Age = 10, Level = 3},
};
IEnumerable<List<Student>> value = source.Batch(2);
List<List<Student>> list = new List<List<Student>>(value);
Assert.AreEqual(3, list.Count);
Assert.AreEqual(2, list[0].Count);
Assert.AreEqual(2, list[1].Count);
Assert.AreEqual(1, list[2].Count);
项目
这是一个Visual Studio 2017解决方案,一个单元测试项目。文件夹结构如下:
- Model (学生模型和映射器)
- Helper (读取器类或实际助手代码)
- Test (代码示例、单元测试和测试文件)
该代码对于未经测试的输入可能会抛出意外错误。如果有,请告诉我。
历史
- 2020年8月2日:初始版本