65.9K
CodeProject 正在变化。 阅读更多。
Home

CNTK 模型并发演示

starIconstarIconstarIconstarIconstarIcon

5.00/5 (4投票s)

2018年10月1日

CPOL

10分钟阅读

viewsIcon

10399

downloadIcon

130

如何部署训练好的模型以实现并发

访问第 1 部分:数据可视化和贝塞尔曲线

访问第 2 部分:贝塞尔曲线机器学习演示

访问第 3 部分:使用 MS CNTK 进行贝塞尔曲线机器学习演示

引言

本文是系列文章的第 4 部分。之前的文章探讨了使用贝塞尔曲线对纵向数据进行建模以及利用机器学习算法训练网络模型以识别此类曲线轨迹中反映的模式和趋势的技术。然而,一旦您训练并验证了一个或多个合适的分类模型,您就希望在生产运行中高效地部署这些模型。

通常(至少在我有限的经验中),分类步骤(将模型应用于新的特征数据记录并返回结果)是一个非常快、相当“短时运行”的过程。然而,通常在生产环境中(要检查数千个案例),该过程可能相当深入地嵌入到更耗时的“长时间运行”数据处理步骤中。这可能涉及各种操作,以收集、建模和将“原始数据”整形为准备分类的记录。然后,它可能还有其他步骤(仅将分类结果作为另一个变量包含在内),以进一步整形数据用于描述性或预测性分析和/或可视化等。

在这种情况下,自然会想到并行处理。不幸的是,通常将模型应用于数据的功能不是线程安全的。例如,ALGLIB 神经网络和决策森林以及 MS CNTK 神经网络提供的功能都不是线程安全的。本文演示了一种技术,用于安全地并发部署训练好的模型,并将其嵌入到 C# 中 Parallel ForEach 循环内运行的(否则为线程安全的)进程中。

背景

就上下文而言,在之前的文章中,我们考察了学生从 6 年级到 12 年级的学业表现,并使用贝塞尔曲线进行建模。我们演示了机器学习分类模型,这些模型可以训练以识别各种模式,例如识别表现良好或可能处于危险中的学生的纵向轨迹。

在这里,在这个独立的“ConcurrencyDemo”控制台应用程序项目中,我们将使用与第 3 部分中介绍的先前“BezierCurveMachineLearningDemoCNTK”项目相同的数据集。该项目演示了训练不同类型网络的各种方法,并允许将其保存以供以后在生产运行中使用。该项目包括 3 个由先前项目生成的训练好的模型(一个 CNTK 神经网络、一个 ALGLIB 神经网络和一个决策森林网络)。

当此 ConcurrencyDemo 项目在 Visual Studio 中下载并打开时,所需的 CNTK 2.6 Nuget 包应自动恢复并作为引用包含。然后,配置为 Debug 和 Release 的 x64 构建,使用 CNTK 模型的并发演示应运行良好。但是,要测试 ALGLIB 模型,您需要成功下载并构建免费版 ALGLIB314.dll。在第 2 部分中详细介绍了如何进行此操作。当将其添加为引用并取消注释演示代码的相应部分时,您也可以测试这些模型。

我认为这篇文章可以单独阅读和理解。回顾第 1 到第 3 部分的叙述将为更好地理解奠定基础。CNTK 训练模型不是线程安全的,这里有一个很好的参考资料:Windows 上的模型评估。并行操作和并发不是简单的主题,我承认我不是专家。这里有一个来自 Microsoft 的非常好的复习参考资料:并行编程模式:使用 .NET Framework 4 理解和应用并行模式

训练模型部署

本系列的第 3 部分提供了使用 ALGLIB 和 CNTK 试验训练、验证和保存各种分类模型的代码。假设您已经完成了这些,并且您有三个保存的训练模型文件:一个 ALGLIB 神经网络、一个 ALGLIB 决策森林和一个 CNTK 神经网络。还假设您有一个新的数据集,其中包含特征(在本例中,是从起点到终点代表贝塞尔曲线轨迹的 24 个等时间间隔值的向量)。每个案例都将作为 double[] 数据数组呈现给分类器。然后,分类器应返回分类概率列表以及最大(因此最可能预测的)分类的索引。

至少有几种方法可以成功地并发运行训练好的模型。然而,我发现的最好、最简单的方法(遵循 CodeProject 贡献者 Bahrudin Hrnjica 的出色建议,他当然非常了解 CNTK)涉及使用 ThreadLocal 类方法。这是本演示中使用的 ClassifierModel.cs 模块的部分代码

using CNTK;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;

namespace ConcurrencyDemo
{
    public enum EstimationMethod { ALGLIB_NN = 1, ALGLIB_DF = 2, CNTK_NN = 3 };

    public interface IClassifier
    {
        int PredictStatus(double[] data);
        (int pstatus, List<double> prob) PredictProb(double[] data);

