并发的 Observable Collection、Dictionary 和 Sorted Dictionary






4.76/5 (21投票s)
提供了并发可观察集合类的实现,
引言
本文档描述了我如何实现一个可以绑定到 WPF 列表控件的 Dictionary
类型类,但可以从非 WPF 用户界面的线程进行更新。这在您有工作线程更新 Dictionary
并希望 Dictionary
的内容绑定到控件时非常有用。微软在 .NET 4.0 中提供了一些 ObservableCollection
类,但不幸的是,ObservableDictionary
并不在其中,而且他们提供的 ObservableCollection
类也不支持多线程。虽然本文档中提供的类不像 .NET 4.0 新的 System.Collections.Concurrent
命名空间中的集合那样高效,但它们允许工作线程更新绑定到视图的有序集合。我认为我的实现中有一些可以做得更好的地方,因此我欢迎您在本文档底部发表您的评论,无论是正面的还是负面的。
背景
我想要一个线程安全的 Observable Sorted Dictionary,但在到达那里之前,我必须经过以下步骤
- 线程安全的 Observable Collection
- Observable Dictionary
- 线程安全的 Observable Dictionary
- Observable Sorted Dictionary
- 线程安全的 Observable Sorted Dictionary
线程安全的 Observable Collection
我从 .NET framework 的 ObservableCollection
开始,并着手使其线程安全,同时牢记以下几点
- 防止在读取值时修改集合
- 防止在外部事件处理期间修改集合
- 集合是有序的,不是队列,所以需要插入项目
- 需要能够交出一个可枚举对象,由于集合的有序性,该对象不会受到集合后续更改的影响
我最终创建了一个基类,它使用 .NET 的 ReaderWriterLockSlim
来对 ObservableCollection
对象进行读写锁定和写操作序列化,并通过生产者-消费者队列将序列化事件传递给可以绑定到用户界面的 ViewModel。对于可枚举对象,我只是交出一个不可变的集合,它是基础集合的快照。
这是集合写入时的过程
- 获取读锁
- 升级为写锁
- 设置标志,指示需要新的可枚举快照
- 更新封装的
ObservableCollection
- 处理来自封装集合的事件
- 添加到生产者-消费者队列
- 如果当前线程是用户界面线程,则处理整个生产者-消费者队列,以确保 ViewModel 反映已添加的内容。
- 如果锁仍为写锁,则降级为读锁
- 释放读锁
这是集合读取时的过程
- 获取读锁
- 返回值
- 释放读锁
这是请求枚举器时的过程
- 获取读锁
- 检查集合快照标志,查看是否需要新的快照
- 如果需要新的集合快照
- 获取集合快照的写锁
- 创建新的集合快照
- 清除集合快照更新所需标志
- 释放集合快照的写锁
- 从快照返回枚举器
写入过程代码
/// <summary>
/// Calls the read function passed in, and if it returns true,
/// then calls the next read function, else calls the write
/// function.
/// </summary>
protected TResult DoBaseReadWrite<TResult>(Func<bool> readFuncTest,
Func<TResult> readFunc, Func<TResult> writeFunc) {
_readWriteLock.TryEnterUpgradeableReadLock(Timeout.Infinite);
try {
if(readFuncTest()) {
return readFunc();
} else {
_readWriteLock.TryEnterWriteLock(Timeout.Infinite);
try {
_newSnapshotRequired = true;
TResult returnValue = writeFunc();
return returnValue;
} finally {
if(_readWriteLock.IsWriteLockHeld) {
_readWriteLock.ExitWriteLock();
}
}
}
} finally {
_readWriteLock.ExitUpgradeableReadLock();
}
}
}
/// <summary>
/// Handles write access to the base collection when a return value is required
/// </summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="writeFunc"></param>
/// <returns></returns>
protected TResult DoBaseWrite<TResult>(Func<TResult> writeFunc) {
_readWriteLock.TryEnterUpgradeableReadLock(Timeout.Infinite);
try {
_readWriteLock.TryEnterWriteLock(Timeout.Infinite);
_newSnapshotRequired = true;
return writeFunc();
} finally {
if(_readWriteLock.IsWriteLockHeld) {
_readWriteLock.ExitWriteLock();
}
_readWriteLock.ExitUpgradeableReadLock();
}
}
读取过程代码
/// <summary>
/// Handles read access from the base collection
/// </summary>
protected TResult DoBaseRead<TResult>(Func<TResult> readFunc) {
if(IsDispatcherThread) {
return readFunc();
}
_readWriteLock.TryEnterReadLock(Timeout.Infinite);
try {
return readFunc();
} finally {
_readWriteLock.ExitReadLock();
}
}
获取枚举器代码
/// <summary>
/// Gets the enumerator for a snapshot of the collection
/// </summary>
public IEnumerator<T> GetEnumerator() {
if(IsDispatcherThread) {
return _viewModel.GetEnumerator();
} else {
return Snapshot.GetEnumerator();
}
}
/// <summary>
/// Gets an immutable snapshot of the collection
/// </summary>
public ImmutableCollectionBase<T> Snapshot {
get {
return DoBaseRead(() => {
UpdateSnapshot();
return _baseSnapshot;
});
}
}
/// <summary>
/// Updates the snapshot that is used to generate an Enumerator
/// </summary>
private void UpdateSnapshot() {
if(_newSnapshotRequired) {
_snapshotLock.TryEnterWriteLock(Timeout.Infinite);
if(_newSnapshotRequired) {
_baseSnapshot = new ImmutableCollection(_baseCollection);
_newSnapshotRequired = false;
}
_snapshotLock.ExitWriteLock();
}
}
Observable Dictionary
现在我有一种方法可以包装对 observable collection 的访问,使其适合从非 GUI 线程进行更新。所以我只需要包装 .NET 的 ObservableDictionary
类,但目前还没有,所以我必须自己写一个。
我写的 ObservableDictionary
类封装了 2 个集合
- 一个键值对的
ObservableCollection
,作为ObservableDictionary
的 observable 部分的基础 - 一个
Dictionary
,将 Key 映射到基础ObservableCollection
中的索引
然而,这不允许我整洁地将项目插入 ObservableCollection
而不更新大量的 Dictionary
条目,所以我做的是将链表节点存储在 Dictionary
中而不是索引,其中链接的顺序与 ObservableCollection
的顺序相同。链表节点具有递归的递减前向和递增前向方法,用于在删除或插入节点时移动它们指向的索引。下面的代码显示了它们的工作原理
/// <summary>
/// This function effectively removes this node from the linked list,
/// and decrements the position index of all the nodes that follow it.
/// It removes the node by changing the nodes that come before and
/// after it to point to each other, thus bypassing this node.
/// </summary>
public void Remove() {
if (this.Previous != null) {
this.Previous.Next = this.Next;
}
if (this.Next != null) {
this.Next.Previous = this.Previous;
}
DecrementForward();
}
/// <summary>
/// This recursive function decrements the position index of all the nodes
/// in front of this node. Used for when a node is removed from a list.
/// </summary>
private void DecrementForward() {
if (this.Next != null) {
this.Next.Index--;
this.Next.DecrementForward();
}
}
/// <summary>
/// This recursive function decrements the position index of all the nodes
/// in front of this node. Used for when a node is inserted into a list.
/// </summary>
private void IncrementForward() {
if (this.Next != null) {
this.Next.Index++;
this.Next.IncrementForward();
}
}
一旦有了 ObservableDictionary
类,线程安全的版本就只是简单地用大量的访问器包装 ObservableDictionary
,除了 Keys
和 Values
属性。Keys
和 Values
是从 Dictionary
的快照中读取的,如果 Keys
或 Values
属性被读取,则在必要时进行更新。
Observable Sorted Dictionary
此时,ObservableDictionary
的排序版本仅仅是指定在添加新项目时插入的位置。为此,我使用了一个二分排序算法
/// <summary>
/// Gets the position for a key to be inserted such that the sort order is maintained.
/// </summary>
/// <param name="key"></param>
/// <returns></returns>
public int GetInsertIndex(int count, TKey key, Func<int, TKey> indexToKey) {
return BinarySearchForIndex(0, count - 1, key, indexToKey);
}
/// <summary>
/// Searches for the index of the insertion point for the key passed in such that
/// the sort order is maintained. Implemented as a non-recursive method.
/// </summary>
/// <param name="low"></param>
/// <param name="high"></param>
/// <param name="key"></param>
/// <returns></returns>
private int BinarySearchForIndex
(int low, int high, TKey key, Func<int, TKey> indexToKey) {
while (high >= low) {
// Calculate the mid point and determine if the key passed in
// should be inserted at this point, below it, or above it.
int mid = low + ((high - low) >> 1);
int result = this.Compare(indexToKey(mid), key);
// Return the current position, or change the search bounds
// to be above or below the mid point depending on the result.
if (result == 0)
return mid;
else if (result < 0)
low = mid + 1;
else
high = mid - 1;
}
return low;
}
与 ObservableDictionary
一样,这只是简单地用我编写的线程安全 ConcurrentObservableBase
类的访问器来包装 ObservableSortedDictionary
。
Using the Code
在附加源代码的 Swordfish.NET.General
项目中,我在 Swordfish.NET.Collections
命名空间下提供了以下集合类
ObservableDictionary
ObservableSortedDictionary
ConcurrentObservableCollection
ConcurrentObservableDictionary
ConcurrentObservableSortedDictionary
附加的源代码还包含一个 WPF 应用程序,您可以使用它对上述集合类进行一些轻量级的交互式测试。
关注点
在 .NET 4.0 中,微软实现了一些无序的并发集合,包括 ConcurrentDictionary。
John Simmons 在 2010 年 5 月于 Code Project 上 提出了对 ObservableDictionary 的需求,并链接了一个实现。
历史
- 首次发布于 2011 年 6 月 8 日
- 于 2011 年 9 月 12 日修复了 Olly Hayes 发现的一些问题
- 重要更新,将 ReadWriteLock 更改为 ReadWriteLockSlim,并更改为使用单独的 ViewModel 集合来绑定到用户界面。将源代码标记为 v2 以反映重大更改。