DataTable 同步管理器






4.63/5 (9投票s)
2006年3月4日
5分钟阅读

74569

1455
为 DataTable 事务日志添加同步功能。
引言
在我的上一篇文章中,我讨论了DataTableTransactionLog
。本文通过实现一个 DataTable
同步管理器进一步发展了这些概念。同步管理器负责获取事务记录并创建记录“包”列表。它还负责获取记录“包”列表并重构事务记录。然后,这些事务记录可以应用于目标 DataTable
,从而同步各个 DataTable
。
重构
对上一篇文章的读者来说,需要注意一些重构。除了命名空间更改为“Clifton.Data
”之外,其他更改如下:
DataTableTransactionLog.cs:
- 将
Apply
和Revert
记录操作移至DataTableTransactionLog
类。 - 将
RestoreRowFields
和SaveRowFields
移至DataTableTransactionLog
类。 - 添加了索引器以返回事务记录。
- 修改了
Deleting
事件,以搜索所有以前的事务,并将当前行数据保存在引用即将删除的行的任何事务记录中。 - 日志现在利用内部
DataView
,以便在同步期间通过主键查找记录。
DataTableTransactionRecord.cs:
TransType
属性重命名为TransactionType
。TransactionType
枚举重命名为RecordType
。- 添加了一个虚拟
Apply
方法,该方法实现了以前在DataTableTransactionLog
类中的逻辑。 - 添加了一个非虚拟
Revert
方法,该方法实现了以前在DataTableTransactionLog
类中的逻辑。 columnValues
已公开(ColumnValues
)并且可设置。
架构
有一个特定的架构驱动应用程序与同步管理器的接口。应用程序要求是:
- 确保每个
DataTable
实现相同的列名。 - 利用一个或多个主键唯一标识每行。
- 在从同步管理器获取事务包之前,确保主键值已初始化且有效。
- 在将事务包应用于目标
DataTable
之前,确保主键值已初始化且有效。 - 主键由
DataTable
的PrimaryKey
数组确定。
以下是说明日志与同步管理器之间关系的 UML 图:
接下来将讨论与 DataTable
同步相关的类。
TransactionRecordPacket
在上一篇文章中,DataTableTransactionRecord
跟踪对 DataRow
实例的更改。由于每个 DataTable
将具有不同的 DataRow
实例,因此有必要使用不同的机制来同步唯一的 DataTable
实例。我选择的机制是利用主键列。如果您的 DataTable
没有主键列,则必须添加一个,或修改您的查询以从持久存储中获取主键列。
TransactionRecordPacket
将主键值存储在字典中,映射主键列名和主键值。如果事务是列更改,则还存储更改字段的列名和新值。不保留旧值,如果行被删除,也不保留整个行的字段值。
实现
实现的关键部分是对 GetGuaranteedRowValue
的调用,无论行是否已删除,它都获取指定的字段值:
public TransactionRecordPacket(DataTableTransactionRecord record)
{
pkValues = new Dictionary<string, object>();
tranType = record.TransactionType;
foreach (DataColumn dc in record.Row.Table.PrimaryKey)
{
pkValues.Add(dc.ColumnName, record.GetGuaranteedRowValue(dc.ColumnName));
}
// Fill in some additional information if the transaction is a ChangeField.
if (tranType == DataTableTransactionRecord.RecordType.ChangeField)
{
columnName = record.ColumnName;
newValue = record.NewValue;
}
}
这意味着即使对于已删除的行,我们也可以获取主键字段值,以便我们可以在正在同步的 DataTable
中找到该行并将其删除。GetGuaranteedRowValue
方法的内部实现是:
public object GetGuaranteedRowValue(string fieldName)
{
object ret = null;
if (WasDeleted)
{
ret = columnValues[fieldName];
}
else
{
if (row.RowState == DataRowState.Deleted)
{
throw new DataTableTransactionException(
"Row has been deleted and there is no saved column values.");
}
ret = row[fieldName];
}
return ret;
}
如上所述,当记录被删除时,对于引用正在删除的行的所有事务记录,columnValues
属性都会填充。这是 OnRowDeleting
处理程序的重构:
protected void OnRowDeleting(object sender, DataRowChangeEventArgs e)
{
if (doLogging)
{
DataTableTransactionRecord record;
record = new DataTableTransactionRecord(transactions.Count, e.Row,
DataTableTransactionRecord.RecordType.DeleteRow);
record.SaveRowFields(e.Row);
OnTransactionAdding(new TransactionEventArgs(record));
transactions.Add(record);
Dictionary<string, object> colVals = record.ColumnValues;
// Tell all transaction records involving this row to save the row fields.
// This allows us to access deleted row data in earlier transactions.
// Alternatively, since the row is deleted, all transactions involving the
// deleted row could be removed. I'm not sure about this approach though--
// is it possible for transactions to affect other non-deleted data before
// the row deleted?
for (int i = 0; i < transactions.Count - 1; i++)
{
if (transactions[i].Row == e.Row)
{
transactions[i].ColumnValues = colVals;
}
}
OnTransactionAdded(new TransactionEventArgs(record));
}
}
我对此实现并不特别满意,因为它需要遍历事务列表。好吧,优化以后再做,对吧?
DataTablePKTransactionRecord
此派生自 DataTableTransactionRecord
并重写 Apply
方法。如上所述,必须使用行主键值而不是 DataRow
实例。
实现
Apply
方法的实现如下:
public override DataRow Apply(DataView dataView)
{
DataTable sourceTable=dataView.Table;
// If the transaction record contains a known DataRow, then use
// the default Apply method.
if (row != null)
{
base.Apply(dataView);
}
else
{
// We have to use the PK value information to determine the row.
switch (transType)
{
case RecordType.NewRow:
row = sourceTable.NewRow();
SetPKFieldValues();
sourceTable.Rows.Add(row);
break;
case RecordType.DeleteRow:
row = FindRow(dataView);
row.Delete();
break;
case RecordType.ChangeField:
row = FindRow(dataView);
row[columnName] = newValue;
break;
}
}
return null;
}
对 SetPKFieldValues
的调用:
protected void SetPKFieldValues()
{
foreach (KeyValuePair<string, object> kvp in pkFieldNameValues)
{
row[kvp.Key] = kvp.Value;
}
}
是必需的,以便可以在添加新行后找到它。这导致设置主键值的事务中存在一些冗余,尽管它们已经在这里设置。一个可能的重构是累积所有字段更改事务,并在添加新行时一次性应用它们。这将涉及一个可以访问所有事务记录的实现,而我在这里关注的是一次应用一个事务记录。这是验证基本要求实现的重要的一步。
FindRow
方法根据数据包的主键字段/值字典设置主键字段值:
protected DataRow FindRow(DataView dataView)
{
if ((dataView.Sort == null) || (dataView.Sort == String.Empty))
{
throw new DataTableSynchronizationManagerException("The transaction
logger's SetupSort method must be called before synchronization.");
}
object[] pks = new object[pkValues.Count];
pkValues.Values.CopyTo(pks, 0);
int idx = dataView.Find(pks);
if (idx < 0)
{
throw new DataTableSynchronizationManagerException("Could not find row
to update.");
}
return dataView[idx].Row;
}
DataTableSynchronizationManager
同步管理器与 DataTableTransactionLog
实例接口以获取事务并将其转换为 TransactionRecordPacket
实例。它还执行反向操作——获取 TransactionRecordPacket
实例集合并将其添加到日志记录器的事务集合中。
实现
GetTransactions
返回 TransactionRecordPacket
实例列表:
public List<TransactionRecordPacket> GetTransactions()
{
if (logger.SourceTable.PrimaryKey == null)
{
throw new DataTableTransactionException(
"GetTransactions requires at least one PK.");
}
List<TransactionRecordPacket> packets = new List<TransactionRecordPacket>();
foreach (DataTableTransactionRecord record in logger.Log)
{
TransactionRecordPacket trp = new TransactionRecordPacket(record);
packets.Add(trp);
}
return packets;
}
然后可以将这些应用于管理镜像 DataTable
的日志记录器的事务:
public void AddTransactions(List<TransactionRecordPacket> transactionPackets)
{
foreach (TransactionRecordPacket trp in transactionPackets)
{
logger.Log.Add(new DataTablePKTransactionRecord(trp));
}
}
请注意同步管理器如何添加专门的 DataTablePKTransactionRecord
类的实例。这是为了可以重写 Appy
方法,以便使用主键值而不是 DataRow
实例本身来查找行。
最后,同步管理器实现了两个方法:SetupSort
和 Sync
,其中后者用于应用添加到事务日志的所有事务。这些事务预计都是 DataTablePKTransactionRecord
实例:
public void Sync()
{
SetupSort();
logger.SuspendLogging();
foreach (DataTableTransactionRecord record in logger.Log)
{
if (!(record is DataTablePKTransactionRecord))
{
throw new DataTableSynchronizationManagerException("Expected a record
of type DataTablePKTransactionRecord.");
}
record.Apply(logger.DataView);
}
logger.ResumeLogging();
}
SetupSort
方法将设置 DataView.Sort
属性,以便可以使用 DataView.Find
方法根据主键查找行。为此,DataTable
的 Column
集合中的 DataColumn
实例必须以相同的顺序初始化:
protected void SetupSort()
{
if (logger.SourceTable.PrimaryKey == null)
{
throw new DataTableTransactionException(
"GetTransactions requires at least one PK.");
}
string sortBy = String.Empty;
string comma = String.Empty;
foreach (DataColumn dc in logger.SourceTable.PrimaryKey)
{
sortBy += comma + dc.ColumnName;
comma = ", ";
}
logger.DataView.Sort = sortBy;
}
单元测试
我已经为核心日志记录功能和同步管理器创建了单元测试。
事务日志记录器
事务日志记录器单元测试是一系列顺序单元测试,可以使用我的高级单元测试引擎运行。
using System;
using System.Collections.Generic;
using System.Data;
using Vts.UnitTest;
using Clifton.Data;
namespace TransactionLoggerUnitTests
{
[TestFixture]
[ProcessTest]
public class LoggerTests
{
protected DataTable dt;
protected DataTableTransactionLog dttl;
protected DataRow row;
[TestFixtureSetUp]
public void FixtureSetup()
{
dt = new DataTable();
dt.Columns.Add(new DataColumn("LastName", typeof(string)));
dt.Columns.Add(new DataColumn("FirstName", typeof(string)));
dttl = new DataTableTransactionLog();
dttl.SourceTable = dt;
}
[Test, Sequence(1)]
public void NewRow()
{
row = dt.NewRow();
Assertion.Assert(dttl.Log.Count == 1, "Expected one entry.");
Assertion.Assert(dttl.Log[0].TransactionType ==
DataTableTransactionRecord.RecordType.NewRow,
"Expected new row transaction.");
}
[Test, Sequence(2)]
public void SetFields()
{
row["LastName"] = "Clifton";
row["FirstName"] = "Marc";
dt.Rows.Add(row);
Assertion.Assert(dttl.Log.Count == 3, "Expected three entries.");
Assertion.Assert(dttl.Log[1].TransactionType ==
DataTableTransactionRecord.RecordType.ChangeField,
"Expected change field transaction.");
Assertion.Assert(dttl.Log[2].TransactionType ==
DataTableTransactionRecord.RecordType.ChangeField,
"Expected change field transaction.");
Assertion.Assert(dttl.Log[1].NewValue.ToString()=="Clifton",
"Incorrect new value.");
Assertion.Assert(dttl.Log[2].NewValue.ToString() == "Marc",
"Incorrect new value.");
}
[Test, Sequence(3)]
public void CollectNothing()
{
dttl.CollectUncommittedRows();
Assertion.Assert(dt.Rows.Count == 1, "Committed row was collected!");
}
[Test, Sequence(4)]
public void CollectUncommitted()
{
dt.NewRow();
dttl.CollectUncommittedRows();
Assertion.Assert(dttl.Log.Count == 3, "Expected three entries.");
}
[Test, Sequence(5)]
public void RevertFirstNameChange()
{
dttl.Revert(2);
Assertion.Assert(dt.Rows[0]["LastName"].ToString() ==
"Clifton", "Incorrect value.");
Assertion.Assert(dt.Rows[0]["FirstName"]==DBNull.Value,
"Incorrect new value.");
}
[Test, Sequence(6)]
public void RevertLastNameChange()
{
dttl.Revert(1);
Assertion.Assert(dt.Rows[0]["LastName"] == DBNull.Value,
"Incorrect value.");
Assertion.Assert(dt.Rows[0]["FirstName"] == DBNull.Value,
"Incorrect new value.");
}
[Test, Sequence(7)]
public void RevertNewRowChange()
{
dttl.Revert(0);
Assertion.Assert(dt.Rows.Count == 0, "Row should have been deleted.");
}
[Test, Sequence(8)]
public void ApplyNewRow()
{
dttl.Apply(0);
Assertion.Assert(dt.Rows.Count == 1, "Row was not added.");
Assertion.Assert(dt.Rows[0]["LastName"] == DBNull.Value,
"Incorrect value.");
Assertion.Assert(dt.Rows[0]["FirstName"] == DBNull.Value,
"Incorrect new value.");
}
[Test, Sequence(9)]
public void ApplyLastName()
{
dttl.Apply(1);
Assertion.Assert(dt.Rows.Count == 1, "Row was not added.");
Assertion.Assert(dt.Rows[0]["LastName"].ToString() == "Clifton",
"Incorrect value.");
Assertion.Assert(dt.Rows[0]["FirstName"] == DBNull.Value,
"Incorrect new value.");
}
[Test, Sequence(10)]
public void ApplyFirstName()
{
dttl.Apply(2);
Assertion.Assert(dt.Rows.Count == 1, "Row was not added.");
Assertion.Assert(dt.Rows[0]["LastName"].ToString() == "Clifton",
"Incorrect value.");
Assertion.Assert(dt.Rows[0]["FirstName"].ToString() == "Marc",
"Incorrect new value.");
}
}
}
同步管理器
同步管理器的单元测试说明了同步两个 DataTable
实例的基本过程:
using System;
using System.Collections.Generic;
using System.Data;
using Vts.UnitTest;
using Clifton.Data;
namespace TransactionSyncrhonizationManagerUnitTests
{
[TestFixture]
[ProcessTest]
public class CurrencyTests
{
protected DataTable dt1;
protected DataTableTransactionLog dttl1;
protected DataTable dt2;
protected DataTableTransactionLog dttl2;
[TestFixtureSetUp]
public void FixtureSetup()
{
dt1 = new DataTable();
dt1.Columns.Add(new DataColumn("PK", typeof(Guid)));
dt1.Columns.Add(new DataColumn("LastName", typeof(string)));
dt1.Columns.Add(new DataColumn("FirstName", typeof(string)));
dt1.PrimaryKey = new DataColumn[] { dt1.Columns["PK"] };
dttl1 = new DataTableTransactionLog(dt1);
dt2 = dt1.Clone();
dttl2 = new DataTableTransactionLog(dt2);
DataRow row=dt1.NewRow();
row["PK"]=Guid.NewGuid();
row["LastName"]="Clifton";
row["FirstName"]="Marc";
dt1.Rows.Add(row);
row=dt1.NewRow();
row["PK"]=Guid.NewGuid();
row["LastName"]="Linder";
row["FirstName"]="Karen";
dt1.Rows.Add(row);
row=dt1.NewRow();
row["PK"]=Guid.NewGuid();
row["LastName"]="Doe";
row["FirstName"]="John";
dt1.Rows.Add(row);
dt1.Rows[2].Delete();
}
[Test, Sequence(1)]
public void UpdateMirror()
{
DataTableSynchronizationManager dtcm1 =
new DataTableSynchronizationManager(dttl1);
List<TransactionRecordPacket> trpList = dtcm1.GetTransactions();
DataTableSynchronizationManager dtcm2 =
new DataTableSynchronizationManager(dttl2);
dtcm2.AddTransactions(trpList);
dtcm2.Sync();
Assertion.Assert(dt2.Rows.Count == 2, "Expected 2 rows.");
Assertion.Assert(dt2.Rows[0]["LastName"].ToString() == "Clifton",
"Unexpected value");
Assertion.Assert(dt2.Rows[0]["FirstName"].ToString() == "Marc",
"Unexpected value");
Assertion.Assert(dt2.Rows[1]["LastName"].ToString() == "Linder",
"Unexpected value");
Assertion.Assert(dt2.Rows[1]["FirstName"].ToString() == "Karen",
"Unexpected value");
}
}
}
您会注意到两个 DataTable
实例都设置了相同的列结构。从与第一个日志记录器接口的 DataTableSynchronizationManager
获取的事务包集合被传递给与第二个日志记录器接口的第二个 DataTableSynchronizationManager
实例。添加包并调用 Sync
方法。另请注意,由于两个表旨在同步,因此它们现在都实现了一个主键列。
结论
DataTableSynchronizationManager
是同步两个远程 DataTable
实例之间数据的有用组件。事务包列表适用于序列化,使用我的RawSerialization库,它将提供一种紧凑的格式,以便通过网络发送。