65.9K
CodeProject 正在变化。 阅读更多。
Home

DataTable 同步管理器

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.63/5 (9投票s)

2006年3月4日

5分钟阅读

viewsIcon

74569

downloadIcon

1455

为 DataTable 事务日志添加同步功能。

引言

在我的上一篇文章中,我讨论了DataTableTransactionLog。本文通过实现一个 DataTable 同步管理器进一步发展了这些概念。同步管理器负责获取事务记录并创建记录“包”列表。它还负责获取记录“包”列表并重构事务记录。然后,这些事务记录可以应用于目标 DataTable,从而同步各个 DataTable

重构

对上一篇文章的读者来说,需要注意一些重构。除了命名空间更改为“Clifton.Data”之外,其他更改如下:

DataTableTransactionLog.cs:

  • ApplyRevert 记录操作移至 DataTableTransactionLog 类。
  • RestoreRowFieldsSaveRowFields 移至 DataTableTransactionLog 类。
  • 添加了索引器以返回事务记录。
  • 修改了 Deleting 事件,以搜索所有以前的事务,并将当前行数据保存在引用即将删除的行的任何事务记录中。
  • 日志现在利用内部 DataView,以便在同步期间通过主键查找记录。

DataTableTransactionRecord.cs:

  • TransType 属性重命名为 TransactionType
  • TransactionType 枚举重命名为 RecordType
  • 添加了一个虚拟 Apply 方法,该方法实现了以前在 DataTableTransactionLog 类中的逻辑。
  • 添加了一个非虚拟 Revert 方法,该方法实现了以前在 DataTableTransactionLog 类中的逻辑。
  • columnValues 已公开(ColumnValues)并且可设置。

架构

有一个特定的架构驱动应用程序与同步管理器的接口。应用程序要求是:

  • 确保每个 DataTable 实现相同的列名。
  • 利用一个或多个主键唯一标识每行。
  • 在从同步管理器获取事务包之前,确保主键值已初始化且有效。
  • 在将事务包应用于目标 DataTable 之前,确保主键值已初始化且有效。
  • 主键由 DataTablePrimaryKey 数组确定。

以下是说明日志与同步管理器之间关系的 UML 图:

接下来将讨论与 DataTable 同步相关的类。

TransactionRecordPacket

在上一篇文章中,DataTableTransactionRecord 跟踪对 DataRow 实例的更改。由于每个 DataTable 将具有不同的 DataRow 实例,因此有必要使用不同的机制来同步唯一的 DataTable 实例。我选择的机制是利用主键列。如果您的 DataTable 没有主键列,则必须添加一个,或修改您的查询以从持久存储中获取主键列。

TransactionRecordPacket 将主键值存储在字典中,映射主键列名和主键值。如果事务是列更改,则还存储更改字段的列名和新值。不保留旧值,如果行被删除,也不保留整个行的字段值。

实现

实现的关键部分是对 GetGuaranteedRowValue 的调用,无论行是否已删除,它都获取指定的字段值:

public TransactionRecordPacket(DataTableTransactionRecord record)
{
  pkValues = new Dictionary<string, object>();
  tranType = record.TransactionType;

  foreach (DataColumn dc in record.Row.Table.PrimaryKey)
  {
    pkValues.Add(dc.ColumnName, record.GetGuaranteedRowValue(dc.ColumnName));
  }

  // Fill in some additional information if the transaction is a ChangeField.
  if (tranType == DataTableTransactionRecord.RecordType.ChangeField)
  {
    columnName = record.ColumnName;
    newValue = record.NewValue;
  }
}

这意味着即使对于已删除的行,我们也可以获取主键字段值,以便我们可以在正在同步的 DataTable 中找到该行并将其删除。GetGuaranteedRowValue 方法的内部实现是:

public object GetGuaranteedRowValue(string fieldName)
{
  object ret = null;

  if (WasDeleted)
  {
    ret = columnValues[fieldName];
  }
  else
  {
    if (row.RowState == DataRowState.Deleted)
    {
      throw new DataTableTransactionException(
          "Row has been deleted and there is no saved column values.");
    }

    ret = row[fieldName];
  }

  return ret;
}

如上所述,当记录被删除时,对于引用正在删除的行的所有事务记录,columnValues 属性都会填充。这是 OnRowDeleting 处理程序的重构:

protected void OnRowDeleting(object sender, DataRowChangeEventArgs e)
{
  if (doLogging)
  {
    DataTableTransactionRecord record;
    record = new DataTableTransactionRecord(transactions.Count, e.Row, 
         DataTableTransactionRecord.RecordType.DeleteRow);
    record.SaveRowFields(e.Row);
    OnTransactionAdding(new TransactionEventArgs(record));
    transactions.Add(record);
    Dictionary<string, object> colVals = record.ColumnValues;

    // Tell all transaction records involving this row to save the row fields.
    // This allows us to access deleted row data in earlier transactions.
    // Alternatively, since the row is deleted, all transactions involving the 
    // deleted row could be removed. I'm not sure about this approach though--
    // is it possible for transactions to affect other non-deleted data before
    // the row deleted?
    for (int i = 0; i < transactions.Count - 1; i++)
    {
      if (transactions[i].Row == e.Row)
      {
        transactions[i].ColumnValues = colVals;
      }
    }

    OnTransactionAdded(new TransactionEventArgs(record));
  }
}

我对此实现并不特别满意,因为它需要遍历事务列表。好吧,优化以后再做,对吧?

DataTablePKTransactionRecord

此派生自 DataTableTransactionRecord 并重写 Apply 方法。如上所述,必须使用行主键值而不是 DataRow 实例。

实现

Apply 方法的实现如下:

public override DataRow Apply(DataView dataView)
{
  DataTable sourceTable=dataView.Table;

  // If the transaction record contains a known DataRow, then use
  // the default Apply method.
  if (row != null)
  {
    base.Apply(dataView);
  }
  else
  {
    // We have to use the PK value information to determine the row.
    switch (transType)
    { 
      case RecordType.NewRow:
        row = sourceTable.NewRow();
        SetPKFieldValues();
        sourceTable.Rows.Add(row);
        break;

      case RecordType.DeleteRow:
        row = FindRow(dataView);
        row.Delete();
        break;

     case RecordType.ChangeField:
       row = FindRow(dataView);
       row[columnName] = newValue;
       break;
    }
  }

  return null;
}

SetPKFieldValues 的调用:

protected void SetPKFieldValues()
{
  foreach (KeyValuePair<string, object> kvp in pkFieldNameValues)
  {
    row[kvp.Key] = kvp.Value;
  }
}

是必需的,以便可以在添加新行后找到它。这导致设置主键值的事务中存在一些冗余,尽管它们已经在这里设置。一个可能的重构是累积所有字段更改事务,并在添加新行时一次性应用它们。这将涉及一个可以访问所有事务记录的实现,而我在这里关注的是一次应用一个事务记录。这是验证基本要求实现的重要的一步。

FindRow 方法根据数据包的主键字段/值字典设置主键字段值:

