65.9K
CodeProject 正在变化。 阅读更多。
Home

并发的 Observable Collection、Dictionary 和 Sorted Dictionary

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.76/5 (21投票s)

2011年6月8日

CPOL

5分钟阅读

viewsIcon

134973

downloadIcon

3242

提供了并发可观察集合类的实现, 用于绑定到 WPF 控件, 从而可以从非 WPF GUI 线程更新集合

Test Application

引言

本文档描述了我如何实现一个可以绑定到 WPF 列表控件的 Dictionary 类型类,但可以从非 WPF 用户界面的线程进行更新。这在您有工作线程更新 Dictionary 并希望 Dictionary 的内容绑定到控件时非常有用。微软在 .NET 4.0 中提供了一些 ObservableCollection 类,但不幸的是,ObservableDictionary 并不在其中,而且他们提供的 ObservableCollection 类也不支持多线程。虽然本文档中提供的类不像 .NET 4.0 新的 System.Collections.Concurrent 命名空间中的集合那样高效,但它们允许工作线程更新绑定到视图的有序集合。我认为我的实现中有一些可以做得更好的地方,因此我欢迎您在本文档底部发表您的评论,无论是正面的还是负面的。

背景

我想要一个线程安全的 Observable Sorted Dictionary,但在到达那里之前,我必须经过以下步骤

  • 线程安全的 Observable Collection
  • Observable Dictionary
  • 线程安全的 Observable Dictionary
  • Observable Sorted Dictionary
  • 线程安全的 Observable Sorted Dictionary

线程安全的 Observable Collection

我从 .NET framework 的 ObservableCollection 开始,并着手使其线程安全,同时牢记以下几点

  • 防止在读取值时修改集合
  • 防止在外部事件处理期间修改集合
  • 集合是有序的,不是队列,所以需要插入项目
  • 需要能够交出一个可枚举对象,由于集合的有序性,该对象不会受到集合后续更改的影响

我最终创建了一个基类,它使用 .NET 的 ReaderWriterLockSlim 来对 ObservableCollection 对象进行读写锁定和写操作序列化,并通过生产者-消费者队列将序列化事件传递给可以绑定到用户界面的 ViewModel。对于可枚举对象,我只是交出一个不可变的集合,它是基础集合的快照。

这是集合写入时的过程

  1. 获取读锁
  2. 升级为写锁
  3. 设置标志,指示需要新的可枚举快照
  4. 更新封装的 ObservableCollection
  5. 处理来自封装集合的事件
    1. 添加到生产者-消费者队列
    2. 如果当前线程是用户界面线程,则处理整个生产者-消费者队列,以确保 ViewModel 反映已添加的内容。
  6. 如果锁仍为写锁,则降级为读锁
  7. 释放读锁

这是集合读取时的过程

  1. 获取读锁
  2. 返回值
  3. 释放读锁

这是请求枚举器时的过程

  1. 获取读锁
  2. 检查集合快照标志,查看是否需要新的快照
  3. 如果需要新的集合快照
    1. 获取集合快照的写锁
    2. 创建新的集合快照
    3. 清除集合快照更新所需标志
    4. 释放集合快照的写锁
  4. 从快照返回枚举器

写入过程代码

/// <summary>
/// Calls the read function passed in, and if it returns true,
/// then calls the next read function, else calls the write
/// function.
/// </summary>
protected TResult DoBaseReadWrite<TResult>(Func<bool> readFuncTest, 
	Func<TResult> readFunc, Func<TResult> writeFunc) {
    _readWriteLock.TryEnterUpgradeableReadLock(Timeout.Infinite);
    try {
      if(readFuncTest()) {
        return readFunc();
      } else {
        _readWriteLock.TryEnterWriteLock(Timeout.Infinite);
        try {
          _newSnapshotRequired = true;
          TResult returnValue = writeFunc();
          return returnValue;
        } finally {
          if(_readWriteLock.IsWriteLockHeld) {
            _readWriteLock.ExitWriteLock();
          }
        }
      }
    } finally {
      _readWriteLock.ExitUpgradeableReadLock();
    }
  }
}
/// <summary>
/// Handles write access to the base collection when a return value is required
/// </summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="writeFunc"></param>
/// <returns></returns>
protected TResult DoBaseWrite<TResult>(Func<TResult> writeFunc) {
  _readWriteLock.TryEnterUpgradeableReadLock(Timeout.Infinite);
  try {
    _readWriteLock.TryEnterWriteLock(Timeout.Infinite);
    _newSnapshotRequired = true;
    return writeFunc();
  } finally {
    if(_readWriteLock.IsWriteLockHeld) {
      _readWriteLock.ExitWriteLock();
    }
    _readWriteLock.ExitUpgradeableReadLock();
  }
}

读取过程代码

/// <summary>
/// Handles read access from the base collection
/// </summary>
protected TResult DoBaseRead<TResult>(Func<TResult> readFunc) {
  if(IsDispatcherThread) {
    return readFunc();
  }

  _readWriteLock.TryEnterReadLock(Timeout.Infinite);
  try {
    return readFunc();
  } finally {
    _readWriteLock.ExitReadLock();
  }
}

获取枚举器代码

/// <summary>
/// Gets the enumerator for a snapshot of the collection
/// </summary>
public IEnumerator<T> GetEnumerator() {
  if(IsDispatcherThread) {
    return _viewModel.GetEnumerator();
  } else {
    return Snapshot.GetEnumerator();
  }
}

