通过一次只读处理将数据从单个XML流传输到多个表






4.75/5 (12投票s)
使用 SqlBulkCopy 的并行实现,从单个 XML 源快速地将数据传输到多个表中。
引言
数据传输是一个非常普遍的业务流程。大多数开发人员都熟悉寻找方法,从远程源获取数据并将其写入本地数据库。通常,可以将远程数据加载到内存中,然后使用某种数据适配器将其写入数据库。但是,如果您需要传输大量远程数据,该怎么办?数据量越大,您就越不愿意将其加载到内存中,而越倾向于使用某种仅向前读取的流式处理技术,以便在读取流时动态加载数据。幸运的是,ADO.NET 提供了一个有用的工具来实现这一点:SqlBulkCopy
类。
SqlBulkCopy
与单个数据库表相关联,该表通过 DestinationTable
属性进行标识。
SqlBulkCopy bulkCopy = new SqlBulkCopy(myConnection);
bulkCopy.DestinationTableName = "MyTable";
为了发挥其魔力,SqlBulkCopy
使用 WriteToServer
方法,该方法有四个重载。其中三个重载涉及 DataRow
和 DataTable
,这表明数据已经从某个地方提取并打包到 DataTable
中。由于在这种情况下我们关注的是传输大量数据,因此这些重载不如接受 IDataReader
的那个重载对我们更有吸引力。
void SqlBulkCopy.WriteToServer(IDataReader reader)
您需要创建自己的类来实现 System.Data.IDataReader
。要做到这一点,您需要了解 SqlBulkCopy.WriteToServer
读取您的 IDataReader
实现对象的相当直接的方式。
- 它会检查
IsClosed
标志,如果该标志设置为true
,则终止。 - 它会调用
Read()
方法并检查返回值。 - 如果
Read()
方法返回 true,它将使用属性访问器和GetValues()
方法将当前值写入DestinationTableName
指定的表中。 - 它会重复此过程直到终止。
因此,您有责任确保您的 IDataReader
对象在响应 Read()
调用时正确地遍历其数据源,并在数据源耗尽时设置 IsClosed
标志。
如果幸运的话,您的数据源对应于数据库中的单个表(例如,当数据发布到单个大型 CSV 文件时),您可以看到这如何使您能够使用仅向前读取来执行大量数据的非常快速的上载。只需创建一个实现 IDataReader
的类,并将您的数据源作为流读取器传递给该类。在每次调用 Read()
时,将流读取器前进到下一条记录,同时将其内容写入 this[int i]
、this[string name]
和 GetValues(object[] values)
访问的容器中。到目前为止一切顺利。网上有一些关于如何做到这一点的很好的演示:输入 SqlBulkCopy
到搜索引擎,您会立即找到几个。
但是,如果您的数据源不对应于单个表,该怎么办?那时怎么办?正如我在括号中提到的,您的数据提供商可以发布一个大型 CSV 文件,在这种情况下,您可以将一个读取器附加到您的自定义 IDataReader
对象,然后将其交给单个 SqlBulkCopy
对象,一切都会顺利进行。然而,尤其是在处理大型复杂数据存储库时,发布者以 XML 或 JSON 格式共享其数据的情况同样普遍。在这种情况下,您可能需要将数据写入多个表,理想情况下,您希望通过对数据源进行一次仅向前读取来完成此操作。
您无法通过单个 SqlBulkCopy
对象做到这一点。一旦设置了 DestinationTableName
属性,SqlBulkCopy
就只会查看该表,而不会查看其他表。奇怪的是,您可以在其原始分配后更改 DestinationTableName
属性,而 SqlBulkCopy
不会抱怨。它只会说“非常感谢”,然后像什么都没发生一样继续。无论如何,由于一个批量复制对象最多只能写入一个表,因此我们知道我们需要多个 SqlBulkCopy
对象。
如果您放宽“仅一次读取”的要求,生活会变得更简单。如果您要填充三个表,您可以连续三次读取相同的数据源,每次指向一个表。但是,我们能否使用 .NET 的并行库同时运行多个 SqlBulkCopy
对象,并实现一次仅向前读取填充多个表的目标?
是的,我们可以,这就是本文的主题。
示例
例如,我将使用一个我非常熟悉的数据源:我回家夜班公交车的时刻表。该时刻表由 伦敦交通局 以 TransXChange 格式发布。我不会提供一个完整的 TransXChange 格式的数据库模式,该模式将使您能够从本地数据库读取整个时刻表。这留给有奉献精神的读者作为练习。但是,为了说明目的,我将展示如何将我的示例时刻表文档中的数据流式传输到包含两个骨架表的示例数据库中。
在示例代码中,我创建并销毁了一个名为 SampleXmlData
的示例数据库。运行示例时,您只需要自定义连接字符串。
Data Source=<YOUR DB SERVER>;Initial Catalog=master;Integrated Security=SSPI
从 WinForms 的 TextBox
中。
设置和拆卸操作创建并销毁了示例数据库。执行传输后,传输应该会非常快,因为示例传输的数据量很小,请随时检查 TxcRoute 和 TxcRouteLink 表是否按预期填充。
策略
正如许多有趣的问题一样,如果将需求说清楚,解决方案的设计就会显现出来。我们需要从单个 XML 源 **读取** 数据,同时将其 **写入** 多个表。因此,我们创建了两个类:DatabaseTableWriter
和 XmlDataReader
。在代码中,一个单独的 XmlDataReader
对象读取 XML 数据源,而多个 DatabaseTableWriter
对象则使用其输出写入各自的数据库表。
DatabaseTableWriter
s 从 XmlDataReader
获取数据,XmlDataReader
在遍历传递给它的 XML 文件。解决方案的关键在于 XmlDataReader
的 walkThroughTheXml()
方法,在该方法中,它使用仅向前读取的 System.Xml.XmlReader
对象读取当前数据,并与 DatabaseTableWriter
s 共享结果。
private void walkThroughTheXml()
{
while (!xmlReader.EOF && !(xmlReader.ReadState == ReadState.Closed) && !cancelled)
{
WaitHandle.WaitAll(ResetEventWrapper.getWaitHandles(tableWriterResetEvents));
try
{
// Anything that refers to xmlReader is vulnerable.
// It can be tripped up by a poor connection.
// In order to allow the parallel loops
// to finish, we need the finally block
// to ensure that signalling is completed,
// even when the read() method falls over.
if (xmlReader.ReadState == ReadState.Closed)
break;
if (xmlReader.ReadState == ReadState.Error)
throw new Exception("The reader has encountered an internal error");
read();
}
catch (Exception ex)
{
// This will dispose of all the table writers,
// rendering their wait handles
// obsolete and allowing all threads to terminate.
Dispose();
throw ex;
}
finally
{
signalAllTableWriters();
}
if (cancelled)
xmlReader.Close();
}
}
DatabaseTableWriter
对象实现了 IDataReader
接口,并由它们自己的 SqlBulkCopy
对象控制,每个对象都在自己的线程上运行。如果我们向 N 个表写入数据,那么将有 N + 1 个线程并行运行:每个 SqlBulkCopy.WriteToServer()
实现一个线程,XmlDataReader
的 walkThroughTheXml()
方法一个线程。在 read()
方法中,遍历线程会确定它在 XML 树中的位置,如果它对应于其中一个 DatabaseTableWriter
对象,它会将 XML 节点传递给该对象,该对象知道其自己表的结构,并可以使用该节点来填充其内部值。
private void read()
{
bool tableElementIsPopped = false;
bool columnNameIsPopped = false;
populatedTableWriter = null;
do
{
pushElementStacks();
DatabaseTableWriter columnElementWriter;
if (readerIsAtTheStartOfAColumnElement(out columnElementWriter))
columnElementWriter.readColumnValue(xmlReader, columnNameStack);
else
xmlReader.Read();
popElementStacks(out tableElementIsPopped, out columnNameIsPopped);
if (tableElementIsPopped)
return;
}
while (!xmlReader.EOF && !(xmlReader.ReadState == ReadState.Closed));
}
需要注意的重要一点是,繁重的工作,即 XmlDataReader
维护其内部元素堆栈并通知相应的 DatabaseTableWriter
进行填充,所有这些都是在遍历线程上完成的。在 SqlBulkCopy.WriteToServer()
线程中,事情要容易得多。以下是 DatabaseTableWriter.Read()
线程中发生的情况。
public bool Read()
{
clearRowValues();
while (!IsClosed && !cancelled)
{
if (!WriterResetEvent.set())
return false;
if (!ReaderResetEvent.wait())
return false;
if (!cancelled && dataReader.canRead(this))
{
++rowCount;
return true;
}
}
return false;
}
在 WriterResetEvent.set()
方法中(这是我对 System.Threading.AutoResetEvent.Set()
方法的包装),线程向遍历线程发出信号,告知它已准备好读取其数据。然后,它等待遍历线程执行读取当前 XML 元素中的繁重工作。如果遍历刚刚读取了一个与此表完全匹配的节点,那么 dataReader
对象将返回 true,并且根据 SqlBulkCopy.WriteToServer()
的实现,写入线程将读取遍历线程生成的值并将其写入其数据库表。否则,Read()
方法返回 false,写入线程继续查找对应于其表的数据。
WaitHandle.WaitAll(ResetEventWrapper.getWaitHandles(tableWriterResetEvents))
引起警报,请不要过于惊慌。尽管我们正在等待 N 个线程中的每个线程的信号,但在这 N 个线程中只有一个将有任何数据可以发布到数据库,因为 xmlReader
对象一次最多只能位于一个节点的末尾。而且,鉴于我们之所以使用 SqlBulkCopy
是因为它速度快,我们可以预期一次写入操作执行得非常快。因此,即使我们直接从 Web 服务器读取流,我们也不会阻止对该服务器的访问。
所有这些都留下了一个问题:DatabaseTableWriter
对象如何知道它们正在写入的表的结构?当遍历线程询问它们时,它们如何知道该做什么?我将此逻辑封装在另一个 XML 文件(在示例中称为 txcXmlReader
)中,该文件为每个表指定指令:数据库表名是什么,它在数据源中对应于哪个 XML 节点或节点,在传输过程中通知调用者的频率,以及预期的行数。这种方法的一个巧妙之处在于,您可以随时从任何地方读取指令文件,因此您可能不必重新编译代码即可引入一组新表。此外,业务逻辑完全存在于指令文件中:DatabaseTableWriter
对象只是盲目地遵循这些指令并相应地填充其表。
如果您想调整此代码,那么您可以在这里开始发挥想象力。您可以扩展此指令元语言,以便指令文件可以指定源数据的来源以及您可能用于提取它的方法。如果您正在服务器上工作,您的指令文件可以指定一个计划,并且通过更多的编码,您可以达到可以在不更改任何代码甚至无需重新启动服务器的情况下引入新计划、执行新作业、加载不同数据的地方。
为了说明指令文件是如何读取的,让我们仔细看看 txcXmlReader
文件。它的内容如下:
<XmlParser>
<Table XmlTableName="RouteSection">
<Column XmlColumnName="RouteSection" ValueAttribute="id" ColumnType="System.String" />
<Table XmlTableName="RouteLink" DatabaseTableName="TxcRouteLink"
expectedRowCount="265" notifyAfter="10">
<Column XmlColumnName="RouteLink" DatabaseColumnName="RouteLinkID"
ValueAttribute="id" ColumnType="System.String" />
<Column XmlFormula="../RouteSection"
DatabaseColumnName="RouteSectionRef" ColumnType="System.String" />
<Column XmlColumnName="From.StopPointRef"
DatabaseColumnName="FromID" ColumnType="System.String" />
<Column XmlColumnName="To.StopPointRef"
DatabaseColumnName="ToID" ColumnType="System.String" />
<Column XmlColumnName="Direction" DatabaseColumnName="Direction"
ColumnType="System.String" Converters="FirstLetter,UpperCase" />
</Table>
</Table>
<Table XmlTableName="Route" DatabaseTableName="TxcRoute"
expectedRowCount="4" notifyAfter="1">
<Column XmlColumnName="Route" DatabaseColumnName="RouteID"
ValueAttribute="id" ColumnType="System.String" />
<Column XmlColumnName="PrivateCode"
DatabaseColumnName="PrivateCode" ColumnType="System.String" />
<Column XmlColumnName="Description"
DatabaseColumnName="Description" ColumnType="System.String" />
<Column XmlColumnName="RouteSectionRef"
DatabaseColumnName="RouteSectionRef" ColumnType="System.String" />
</Table>
</XmlParser>
关于这个文件,首先要注意的是,尽管这是一个相当平淡的观察,但它很短。它总是会很短:这是一组指令,而不是数据存储库。这意味着我们不必使用像 XmlReader
这样的笨重工具来读取它:我们可以使用支持 LINQ 的 XReader.Load()
方法将其加载到内存中,该方法使我们能够访问 LINQ to XML 的所有精妙技巧和语法技巧。
接下来要注意的是 <Table>
元素的嵌套结构。txcXmlReader
文件中有三个这样的元素,但只有两个数据库表需要填充:TxcRouteLink 和 TxcRoute。这些对应于 <Table>
元素中的 DatabaseTableName
属性。因此,解释器为每个 DatabaseTableName
属性生成一个 SqlBulkCopy
对象,而不是为每个 <Table>
元素生成一个。
虽然它们不都有 DatabaseTableName
属性,但 <Table>
元素确实都有 XmlTableName
属性。这表明每个 <Table>
元素对应于目标流中的 X 元素族,其中 X 是 <Table>
元素 XmlTableName
属性的值。
<Table>
元素的嵌套暗示了目标流的结构。如果一个 <Table XmlTableName="Y" ... />
节点嵌套在指令文件中的 <Table XmlTableName="X" ... />
节点内,那么可以合理地推断 <Y>
节点嵌套在目标流中的 <X>
节点内。
这可以解释示例中嵌套的 RouteLink
表中的 ../RouteSection
表示法。
<column xmlformula="../RouteSection"
databasecolumnname="RouteSectionRef" columntype="System.String" />
这还暗示了 DatabaseTableWriter
对象的一些行为。似乎确实有一个对应于指令文件中的每个 <Table>
元素,但并非所有这些都对应于 SqlBulkCopy
对象。有些只是用于存储数据,并响应对应于其 <Table>
元素子项的表写入器的查询。
如果 DatabaseTableWriter
对象具有与 <Table>
元素的结构相对应的父子结构,那么 DatabaseTableWriter
对象可以在其某些列数据已被遍历线程中的 XmlReader
读取后,查询其父对象。这可以通过查看遍历线程中的 popTableElementStack()
方法来确认,该方法在 XmlReader
完成读取表行元素的内容时被调用。
private void popTableElementStack(out bool tableElementIsPopped)
{
if (tableElementIsPopped = xmlReaderIsAtTheEndOfTheCurrentTableElement())
{
CurrentTableWriter.evaluateFormulaColumns();
populatedTableWriter = tableWriterStack.Pop();
}
}
遍历线程指示当前的 DatabaseTableWriter
对象评估其自己的公式列,该对象会照做。
public void evaluateFormulaColumns()
{
foreach (string formula in FormulaColumns.Keys)
rowValues[FormulaColumns[formula].ColumnIndex] =
evaluateFormula(FormulaColumns[formula]);
}
private object evaluateFormula(DatabaseColumn databaseColumn)
{
string path = databaseColumn.FormulaName;
List<string> pathComponents = StringUtils.splitAndTrim(path, '/').ToList();
DatabaseTableWriter currentTableWriter = this;
for (int i = 0; i < pathComponents.Count; ++i)
if (pathComponents[i].Equals(".."))
currentTableWriter = currentTableWriter.parentTableWriter;
else
return currentTableWriter.rowValues[currentTableWriter.XmlColumns[pathComponents[i]].ColumnIndex];
return null;
}
正如预期的那样,evaluateFormula
方法响应 ../
指令在 DatabaseTableWriter
树中向上遍历。如果您需要将其他类型的公式引入解释器的词汇中,可以在此处进行,但我建议不要在此处进行过于复杂的处理:请记住,我们正在处理限速的遍历线程,并且我们在此部分代码中保持精简和高效有切身利益。
回顾 txcXmlReader
文件,所有这些都让我们非常合理地推断,在 TransXChange 格式中,每个 <RouteLink>
节点都嵌套在 <RouteSection>
节点内,并且 TxcRouteLink 表不仅从 <RouteLink>
节点及其子节点读取其行数据,还从其父 <RouteSection>
节点读取。我可以确认这一点,但我也邀请您通过下载该项目并查看 Rob.DataTransfer 项目中的 SampleData/busTimeTable.xml 文件来验证这一点。
进一步调查
使用 SqlBulkCopy
类的一个限制是,您不能在同一连接上同时运行两个 SqlBulkCopy.WriteToServer()
方法:您必须在不同的连接上运行它们。如果您想在单个事务中运行上载,此限制将不幸地阻止您这样做。然而,我并不认为这是一个问题:通常,当您加载大量数据时,您会希望将其加载到暂存区域,并在过程成功完成并且您已彻底测试了新数据的完整性之后再进行切换。如果过程中出现问题(例如,您丢失了与远程源的连接),您可以简单地清理暂存区域并重新开始。同样,编写代码来实现暂存、验证和清理过程将留给有奉献精神的读者作为练习。
祝您编码愉快!
历史
2011/09/30
添加了关于指令文件如何被读取和由 DatabaseTableWriter
对象使用的详细描述,以及对 txcXmlReader
文件结构的详细检查。
2011/10/01
清理了代码文件,并为 WinForms 应用程序添加了异常处理和连接字符串持久化。