65.9K
CodeProject 正在变化。 阅读更多。
Home

观察底层数组的变化

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.84/5 (14投票s)

2011年2月28日

CPOL

5分钟阅读

viewsIcon

56028

downloadIcon

270

ObservableCollection 的数组包装器,它通知观察者其底层数组的变化。

引言

最近,我遇到了一个问题,就是如何使用 WPF 的 ItemsControl 来可视化一个数组。问题在于,这个数组会不断地被我的代码之外的因素修改,因此我无法引发适当的集合通知,因为不存在一个固定的控制流来引发这些通知。此外,这个数组包含几十万个元素,因此简单地以固定的时间间隔重置 ItemsControlItemsSource 是不可行的。

因此,我提出了一个解决方案,这要归功于 CodeProject 用户 John Simmons 的有益反馈。这个解决方案的思路是继承 ObservableCollection<T> 类,以便为消费者提供 INotifyCollectionChanged 接口,同时在后台运行一个工作线程,监视底层数组并将更改的项移回 ObservableCollection

背景

需要解决的主要问题是,仅仅将底层数组包装在 ObservableCollection 中是不够的,因为 ObservableCollection 的构造函数会复制数组元素,并在有人通过 ObservableCollectionICollection<T>(或 ICollection,或 IList)接口修改副本时发出 CollectionChanged 事件。但是,如果出于某种原因,生成数组被修改了怎么办?如果我们目的是监视生成数组本身,而不是某个副本怎么办?

因此,我们的解决方案继承了 ObservableCollection 类,并保持对底层数组的引用,而不是副本。在固定的时间间隔(检查点)内,它会检查底层数组的内容并更新其实例,使其再次与数组同步。

此外,我们的解决方案解决了数组可能包含很多项的可能性。为了克服这一点,它利用了 .NET v4 中提供的出色的并行处理类。假设数组可能包含数百万个元素,但从一个检查点到下一个检查点只有少数元素会发生变化,我们可以用最小的开销轻松地监视这个数组。

最后,作为“奖品”,我们新的可观察集合类能够将其元素投影到另一种类型,前提是其构造函数中提供了投影函数。例如,我们可能有一个包含一千个整数的数组,将其包装在我们的集合类中,并为其提供一个投影函数,如 i => i * i,这样我们的集合的观察者就会看到这些整数的平方。

Using the Code

我们提供了扩展方法,以便我们可以像这样将数组包装在一个监视器中:

// Create an array
int[] theArray = new int[1000000];
// Populate the array
for (var i = 0; i < theArray.Length; i++)
    theArray[i] = i;

// Create a monitor wrapper
var mon = theArray.AsMonitored();
// Create a monitor wrapper which presents
// to its observer the squares of the array's elements.
var monProj = theArray.AsMonitoredProjected(i => i * i);

扩展方法如下:

  • AsMonitored<T>() - 每 100 毫秒监视一次底层 T 数组
  • AsMonitored<T>(int period) - 每 period 毫秒监视一次底层 T 数组
  • AsMonitoredProjected<T, P>(Func<T, P> project) - 每 500 毫秒监视一次底层 T 数组,同时将其投影到类型 P;以及
  • AsMonitoredProjected<T, P>(Func<T, P> project, int period) - 每 period 毫秒监视一次底层 T 数组,同时将其投影到类型 P

实现细节

实现我们集合的主要类名为 MonitoredProjectedArray

public class MonitoredProjectedArray<T, P>
    : ObservableCollection<P>, IDisposable
{
    // Fields
    protected T[] _monitoredArray;
    protected Func<T, P> _project;

    // Construction
    public MonitoredProjectedArray(T[] a, int period, Func<T, P> project)
        : base(a.AsParallel().AsOrdered().Select(project))
    {
        _monitoredArray = a;
        _project = project;

        /* Code that sets up changed items */

        /* Code that sets up a timer which runs every "period"
           milliseconds and calls method QueueChangedItems() */
    }

    public MonitoredProjectedArray(T[] a, Func<T, P> project)
        : this(a, 500, project)
    {
    }

    /* More helper methods */
}