/// <summary>
/// Gets an immutable snapshot of the collection
/// </summary>
public ImmutableCollectionBase<T> Snapshot {
  get {
    return DoBaseRead(() => {
      UpdateSnapshot();
      return _baseSnapshot;
    });  
  }
}

/// <summary>
/// Updates the snapshot that is used to generate an Enumerator
/// </summary>
private void UpdateSnapshot() {
  if(_newSnapshotRequired) {
    _snapshotLock.TryEnterWriteLock(Timeout.Infinite);
    if(_newSnapshotRequired) {
      _baseSnapshot = new ImmutableCollection(_baseCollection);
      _newSnapshotRequired = false;
    }
    _snapshotLock.ExitWriteLock();
  }
}

Observable Dictionary

现在我有一种方法可以包装对 observable collection 的访问,使其适合从非 GUI 线程进行更新。所以我只需要包装 .NET 的 ObservableDictionary 类,但目前还没有,所以我必须自己写一个。

我写的 ObservableDictionary 类封装了 2 个集合

  • 一个键值对的 ObservableCollection,作为 ObservableDictionary 的 observable 部分的基础
  • 一个 Dictionary,将 Key 映射到基础 ObservableCollection 中的索引

然而,这不允许我整洁地将项目插入 ObservableCollection 而不更新大量的 Dictionary 条目,所以我做的是将链表节点存储在 Dictionary 中而不是索引,其中链接的顺序与 ObservableCollection 的顺序相同。链表节点具有递归的递减前向和递增前向方法,用于在删除或插入节点时移动它们指向的索引。下面的代码显示了它们的工作原理

/// <summary>
/// This function effectively removes this node from the linked list,
/// and decrements the position index of all the nodes that follow it.
/// It removes the node by changing the nodes that come before and
/// after it to point to each other, thus bypassing this node.
/// </summary>
public void Remove() {
  if (this.Previous != null) {
    this.Previous.Next = this.Next;
  }
  if (this.Next != null) {
    this.Next.Previous = this.Previous;
  }
  DecrementForward();
}

/// <summary>
/// This recursive function decrements the position index of all the nodes
/// in front of this node. Used for when a node is removed from a list.
/// </summary>
private void DecrementForward() {
  if (this.Next != null) {
    this.Next.Index--;
    this.Next.DecrementForward();
  }
}

/// <summary>
/// This recursive function decrements the position index of all the nodes
/// in front of this node. Used for when a node is inserted into a list.
/// </summary>
private void IncrementForward() {
  if (this.Next != null) {
    this.Next.Index++;
    this.Next.IncrementForward();
  }
}

一旦有了 ObservableDictionary 类,线程安全的版本就只是简单地用大量的访问器包装 ObservableDictionary,除了 KeysValues 属性。KeysValues 是从 Dictionary 的快照中读取的,如果 KeysValues 属性被读取,则在必要时进行更新。

Observable Sorted Dictionary

此时,ObservableDictionary 的排序版本仅仅是指定在添加新项目时插入的位置。为此,我使用了一个二分排序算法

/// <summary>
/// Gets the position for a key to be inserted such that the sort order is maintained.
/// </summary>
/// <param name="key"></param>
/// <returns></returns>
public int GetInsertIndex(int count, TKey key, Func<int, TKey> indexToKey) {
  return BinarySearchForIndex(0, count - 1, key, indexToKey);
}

/// <summary>
/// Searches for the index of the insertion point for the key passed in such that
/// the sort order is maintained. Implemented as a non-recursive method.
/// </summary>
/// <param name="low"></param>
/// <param name="high"></param>
/// <param name="key"></param>
/// <returns></returns>
private int BinarySearchForIndex
(int low, int high, TKey key, Func<int, TKey> indexToKey) {
  while (high >= low) {

    // Calculate the mid point and determine if the key passed in
    // should be inserted at this point, below it, or above it.
    int mid = low + ((high - low) >> 1);
    int result = this.Compare(indexToKey(mid), key);

    // Return the current position, or change the search bounds
    // to be above or below the mid point depending on the result.
    if (result == 0)
      return mid;
    else if (result < 0)
      low = mid + 1;
    else
      high = mid - 1;
  }
  return low;
}

ObservableDictionary 一样,这只是简单地用我编写的线程安全 ConcurrentObservableBase 类的访问器来包装 ObservableSortedDictionary

Using the Code

在附加源代码的 Swordfish.NET.General 项目中,我在 Swordfish.NET.Collections 命名空间下提供了以下集合类

  • ObservableDictionary
  • ObservableSortedDictionary
  • ConcurrentObservableCollection
  • ConcurrentObservableDictionary
  • ConcurrentObservableSortedDictionary

附加的源代码还包含一个 WPF 应用程序,您可以使用它对上述集合类进行一些轻量级的交互式测试。

关注点

在 .NET 4.0 中,微软实现了一些无序的并发集合,包括 ConcurrentDictionary

John Simmons 在 2010 年 5 月于 Code Project 上 提出了对 ObservableDictionary 的需求,并链接了一个实现。

历史

  • 首次发布于 2011 年 6 月 8 日
  • 于 2011 年 9 月 12 日修复了 Olly Hayes 发现的一些问题
  • 重要更新,将 ReadWriteLock 更改为 ReadWriteLockSlim,并更改为使用单独的 ViewModel 集合来绑定到用户界面。将源代码标记为 v2 以反映重大更改。
并发 Observable Collection、Dictionary 和 Sorted Dictionary - CodeProject - 代码之家
© . All rights reserved.