protected DataRow FindRow(DataView dataView)
{
  if ((dataView.Sort == null) || (dataView.Sort == String.Empty))
  {
    throw new DataTableSynchronizationManagerException("The transaction 
       logger's SetupSort method must be called before synchronization.");
  }

  object[] pks = new object[pkValues.Count];
  pkValues.Values.CopyTo(pks, 0);
  int idx = dataView.Find(pks);

  if (idx < 0)
  {
    throw new DataTableSynchronizationManagerException("Could not find row 
          to update.");
  }

  return dataView[idx].Row;
}

DataTableSynchronizationManager

同步管理器与 DataTableTransactionLog 实例接口以获取事务并将其转换为 TransactionRecordPacket 实例。它还执行反向操作——获取 TransactionRecordPacket 实例集合并将其添加到日志记录器的事务集合中。

实现

GetTransactions 返回 TransactionRecordPacket 实例列表:

public List<TransactionRecordPacket> GetTransactions()
{
  if (logger.SourceTable.PrimaryKey == null)
  {
    throw new DataTableTransactionException(
       "GetTransactions requires at least one PK.");
  }

  List<TransactionRecordPacket> packets = new List<TransactionRecordPacket>();

  foreach (DataTableTransactionRecord record in logger.Log)
  {
    TransactionRecordPacket trp = new TransactionRecordPacket(record);
    packets.Add(trp);
  }

  return packets;
}

然后可以将这些应用于管理镜像 DataTable 的日志记录器的事务:

public void AddTransactions(List<TransactionRecordPacket> transactionPackets)
{
  foreach (TransactionRecordPacket trp in transactionPackets)
  {
    logger.Log.Add(new DataTablePKTransactionRecord(trp));
  }
}

请注意同步管理器如何添加专门的 DataTablePKTransactionRecord 类的实例。这是为了可以重写 Appy 方法,以便使用主键值而不是 DataRow 实例本身来查找行。

最后,同步管理器实现了两个方法:SetupSortSync,其中后者用于应用添加到事务日志的所有事务。这些事务预计都是 DataTablePKTransactionRecord 实例:

public void Sync()
{
  SetupSort();
  logger.SuspendLogging();

  foreach (DataTableTransactionRecord record in logger.Log)
  {
    if (!(record is DataTablePKTransactionRecord))
    {
      throw new DataTableSynchronizationManagerException("Expected a record 
          of type DataTablePKTransactionRecord.");
    }

    record.Apply(logger.DataView);
  }

  logger.ResumeLogging();
}

SetupSort 方法将设置 DataView.Sort 属性,以便可以使用 DataView.Find 方法根据主键查找行。为此,DataTableColumn 集合中的 DataColumn 实例必须以相同的顺序初始化:

protected void SetupSort()
{
  if (logger.SourceTable.PrimaryKey == null)
  { 
    throw new DataTableTransactionException(
        "GetTransactions requires at least one PK.");
  }

  string sortBy = String.Empty;
  string comma = String.Empty;

  foreach (DataColumn dc in logger.SourceTable.PrimaryKey)
  {
    sortBy += comma + dc.ColumnName;
    comma = ", ";
  }

  logger.DataView.Sort = sortBy;
}

单元测试

我已经为核心日志记录功能和同步管理器创建了单元测试。

事务日志记录器

事务日志记录器单元测试是一系列顺序单元测试,可以使用我的高级单元测试引擎运行。

using System;
using System.Collections.Generic;
using System.Data;

using Vts.UnitTest;

using Clifton.Data;

namespace TransactionLoggerUnitTests
{
  [TestFixture]
  [ProcessTest]
  public class LoggerTests
  {
    protected DataTable dt;
    protected DataTableTransactionLog dttl;
    protected DataRow row;

    [TestFixtureSetUp]
    public void FixtureSetup()
    {
      dt = new DataTable();
      dt.Columns.Add(new DataColumn("LastName", typeof(string)));
      dt.Columns.Add(new DataColumn("FirstName", typeof(string)));

      dttl = new DataTableTransactionLog();
      dttl.SourceTable = dt;
    }

    [Test, Sequence(1)]
    public void NewRow()
    {
      row = dt.NewRow();
      Assertion.Assert(dttl.Log.Count == 1, "Expected one entry.");
      Assertion.Assert(dttl.Log[0].TransactionType == 
          DataTableTransactionRecord.RecordType.NewRow, 
          "Expected new row transaction.");
    }

    [Test, Sequence(2)]
    public void SetFields()
    {
      row["LastName"] = "Clifton";
      row["FirstName"] = "Marc";
      dt.Rows.Add(row);
      Assertion.Assert(dttl.Log.Count == 3, "Expected three entries.");
      Assertion.Assert(dttl.Log[1].TransactionType == 
           DataTableTransactionRecord.RecordType.ChangeField, 
           "Expected change field transaction.");
      Assertion.Assert(dttl.Log[2].TransactionType == 
           DataTableTransactionRecord.RecordType.ChangeField, 
           "Expected change field transaction.");
      Assertion.Assert(dttl.Log[1].NewValue.ToString()=="Clifton", 
           "Incorrect new value.");
      Assertion.Assert(dttl.Log[2].NewValue.ToString() == "Marc", 
           "Incorrect new value.");
    }

    [Test, Sequence(3)]
    public void CollectNothing()
    {
      dttl.CollectUncommittedRows();    
      Assertion.Assert(dt.Rows.Count == 1, "Committed row was collected!");
    }

    [Test, Sequence(4)]
    public void CollectUncommitted()
    {
      dt.NewRow();
      dttl.CollectUncommittedRows();
      Assertion.Assert(dttl.Log.Count == 3, "Expected three entries.");
    }

    [Test, Sequence(5)]
    public void RevertFirstNameChange()
    {
      dttl.Revert(2);
      Assertion.Assert(dt.Rows[0]["LastName"].ToString() ==
         "Clifton", "Incorrect value.");
      Assertion.Assert(dt.Rows[0]["FirstName"]==DBNull.Value, 
         "Incorrect new value.");
    }

    [Test, Sequence(6)]
    public void RevertLastNameChange()
    {
      dttl.Revert(1);
      Assertion.Assert(dt.Rows[0]["LastName"] == DBNull.Value, 
          "Incorrect value.");
      Assertion.Assert(dt.Rows[0]["FirstName"] == DBNull.Value, 
          "Incorrect new value.");
    }

    [Test, Sequence(7)]
    public void RevertNewRowChange()
    {
      dttl.Revert(0);
      Assertion.Assert(dt.Rows.Count == 0, "Row should have been deleted.");
    }

    [Test, Sequence(8)]
    public void ApplyNewRow()
    {
      dttl.Apply(0);
      Assertion.Assert(dt.Rows.Count == 1, "Row was not added.");
      Assertion.Assert(dt.Rows[0]["LastName"] == DBNull.Value, 
           "Incorrect value.");
      Assertion.Assert(dt.Rows[0]["FirstName"] == DBNull.Value, 
           "Incorrect new value.");
    }

    [Test, Sequence(9)]
    public void ApplyLastName()
    {
      dttl.Apply(1);
      Assertion.Assert(dt.Rows.Count == 1, "Row was not added.");
      Assertion.Assert(dt.Rows[0]["LastName"].ToString() == "Clifton", 
           "Incorrect value.");
      Assertion.Assert(dt.Rows[0]["FirstName"] == DBNull.Value, 
           "Incorrect new value.");
    }

    [Test, Sequence(10)]
    public void ApplyFirstName()
    {
      dttl.Apply(2);
      Assertion.Assert(dt.Rows.Count == 1, "Row was not added.");
      Assertion.Assert(dt.Rows[0]["LastName"].ToString() == "Clifton", 
           "Incorrect value.");
      Assertion.Assert(dt.Rows[0]["FirstName"].ToString() == "Marc", 
           "Incorrect new value.");
    }
  }
}

同步管理器

同步管理器的单元测试说明了同步两个 DataTable 实例的基本过程:

using System;
using System.Collections.Generic;
using System.Data;

using Vts.UnitTest;

using Clifton.Data;

namespace TransactionSyncrhonizationManagerUnitTests
{
  [TestFixture]
  [ProcessTest]
  public class CurrencyTests
  {
    protected DataTable dt1;
    protected DataTableTransactionLog dttl1;
    protected DataTable dt2;
    protected DataTableTransactionLog dttl2;

    [TestFixtureSetUp]
    public void FixtureSetup()
    {
      dt1 = new DataTable();
      dt1.Columns.Add(new DataColumn("PK", typeof(Guid)));
      dt1.Columns.Add(new DataColumn("LastName", typeof(string)));
      dt1.Columns.Add(new DataColumn("FirstName", typeof(string)));
      dt1.PrimaryKey = new DataColumn[] { dt1.Columns["PK"] };

      dttl1 = new DataTableTransactionLog(dt1);

      dt2 = dt1.Clone();
      dttl2 = new DataTableTransactionLog(dt2);

      DataRow row=dt1.NewRow();
      row["PK"]=Guid.NewGuid();
      row["LastName"]="Clifton";
      row["FirstName"]="Marc";
      dt1.Rows.Add(row);

      row=dt1.NewRow();
      row["PK"]=Guid.NewGuid();
      row["LastName"]="Linder";
      row["FirstName"]="Karen";
      dt1.Rows.Add(row);

      row=dt1.NewRow();
      row["PK"]=Guid.NewGuid();
      row["LastName"]="Doe";
      row["FirstName"]="John";
      dt1.Rows.Add(row);

      dt1.Rows[2].Delete();
    }

    [Test, Sequence(1)]
    public void UpdateMirror()
    {
      DataTableSynchronizationManager dtcm1 = 
          new DataTableSynchronizationManager(dttl1);
      List<TransactionRecordPacket> trpList = dtcm1.GetTransactions();

      DataTableSynchronizationManager dtcm2 = 
          new DataTableSynchronizationManager(dttl2);
      dtcm2.AddTransactions(trpList);
      dtcm2.Sync();

      Assertion.Assert(dt2.Rows.Count == 2, "Expected 2 rows.");
      Assertion.Assert(dt2.Rows[0]["LastName"].ToString() == "Clifton", 
          "Unexpected value");
      Assertion.Assert(dt2.Rows[0]["FirstName"].ToString() == "Marc", 
          "Unexpected value");      
      Assertion.Assert(dt2.Rows[1]["LastName"].ToString() == "Linder", 
          "Unexpected value");
      Assertion.Assert(dt2.Rows[1]["FirstName"].ToString() == "Karen", 
          "Unexpected value");
    }
  }
}

您会注意到两个 DataTable 实例都设置了相同的列结构。从与第一个日志记录器接口的 DataTableSynchronizationManager 获取的事务包集合被传递给与第二个日志记录器接口的第二个 DataTableSynchronizationManager 实例。添加包并调用 Sync 方法。另请注意,由于两个表旨在同步,因此它们现在都实现了一个主键列。

结论

DataTableSynchronizationManager 是同步两个远程 DataTable 实例之间数据的有用组件。事务包列表适用于序列化,使用我的RawSerialization库,它将提供一种紧凑的格式,以便通过网络发送。

© . All rights reserved.