        int GetThreadCount();
        void ClearThreadLocalStorage();
        void DisposeThreadLocalStorage();
    }

    /// <summary>
    /// Choose Machine Learning Classification Method using EstimationMethod ENUM value
    /// </summary>
    public class MLModel : IClassifier
    {
        // Fields - some encapsulated
        private IClassifier _IClassifierModel;

        // Constructor
        /// <summary>
        /// Loads the trained network indicated by ENUM value from a file
        /// </summary>
        /// <param name="mychoice">an EstimationMethod EMUN value</param>
        /// <param name="applicationPath">path to application folder</param>
        public MLModel(EstimationMethod mychoice, string applicationPath)
        {
            switch (mychoice)
            {
                case EstimationMethod.ALGLIB_NN:
                    _IClassifierModel = new ALGLIB_NN_Model(Path.Combine
                            (applicationPath, "TrainedModels\\TrainedALGLIB_NN.txt"));
                    return;
                case EstimationMethod.ALGLIB_DF:
                    _IClassifierModel = new ALGLIB_DF_Model(Path.Combine
                           (applicationPath, "TrainedModels\\TrainedALGLIB_DF.txt"));
                    return;
                case EstimationMethod.CNTK_NN:
                    _IClassifierModel = new CNTK_NN_Model(Path.Combine
                           (applicationPath, "TrainedModels\\TrainedCNTK_NN.txt"));
                    return;
            }
            throw new Exception("No status estimation method was provided");
        }

        // Methods - Pass-through required by IClassifier interface
        /// <summary>
        /// Classify a Bezier History Curve by Status using this IClassifier model
        /// </summary>
        /// <returns> an integer classification label</returns>
        public int PredictStatus(double[] array)
        {
            return _IClassifierModel.PredictStatus(array);
        }
        /// <summary> Classify a Bezier History Curve by Status and 
        /// Probability using this IClassifier model
        /// </summary>
        /// <param name="array">24 equi-spaced Bezier values from start to end</param>
        public (int pstatus, List<double> prob) PredictProb(double[] array)
        {
            return _IClassifierModel.PredictProb(array);
        }

        public int GetThreadCount()             { return _IClassifierModel.GetThreadCount();     }
        public void ClearThreadLocalStorage()   { _IClassifierModel.ClearThreadLocalStorage();   }
        public void DisposeThreadLocalStorage() { _IClassifierModel.DisposeThreadLocalStorage(); }
    }

    public class CNTK_NN_Model : IClassifier
    {
        // FIELDS
        private readonly CNTK.Function _clonedModelFunc;    // this is the original model
        private ThreadLocal<CNTK.Function> _cloneModel;     // this is ThreadLocal clone

        // CONSTRUCTORS
        public CNTK_NN_Model(string networkPathName)             // if network is in a file
        {
            if (File.Exists(networkPathName))
            {
                CNTK.Function CNTK_NN = Function.Load(networkPathName,
                    DeviceDescriptor.UseDefaultDevice(), ModelFormat.CNTKv2);

                _clonedModelFunc = CNTK_NN;  // Save original
                _cloneModel = new ThreadLocal<CNTK.Function>(() =>
                    _clonedModelFunc.Clone(ParameterCloningMethod.Share),true);
            }
            else throw new FileNotFoundException("CNTK_NN Classifier file not found.");
        }
        public CNTK_NN_Model(Function CNTK_NN)                   // if network is in memory
        {
            _clonedModelFunc = CNTK_NN;  // Save original
            _cloneModel = new ThreadLocal<CNTK.Function>(() =>
                _clonedModelFunc.Clone(ParameterCloningMethod.Share), true);
        }

        // METHODS
        /// <summary>
        /// Classify a Bezier Curve History by Status
        /// </summary>
        /// <param name="array">24 equi-spaced Bezier values from start to end</param>
        /// <returns> an integer classification label</returns>
        public int PredictStatus(double[] data)
        {
            (int pstatus, List<double> prob) = PredictProb(data);
            return pstatus;
        }
        /// <summary> Classify a Bezier Curve History by Status and Probability Using CNTK
        /// </summary>
        /// <param name="array">24 equi-spaced Bezier values from start to end</param>
        /// <returns>A tuple (int status, List<double> prob)</returns>
        public (int pstatus, List<double> prob) PredictProb(double[] data)
        {
            CNTK.Function NN = _cloneModel.Value;

            // extract features and label from the cloned model
            Variable feature     = NN.Arguments[0];
            Variable label       = NN.Outputs[0];
            int inputDim         = feature.Shape.TotalSize;
            int numOutputClasses = label.Shape.TotalSize;

            float[] xVal = new float[inputDim];
            for (int i = 0; i < 24; i++) xVal[i] = (float)data[i];
            Value xValues = Value.CreateBatch<float>(new int[] { feature.Shape[0] }, xVal,
                DeviceDescriptor.UseDefaultDevice());

            var inputDataMap  = new Dictionary<Variable, Value>() { { feature, xValues } };
            var outputDataMap = new Dictionary<Variable, Value>() { { label, null } };

            NN.Evaluate(inputDataMap, outputDataMap, DeviceDescriptor.UseDefaultDevice());
            var outputData = outputDataMap[label].GetDenseData<float>(label);

            List<double> prob = new List<double>();
            for (int i = 0; i < numOutputClasses; i++) prob.Add(outputData[0][i]);
            int pstatus = prob.IndexOf(prob.Max()) + 1;     // label classes should start at one

            return (pstatus, prob);
        }
        