重要的是要注意,MonitoredProjectedArray 类会保持对被监视数组的引用。其次,请注意底层数组是一个类型为 T 的项的数组,然而它继承自 ObservableColletion<P>。这是因为数组中的每个项都使用 _project 函数被投影到类型 P。这发生在实例创建期间(注意 base 调用)。

类的核心是 QueueChangedItems() 方法,该方法由计时器定期调用。该方法首先比较底层数组的元素和被监视数组实例的元素。如果发现差异,它会创建 ChangedItem 实例并将其存储在队列中。然后,对于队列中的每个项,它会将其出队并更新相应索引处的被监视数组。

ChangedItem 定义如下:

struct ChangedItem
{
    public int Index;     // The index in the array of the changed item
    public T NewValue;    // The new value at the specified index
}

为了利用 Framework 的新并行处理能力,我们在 QueueChangedItems() 方法中使用了 Parallel 类。

ConcurrentQueue<ChangedItem> _changedItems;

protected void QueueChangedItems()
{
    // Collect the changed items
    Parallel.For(0, _monitoredArray.Count(), i =>
    {
        if (!_project(_monitoredArray[i]).Equals(this[i]))
        {
            var ci = new ChangedItem() { Index = i, NewValue = _monitoredArray[i] };
            if (!_changedItems.Contains(ci))
                _changedItems.Enqueue(ci);
        }
    });

    // The following action updates this instance with the changed items.
    // Use _project to project the new values to type P.
    Action updateAction = () =>
    {
        ChangedItem item;
        while (_changedItems.TryDequeue(out item))
            this[item.Index] = _project(item.NewValue);
    };

    // Start four concurrent updateActions to consume the _changedItems queue
    Parallel.Invoke(updateAction, updateAction, updateAction, updateAction);
}

请注意,我们选择使用 ConcurrentQueue 对象来表示更改项队列,而不是使用简单的 QueueConcurrentQueue 位于 System.Collections.Concurrent 命名空间下,并且是线程安全的,这在我们的情况下是必需的,因为我们从四个不同的线程消费队列。

最后一点值得注意的是,我们在类中重写了 OnNotifyCollectionChanged() 方法,以便利用 Dispatcher。这样做是因为我们从不同的线程修改集合(实际上是从四个不同的线程,也就是通过 Parallel.Invoke() 方法启动的线程)。Dispatcher 是从不同线程更改 ObservableCollection 的唯一线程安全方式。

// Override OnCollectionChanged so that we make use of the Dispatcher
protected override void OnCollectionChanged
	(System.Collections.Specialized.NotifyCollectionChangedEventArgs e)
{
    using (BlockReentrancy())
    {
        // Get the CollectionChanged event handler
        System.Collections.Specialized.NotifyCollectionChangedEventHandler 
				eventHandler = CollectionChanged;
        if (eventHandler != null)
        {
            foreach (var handler in eventHandler.GetInvocationList())
            {
                DispatcherObject dispatcherObject = handler.Target as DispatcherObject;
                if (dispatcherObject != null && dispatcherObject.CheckAccess() == false)
                    dispatcherObject.Dispatcher.Invoke
			(DispatcherPriority.DataBind, handler, this, e);
                else
                    (handler as System.Collections.Specialized.
		NotifyCollectionChangedEventHandler)(this, e);
            }
        }
    }
}

我们获取附加到 CollectionChanged 事件的所有委托,如果其中任何一个在不同的线程上,我们就使用 Dispatcher 来调用它。否则,我们按正常方式进行。

最后的定论

MonitoredProjectedArray 类及其扩展方法绝对不是生产质量的代码。它的一些缺点是:

  • 它只监视数组,而不监视通用集合(即 ICollection<T>IList 等的实现者);
  • 用户无法配置监视策略:它硬编码在 QueueChangedItems() 方法中;
  • 它假定在两个连续的检查点之间,数组元素很少发生变化,这对于大多数实际应用程序来说是一个合理的假设,但在一般情况下,我们不应该想当然。

尽管存在这些缺点,但该类仍然非常实用,所以我希望它能在您的程序中作为类似情况的解决方案。玩得开心!

历史

  • 2011 年 2 月 28 日:发布第一个版本
  • 2011 年 3 月 1 日:演示文稿中有一些小改动
© . All rights reserved.