分块解析CSV文件并保存到数据库






4.90/5 (26投票s)
在本文中,我将演示如何按块(按行)读取大型 CSV 文件,将其填充到 DataTable 对象中,然后批量插入到数据库。
引言
在本文中,我将演示如何按块(1 块 = N 行)读取大型 csv 文件,将其填充到 System.Data.DataTable
对象中,然后批量插入到数据库。
我将详细解释我使用的 .NET 框架组件,以及遇到的挑战,例如内存管理、性能、大文件读/写等,以及如何解决这些问题。在撰写本文时,我的目的是与目前从事类似工作或不久将来的同行分享我的实际经验,以便他们能从本文中受益。
背景
有一天,客户提出了新的需求。需求如下:
- 客户将通过其 Web 应用程序上传非常大的 CSV 类型数据文件。
- 上传完成后,文件将存储在特定的服务器位置。
- 一个软件代理(Windows 服务、使用任务计划程序运行的控制台应用程序)将顺序解析这些文件,并将文件数据转储到指定的数据库。
- 数据文件的架构将是预定义的,并且该架构可以通过数据库进行配置。
- 客户将在开始上传之前配置数据文件架构。
- 将文件数据转储到数据库后,客户将从数据库生成各种报告。
上传文件的结构
我之前提到文件类型是 .csv。我们都知道 .csv 类型的文件应该有一个预定义的数据分隔符。在我们的案例中,它是“制表符”。这意味着它包含“制表符”分隔的数据。文件的第一行是其列定义(数据架构)。接下来的每一行代表一个行/记录,需要由任何软件代理/组件定期转储到数据库。
输入数据文件包含四个列
1. Code
2. YearlyExpense
3. LastName
4. FirstName
文件内容如下:
文件解析
首先,我设计了一个接口 IFileParser,它有两个方法。
- GetFileData
- WriteChunkData
public interface IFileParser
{
IEnumerable<DataTable> GetFileData(string sourceDirectory);
void WriteChunkData(DataTable table, string distinationTable,
IList<KeyValuePair<string, string>> mapList);
}
1. GetFileData:GetFileData
方法的职责是按块读取文件,填充 datatable(ADO.NET 对象),然后将其发送给客户端。
2. WriteChunkData:WriteChunkData
方法的职责是将 DataTable
数据插入到数据库。
需要明确的是
多行 = 1 块数据 = 1 DataTable.
因此,在读取数据文件后,最好在读取完一个块时,该方法返回一个单独的 DataTable
并将其返回给调用者。然后调用者将调用 WriteChunkData
方法,以便将数据转储到数据库。此过程将继续进行,直到到达文件末尾。将使用两个非常重要的 .NET 框架组件:
- IEnumerable<T>
- yield
接口方法
IEnumerable<DataTable> IFileParser.GetFileData(string sourceFileFullName)
{
bool firstLineOfChunk = true;
int chunkRowCount = 0;
DataTable chunkDataTable = null;
string columnData = null;
bool firstLineOfFile = true;
using (var sr = new StreamReader(sourceFileFullName))
{
string line = null;
//Read and display lines from the file until the end of the file is reached.
while ((line = sr.ReadLine()) != null)
{
//when reach first line it is column list need to create
//datatable based on that.
if (firstLineOfFile)
{
columnData = line;
firstLineOfFile = false;
continue;
}
if (firstLineOfChunk)
{
firstLineOfChunk = false;
chunkDataTable = CreateEmptyDataTable(columnData);
}
AddRow(chunkDataTable, line);
chunkRowCount++;
if (chunkRowCount == _chunkRowLimit)
{
firstLineOfChunk = true;
chunkRowCount = 0;
yield return chunkDataTable;
chunkDataTable = null;
}
}
}
//return last set of data which less then chunk size
if (null != chunkDataTable)
yield return chunkDataTable;
}
DataTable Create Without Data:
private DataTable CreateEmptyDataTable(string firstLine)
{
IList<string> columnList = Split(firstLine);
var dataTable = new DataTable("tblData");
dataTable.Columns.AddRange(columnList.Select(v => new DataColumn(v)).ToArray());
return dataTable;
}
DataTable 中添加的数据行:
private void AddRow(DataTable dataTable, string line) { DataRow newRow = dataTable.NewRow(); IList<string> fieldData = Split(line); for (int columnIndex = 0; columnIndex < dataTable.Columns.Count; columnIndex++) { newRow[columnIndex] = fieldData[columnIndex]; } dataTable.Rows.Add(newRow); }
制表符分隔数据拆分:
private IList<string> Split(string input)
{
//our csv file will be tab delimited
var dataList = new List<string>();
foreach (string column in input.Split('\t'))
{
dataList.Add(column);
}
return dataList;
}
为什么我使用 IEnumerable<T>
而不是 IList<T> 或 ICollection<T>。原因是 IList<T> 或 ICollection>T> 不支持惰性返回功能。另一件事是,在方法内部,我使用 yield 语句,并在 return 语句之前使用它来惰性地返回块数据。
如果计划在方法中使用 yield 语句,则该方法的返回类型必须是
要使用 yield return,必须遵守一些先决条件。如果您使用 IList<T>
而不是 IEnumerable<T>,您将收到编译错误:
'System.Collections.Generic.IList<System.Data.DataTable>' 不是迭代器接口类型
您可以在此 链接 中找到使用 yield 语句的详细先决条件。
yield 语句的主要优点是它支持惰性计算。当通过 yield 语句返回数据时,会记住当前代码的状态,下次调用迭代方法时,将执行剩余的语句。
另一个简单的例子:
public void ClientCode()
{
foreach (int i in GetValues())
{
Console.WriteLine(i.ToString());
}
}
public IEnumerable<int> GetValues()
{
yield return 1;
yield return 2;
yield return 3;
yield return 4;
}
该方法将返回 1 并将其打印到控制台,然后返回 2 并将其打印到控制台,然后返回 3 并将其打印到控制台。最后返回 4 并将其打印到控制台。这样,yield 语句就支持惰性计算的代码。
StreamReader
对象用于逐行读取数据文件。它有助于控制内存压力。它一次只从磁盘读取一行数据到内存。
在我们的数据文件中,数据文件的第一行是列列表,接下来的每一行数据都是一个行/记录。使用一些标志变量来跟踪状态,例如 First Line、First Chunk Line。
开始读取时,第一次解析块时,创建一个 DataTable
并开始填充数据。当达到 Maximum Chunk Size 时,方法将返回填充的 DataTable
给其调用者。
在利用惰性评估之前,需要清楚一个非常重要的概念。我们之前说过,在 yield 语句中使用 IEnumerable<T>
实际上可以实现惰性评估。但有时这可能会导致性能下降。
IEnumerable<DataTable> dataTables = fileParser.GetFileData(sourceFileFullName);
Console.WriteLine("Total data tables for first time:" + dataTables.Count());
Console.WriteLine("Total data tables for second time: " + dataTables.Count());
Console.WriteLine("Total data tables for third time: " + dataTables.Count());
即使您调用 dataTables.Count()
方法 3 次,但实际上它会调用 FileParser.GetFileData
方法 3 次。这种情况会造成性能损失。这些情况需要仔细处理,否则会给我们带来不利的后果。
批量插入
void IFileParser.WriteChunkData(DataTable table, string distinationTable,
IList<KeyValuePair<string, string>> mapList)
{
using (var bulkCopy = new SqlBulkCopy(_connectionString, SqlBulkCopyOptions.Default))
{
bulkCopy.BulkCopyTimeout = 0;//unlimited
bulkCopy.DestinationTableName = distinationTable;
foreach (KeyValuePair<string, string> map in mapList)
{
bulkCopy.ColumnMappings.Add(map.Key, map.Value);
}
bulkCopy.WriteToServer(table, DataRowState.Added);
}
}
SqlBulkCopy
对象非常高效,可以将大量数据从数据源写入 SQL Server 数据库。它有一个名为 ColumnMappings
的属性。需要在该属性中添加源和目标数据列信息的映射。需要注意的是,如果不添加映射信息,该对象会将目标列视为与源列相同的顺序。因此,最好添加映射信息,以便源和目标列相同且顺序相同。您应该了解一些 SqlBulkCopy
对象行为。它们是:
- 如果目标表中有未在源数据中提及的列,则会插入其默认值。
- 如果映射的目标列在目标表中不存在,则会引发
InvalidOperationException
,消息为“给定的ColumnMapping
与源或目标中的任何列都不匹配。” - 如果映射的源列在源 datatable 中不存在,则会引发
InvalidOperationException
,消息为“给定的列名 'FirstName2' 与数据源中的任何列都不匹配。” - 有趣的是,源列不区分大小写,而目标列区分大小写。这意味着,目标数据库表列名在映射信息中必须与原始列名大小写完全一致。否则,它会被视为不同的。
客户端代码
IFileParser fileParser = new DefaultFileParser();
string destinationTableName = "Data";
string sourceDirectory = @"D:\Data";
string sourceFileFullName = GetFileFullName(sourceDirectory);
IEnumerable<DataTable> dataTables = fileParser.GetFileData(sourceFileFullName);
foreach(DataTable tbl in dataTables)
{
fileParser.WriteChunkData(tbl, destinationTableName, Map());
GC.Collect();
}
Console.WriteLine("Completed successfully!");
GetFileData 方法返回 datatable 的列表。虽然它是 IEnumerable<DataTable>,所以在访问其任何属性时,其主体都会执行。所以您可能会犯一个错误:
try
{
IEnumerable<DataTable> dataTables = fileParser.GetFileData(sourceFileFullName);
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
throw;
}
但是它永远不会抛出/捕获任何异常,因为 GetFileData 方法的代码不会执行。所以当您调试该方法时,请记住代码不会进入该方法,仅仅因为它返回的是带 yeild 语句的 IEnumerable<T>。当我通过 foreach 循环访问 datatables 时,它才会执行代码并逐个返回 DataTable。
我在每次迭代中使用 GC.Collect 方法。为什么需要它?嗯,虽然我在 foreach 中使用了 tbl 变量,但每次都有新的 datatable 分配给该变量,而前一个(在数据库写入之后)被重新分配为新的,数据将等待垃圾回收来释放内存。虽然数据量很大,但需要它,一旦完成,垃圾回收将尽快将其收集。所以我调用 GC.Collect 来强制收集垃圾数据。如果您想检查它的好处,
long totalMemoryBefore = System.Diagnostics.Process.GetCurrentProcess().PrivateMemorySize64;
fileParser.WriteChunkData(tbl, destinationTableName, Map());
GC.Collect();
long totalMemoryAfter = System.Diagnostics.Process.GetCurrentProcess().PrivateMemorySize64;
您可以使用各种计数器来评估调用 GC.Collect 后内存清理的效果。
编写代码时的注意事项
当您处理(文件解析并转储到数据库类型)这类需求时,需要遵循一些技术。这将帮助您轻松管理代码。
- 在开始处理之前,编写尽可能多的数据验证代码。
- 尽可能编写防御性编码。
- 永远不要信任从不同来源接收到的数据。
- 首先验证数据架构。然后处理数据。XML 类型的数据具有某种 XSD 的架构验证。但文本/CSV 文件没有这样的工具。您可以手动完成。
- 在发现不匹配的地方抛出带有详细错误消息的自定义异常,以便您能够理解问题发生在哪里以及是什么问题。
- 如果可能,记录自定义异常的详细错误数据和堆栈跟踪,包括发生错误的行号。
- 不要在运行时根据自己的决定自行更正任何数据,除非获得客户/领域专家的确认。即使是一个空格也是一个问题。您不知道空格在那里是否有特殊含义。
- 编写客户通知代码,如发送电子邮件/发送短信,以便在发生任何错误/异常情况时,客户能够尽快知晓。
关注点
在我的代码示例和演示中,我没有展示数据验证/异常相关的部分。我甚至没有展示任何防御性编程。您需要编写验证代码来验证数据。虽然数据来自 .csv 文件,但实际上它是一个文本文件,文本文件的数据始终是非结构化的,所以您需要编写额外的代码进行验证,并且由于制表符分隔,您需要特别注意空格/制表符/回车符/换行符字符。在将数据转储到数据库之前,您可能需要编写一些额外的代码进行数据清理。您可能需要从客户或领域专家那里就此做出决定。