        /// <summary>
        /// Returns the number of ThreadLocal values produced and consumed<para>
        /// by the model instance running serially or currently</para>
        /// </summary>
        ///
        public int GetThreadCount() { return (_cloneModel.Values).Count(); }
        /// <summary>
        /// Dispose the old ThreadLocal instance and create a new one inside the model object
        /// </summary>
        public void ClearThreadLocalStorage()
        {
            _cloneModel.Dispose();
            _cloneModel = new ThreadLocal<CNTK.Function>(() => 
                          _clonedModelFunc.Clone(ParameterCloningMethod.Share), true);
        }
        /// <summary>
        /// When finished using the model instance, dispose the ThreadLocal instance first.<para>
        /// Then the model object will be garbage-collected</para>
        /// </summary>
        public void DisposeThreadLocalStorage() { _cloneModel.Dispose(); }
    }

抱歉代码块很长,但解释起来并不难。假设我们知道训练模型磁盘文件的名称和完整路径。在此演示中,它们是

applicationPath + "Trained Models\\TrainedALGLIB_NN.txt"
applicationPath + "Trained Models\\TrainedALGLIB_DF.txt"
applicationPath + "Trained Models\\TrainedCNTK_NN.txt"

然后我们首先使用 Enum EstimationMethod 选择一个模型用于生产运行,就像这样。然后,稍后,可能在其他数据处理步骤的深处,我们将模型应用于特征数据向量,就像这样

MLMODEL myModel = new MLMODEL(EstimationMethod.CNTK, myApplicationpath);
…
int pred = myModel.PredictStatus(someFeatureDataVector);
…

首先,我们定义一个 ICLASSIFIER 接口。该接口在此处显示,以允许用户添加和切换多种类型的训练模型。(如果只对一种类型的模型感兴趣,则不需要它)。在上面的代码中,只显示了 CNTK 模型。接下来,我们有一个 MLMODEL 类和一个构造函数,它加载选定的训练模型数据文件并创建一个包含该模型的新 ICLASSIFIER 对象。由于两者都继承自 ICLASSIFIER,因此两者都必须实现几个方法:PredictStatus()PredictProb(),以及其他几个与 ThreadLocal 类相关的有用属性。我们调用 myModel 对象的方法,它们又调用它定义的 ICLASSIFIER 对象中的相同方法。在那里,模型应用于数据,并将返回值沿链传递回调用进程。

新的 ICLASSIFIER 对象构造函数接收模型磁盘文件的路径并将模型加载到此变量中

private readonly CNTK.Function _clonedModelFunc; // this is the original model

然后它创建自己的 ThreadLocal<T> 对象,其中 T 是训练模型类型,像这样

private ThreadLocal<CNTK.Function> _cloneModel; // this is ThreadLocal clone
...
_cloneModel = new ThreadLocal<CNTK.Function>(() => 
    clonedModelFunc.Clone(ParameterCloningMethod.Share), true);

然后,稍后在代码中的其他地方,当在某个线程上运行时,会调用 myModel.PredictStatus(datavector)myModel.PredictProb(datavector) 方法。首先发生的是,该方法调用 ThreadLocal 对象以获取要在该特定线程上使用并应用于数据记录的唯一克隆模型“Value”,从而产生分类。

CNTK.Function NN = _cloneModel.Value;  // Acquire the unique clone for this thread
...
... evaluate
...
return (pstatus, prob);

THREADLOCAL<T>

关于 MLMODEL 类及其创建的 ICLASSIFIER 对象最有趣的是使用了 ThreadLocal 对象以及它的一些相关属性和方法。这里再次是问题:CNTK 训练模型和相关方法不是线程安全的。对于 ALGLIB 网络模型也是如此。补救措施是创建克隆模型以用于并发处理。

如果您正在串行运行(在应用程序的主线程上),您只需要一个模型。如果您正在并行并发运行进程,您将需要多个克隆副本。最少,您需要为您的硬件提供的每个逻辑处理器提供一个副本。例如,如果您有 2 个提供超线程的 CPU,那么您有 4 个逻辑处理器。(例如,在 Parallel ForEach 循环中,您可以使用 PLINQ 的 WithDegreeOfParallelism 方法来控制这一点。)很多时候,System.Threading 方法使用数据分区方案并将单个线程分配给每个逻辑处理器。但仍然经常,负载平衡等可能导致几个线程在同一处理器上运行。

创建这些克隆是一个耗时且耗内存的过程,不能“即时”完成。ThreadLocal<T> 为我们处理所有这些,为每个新的处理器线程创建和存储一个新克隆,并为在该特定线程上发生的事件提供该特定克隆。这样做的位置是在 ICLASSIFIER 网络对象构造函数中。

当模型完全在主线程上运行时,只存储一个克隆值,并且只使用一个副本。但是如果您有多个逻辑处理器并且正在并发运行分类方法,您将在 ThreadLocal 存储中存储多个值。提供了几个有用的方法来检查这一点。

GetThreadCount() 获取已存储的副本数量。在程序中可能涉及分类的每个并行代码块之前清除和重新创建 ThreadLocal 对象是一个好主意。ClearThreadLocalStorage() 可以做到这一点。最后,在退出应用程序之前,您应始终处置该存储。DisposeThreadLocalStorage() 可以做到这一点。

因此,上述代码中的 CNTK_NN 对象始终知道要为每个数据分区和每个逻辑处理器 ThreadID 使用哪个克隆模型元素。这允许 MLMODEL 对象在并发进程中成功创建并运行线程安全的 ICLASSIFIER 对象。

使用并发演示代码

现在,我们来看看手头的 ConcurrencyDemo。下载并解压该项目(可能在桌面上)。此步骤将生成一个名为 ConcurrencyDemo 的文件夹,其中包含 ConcurrencyDemo.sln 文件以及其他文件。在 Visual Studio 中打开该解决方案文件。检查 Configuration Manager 以确保解决方案中所有项目的 DebugRelease 构建都设置为 x64。接下来,构建 DebugRelease 版本。如果到目前为止一切顺利,请选择 ConcurrencyDemo 作为 StartUp 项目,然后单击 Start 以构建并运行控制台应用程序。控制台窗口中应出现类似于下图的输出

该演示附带注释掉的 ALGLIB 神经网络和决策森林类。(这些分类器的 MLMODEL 构造函数中的行也已注释掉。)为了演示它们也可以成功并发运行,最后一个设置步骤是在 Solution Explorer 中展开 ConcurrencyDemo 项目文件夹,右键单击那里的 References 文件夹,然后使用 Add References 选项将 ALGLIB314.dll 库(您将在本系列的第 2 部分中构建)添加为 ConcurrencyDemo 项目的引用,方法是单击其复选框和 OK 按钮。然后取消注释 ClassifierModels.cs 模块中注释掉的部分,并更改 Program.cs 模块中的 Enum 值。

关注点

这里的理念是,CodeProject“演示”的大部分好处应该来自于对代码的检查。读者当然被鼓励这样做(任何改进建议都将不胜感激)。本文可能已经太长了,而 program.cs 模块中的代码基本上是不言自明的。

我们所做的是使用 DataReader 对象加载整个样本数据文件(N=500)。一旦我们将其加载到内存中,为了好玩,我们将其复制 10 次,并将其格式化为通过 ID 键访问的 double[] 值字典。接下来,我们创建一个本地进程,该进程将使用 MLMODELCNTK 数据进行分类。有一个选项可以使这些“短时运行”的操作更实际地“长时间运行”。

首先,我们运行并计时这个由在主线程上串行执行的简单循环重复调用的本地进程,并将结果(一个(ID,分类)元组)保存在列表中。接下来,我们使用 Parallel ForEach 循环运行并计时,将输出存储在 ConcurrentBag 中,我们最终按 ID 排序并存储在第二个列表中。最后,我们比较这两个列表以确保它们完全匹配。这当然是所有测试中最重要的一项。

从控制台输出中,我们可以看到串行和并行分类确实匹配,并且并发方法的速度是两倍以上。我们还可以注意到 ThreadLocal<T> 如预期般工作,仅包含一个条目,并且在串行处理和该特定并行处理运行中使用了单个模型副本,该并行处理运行可能在 4 个逻辑处理器上使用了 5 或 6 个克隆模型。(请注意,有一个选项可以重复运行并发进程以查看资源如何变化。如果打开 Diagnostic Tools,您会发现每次重新运行并发进程时,CPU 处理器资源几乎 100% 都被使用了。)

结论

从本次演示得出的主要结论是,CNTK 和其他使用各种机器学习技术训练的模型可以在生产环境中安全地并发运行,无论是直接调用还是深入嵌入到某个(否则为线程安全的)数据处理循环中。

历史

  • 2018 年 10 月 1 日:1.0 版
© . All rights reserved.