65.9K
CodeProject 正在变化。 阅读更多。
Home

可取消的 SQL Server 查询

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.50/5 (3投票s)

2012年7月23日

CPOL

8分钟阅读

viewsIcon

36917

downloadIcon

519

.net 3.5 可取消的 SQL Server 线程化查询

引言

SQL Server Management Studio 提供了大部分日常管理 SQL Server 实例所需的基本 GUI 功能,非常方便。但是,其中也缺少一些功能,例如在存储过程中搜索文本或将对象排序到自定义文件夹中。

由于这些不足以及其他一些超出本文范围的问题,我开始创建 SSMS 的克隆,既作为学习路径,也为了添加一些缺失的功能。

我遇到的第一个问题是 SSMS 提供了异步运行查询并随时取消它们的功能,这可以通过在 SSMS 中运行查询或展开对象资源管理器中的节点来观察。

所以,我需要做的第一件事就是编写一些代码,该代码可以异步运行查询到 SQL Server,并提供取消查询的功能。

使用代码

下面的代码可用于将数据加载到标准的 .NET DataGridView 控件中。

public void RunQuery()
{
    using (CancellableQuery Query = new CancellableQuery { 
      ConnectionString = MyConnectionString, SQL = MySqlString, 
      ReturnDataInChunks = true, QueryChunkSize = 500, ClearObjectsOnChunkCompleted = false })
    {
        Query.GetColumns += (sender, args) =>
        {
            foreach(QueryColumn Column in args.Columns)
            {
                if (gvMain.InvokeRequired)
                {
                    MethodInvoker Invoker = () =>
                    {
                        gvMain.Columns.Add(Column.HeaderText, Column.HeaderText);
                    };
                    gvMain.Invoke(Invoker);
                }
                else
                    gvMain.Columns.Add(Column.HeaderText, Column.HeaderText);                        
            }
        };
        Query.QueryCompleted += (sender, args) =>
        {
            foreach (Object[] Values in args.Results)
            {
                if (gvMain.InvokeRequired)
                {
                    MethodInvoker Invoker = () =>
                        {
                            gvMain.Rows.Add(Values);
                        };
                    gvMain.Invoke(Invoker);
                }
                else
                {
                    gvMain.Rows.Add(Values);
                }
            }
            lblQueryStatus.Text = "Query Completed";
            btnStopQuery.Enabled = false;
            btnRunQuery.Enabled = true;
        };
        Query.QueryChunkCompleted += (sender, args) =>
        {
            foreach (Object[] Values in args.Results)
            {
                if (gvMain.InvokeRequired)
                {
                    MethodInvoker Invoker = () =>
                    {
                        gvMain.Rows.Add(Values);
                    };
                    gvMain.Invoke(Invoker);
                }
                else
                {
                    gvMain.Rows.Add(Values);
                }
            }
            lblRowsReturned.Text = string.Format("{0} rows returned", gvMain.Rows.Count);
        };                
        Query.OnQueryError += (sender, args) =>
        {
            MessageBox.Show(args.ex.ToString());
            lblQueryStatus.Text = "Query Error";
        };
        Query.StartQueryExecution();
    }
}

从上面的代码可以看出,您需要在 CancellableQuery 类中至少订阅四个事件,以确保所有内容都已正确连接。

CancellableQuery 类

CancellableQuery 类是本文的主要重点。它有三个主要目标:

  • 查询应随时可取消。
  • 如果需要,查询应分块返回其结果。
  • 它应该非常易于使用。

那么我们从哪里开始呢?嗯,您首先会注意到,没有使用特殊的自定义编写的集合或线程处理例程,所有使用的对象都是通用的 .NET 对象。

此类中提供了以下公共方法:

方法名称 描述

StartQueryExecution

使用 ConnectionString 属性中指定的 SQL 文本和数据源,在后台工作线程中开始执行查询。
CancelQuery 取消当前正在运行的查询,并在已订阅的情况下触发 QueryCanelled 事件。

此类中提供了以下公共属性:

属性名称 描述
ConnectionString 获取或设置 ConnectionString 属性,该属性标识要连接的 SQL Server 实例。
SQL 获取或设置要在服务器上运行的 SQL SELECT 命令。
ReturnDataInChunks 获取或设置一个布尔值,该值指示是否通过 QueryChunkCompleted 事件分块返回查询结果。
QueryChunkSize 获取或设置当 ReturnDataInChunks 属性设置为 true 时,用于标识数据应返回的块大小的值。
IsRunning 获取一个布尔值,指示查询是否正在运行。
ClearObjectsOnChunkCompleted

获取或设置一个布尔值,该值指示在事件处理完成后,QueryChunkCompleted 返回的 List<Object> 参数是清空还是创建了一个新列表。

类中提供了以下公共事件:

事件名称 描述
GetColumns 在底层 SQLDataReader 完成其 ExecuteReader 方法并读取了查询返回的底层架构后,即会触发此事件。
QueryChunkCompleted 当读取的行数等于 QueryChunkSize 属性时,将触发此事件。
QueryCompleted 查询完成并已从 SQL Server 实例读取所有数据后,将触发此事件。
QueryCancelled 调用 CancelQuery() 方法时触发此事件。
OnQueryError 查询因任何原因失败时触发此事件。

CancellableQuery 类主要的处理功能在 StartQueryExecution 方法中完成,我们将更详细地研究它。

public void StartQueryExecution()
{
    // Before we do anything else it is best to check if the ConnectionString and SQL properties have been set.
    if (ConnectionString == null || ConnectionString == string.Empty)
        throw new Exception("ConnectionString property has not been set");
    if (SQL == null || SQL == string.Empty)
        throw new Exception("SQL property has not been set");
    
    // set the IsRunning variable to true as we are affectively now in execution mode or running.
    isRunning = true;
   
    // Create an SqlDataReader object, if you dont want to connect
    // to sql server then i see no reason why this cannot be changed to an
    // OleDBDataReader object
    SqlDataReader reader = null;
    // Create an SqlConnection object, if you dont want to connect
    // to sql server then i see no reason why this cannot be changed to an
    // OleDBDConnection object            
    SqlConnection Connection = null;
    // Create an SqlCommand object, if you dont want to connect
    // to sql server then i see no reason why this cannot be changed to an
    // OleDbCommand object            
    SqlCommand Command = null;
    // List of objects that will store the physical field values 
    List<object[]> Results = new List<object[]>();
    // A collection of columns within the dataset, unfortunately
    // the DataColumnCollection does not really suit our needs, it has no
    // constructor and inheriting from InternalDataCollectionBase is a bit of an over kill.
    QueryColumnCollection Columns = new QueryColumnCollection();
            
    try
    {
        // Create the BackgroundWorker object and set the
        // WorkerSupportsCancellation Property to true so that we can cancel the worker when the
        // CancelQuery Method is called.
        Worker = new BackgroundWorker() { WorkerSupportsCancellation = true };
        // Assigm our method stub to the DoWork event of the BackgroundWorker using a Lambada Expression.  
        // Some people like these, some don't, if you dont like them or you
        // are using an older version of the .net framework, UnComment the RunQuery
        /// method below and comment out this method.
        Worker.DoWork += (s, e) =>
        {
            try
            {
                // Create the SqlConnection object assigning the ConnectionString
                // of this class to the ConnectionString of the SQLConnection object.
                Connection = new SqlConnection(ConnectionString);
                // Open the connection string object.  We do this right at the
                // top here so that if an error occurs, the rest of the code is not run, saves
                // a little bit of work on the side of the software.
                Connection.Open();
                // Create the Command object and assign the SQL property
                // from this class to the CommandText Property of the SqlCommand object
                Command = new SqlCommand(SQL);
                // Assign the SqlConnection object to the command
                Command.Connection = Connection;
                // Execute the reader object
                reader = Command.ExecuteReader();
                // First thing we do is get the Columns returned by the underlying
                // query, this enables us to instantly populate our control with the header
                // columns from the Query
                // The GetSchemaTable returns the column meta data from the underlying
                // query which can be used to populate a grid with headers or populate
                // a datatable object with headers.
                using (DataTable dtSchema = reader.GetSchemaTable())
                {
                    if (dtSchema != null)
                    {
                        foreach (DataRow drow in dtSchema.Rows)
                        {
                            QueryColumn MyColumn = new QueryColumn(
                              Convert.ToString(drow["ColumnName"]), (Type)(drow["DataType"]));
                            Columns.AddColumn(MyColumn);
                        }
                    }
                    
                    if (GetColumns != null)
                        GetColumns(this, new GetColumnsEventArgs(Columns));
                }
                // create a local variable that keeps track of the CurrentRowCount,
                // so that we can check if the QueryChunkSize has been reached.
                int CurrentRowCount = 0;
                // Loop through the datareader rows
                while (reader.Read())
                {
                    // Check if the query has been cancelled using the CancelQuery method
                    if (Cancelled)
                    {
                        // The query has been cancelled now fire the QueryCancelled Event if it has been attached to.
                        if (QueryCancelled != null)
                            QueryCancelled(this, EventArgs.Empty);
                        // set the internal isRunning variable to false
                        isRunning = false; 
                        
                        // jump out of the while loop, we no longer need to process any further rows.                                
                        break;
                    }
                    // create an Array of Objects to store the values that are reader from the SqlDataReader
                    Object[] values = new Object[reader.FieldCount];
                    // Read the values intot he array
                    reader.GetValues(values);
                    // Add the values to our results list.
                    Results.Add(values);
                    // Incremement the CurrentRowCount variable
                    CurrentRowCount++;
                    // Check if the ReturnDataInChunks property is set to
                    // true and if the CurrentRowcount equals the QueryChunkSize
                    
                    if (ReturnDataInChunks && CurrentRowCount == QueryChunkSize)
                    {                            
                        // The CurrentRowCount equals the QueryChunkSize and
                        // the ReturnDataInChunks property is set to true.
                        // return the Object List back to the calling method
                        // if the QueryChunkCompleted event has been subscribed to.
                        if (QueryChunkCompleted != null)
                            QueryChunkCompleted(this, new QueryCompletedEventArgs(Results));
                        // reset the CurrentRowCount variable 
                        CurrentRowCount = 0;
                        // Clear out the Object List as we dont need this
                        // internally any more as the calling code should of used them
                        // as required or cloned them if they dont need them.
                        Results.Clear();                                
                    }
                }                        
            }
            catch (Exception ex)
            {
                // An exception has occoured somewhere, so raise
                // the OnQueryError event if it has been subscribed to.
                isRunning = false;
                if (OnQueryError != null)
                    OnQueryError(this, new QueryErrorDelegate(ex));
                
                // Set the isRunning varaible to false;
                
            }
        };
        Worker.RunWorkerCompleted += (s, e) =>
        {
            // The query has completed fine, no errors have been
            // reported so raise the QueryCompleted Event  if it has been subscribed to.
            // this will also return any remaining results as the
            // QueryChunkCompleted Event will not be invoked if the CurrentRowCount does
            // not equal the QueryChunkSize
            if (QueryCompleted != null)                                                                    
                QueryCompleted(this, new QueryCompletedEventArgs(Results));                        
            // Set the isRunning variable to false as we are
            // theoretically no longer running any form of query operation.
            isRunning = false;
        };
        // Run the background worker.
        Worker.RunWorkerAsync();
    }
    catch (Exception ex)
    {
        // An exception has occoured somewhere, so raise
        // the OnQueryError event if it has been subscribed to.
        if (OnQueryError != null)
            OnQueryError(this, new QueryErrorDelegate(ex));
        // Set the isRunning varaible to false;
        isRunning = false;                
    }
    finally
    {
        // Do all the clean up required
        if (Connection != null)
        {
            Connection.Close();
            Connection.Dispose();
        }
        if (reader != null)
        {
            reader.Close();
            reader.Dispose();
        }
    }
}

您首先会注意到的是,代码使用的是 BackgroundWorker 对象,而不是 Microsoft 提供的任何异步方法,例如 BeginExcuteReader()EndExecuteReader()。这有两个原因……

  • 首先,也是最重要的,.net 中内置的异步数据访问功能会给您的应用程序增加相当大的开销,并且比不使用异步访问慢得多。对于少量数据来说,这通常不是问题,但是当您查询数百万条记录时,通过不使用内置的异步功能,可以将时间(尤其是在慢速服务器上)缩短几分钟。
  • 其次,使用 BeginExecuteReader()EndExecuteReader() 功能唯一真正可行的方法是通过回调,这会带来全新的线程问题,特别是更新 GUI 时,如果您还记得上面的内容,我想保持简单。

获取查询将返回的列的标题

首先,为了在 DataGridView 中显示数据,我们需要为其提供显示数据所需的列。

这是通过 SqlDataReader.GetSchemaTable() 方法完成的。

using (DataTable dtSchema = reader.GetSchemaTable())
{
    if (dtSchema != null)
    {
       foreach (DataRow drow in dtSchema.Rows)
       {
           QueryColumn MyColumn = new QueryColumn(Convert.ToString(drow["ColumnName"]), 
                                                 (Type)(drow["DataType"]));
           Columns.AddColumn(MyColumn);
       }
    }
                            
    if (GetColumns != null)
        GetColumns(this, new GetColumnsEventArgs(Columns));
}

这让我感到非常恼火,我讨厌重复造轮子,真的很想使用 DataColumnCollection,但不幸的是,它没有指定构造函数,更糟糕的是,它继承自 InternalDataCollectionBase,这是另一个类,它会引入相当大的开销,不太适合我们当前的任务,因此创建了一个简单的 QueryColumnQueryCollumnCollection 类。

数据块

SSMS 让我印象深刻的一个主要功能是每次返回 10,000 行数据。

这正是 ReturnDataInChunksQueryChunkSize 属性发挥作用的地方。将 ReturnDataInChunks 设置为 true,并将 QueryChunkSize 设置为一个相当大的整数值(例如 1000),将允许 CancellableQueryQueryChunkCompleted 事件中将数据分块返回给调用线程。

CurrentRowCount++;
// Check if the ReturnDataInChunks property is set to true and if the CurrentRowcount equals the QueryChunkSize

if (ReturnDataInChunks && CurrentRowCount == QueryChunkSize)
{

从上面可以看出,代码非常简单,我们所做的就是使用 CurrentRowCount 变量来计算当前行数。当达到 QueryChunkSize 时,我们触发 QueryChunkCompleted 事件,将存储在 Results 变量中的值列表(这是一个简单的 List<object>(对象列表))传回。根据 ClearObjectsOnChunkCompleted 属性的值,在事件触发后(ClearObjectsOnChunkCompleted = true)将列表清空,或者创建一个新的 List<Object>ClearObjectsOnChunkCompleted = false)。

代码真的就是这么简单。

演示代码

那么让我们更仔细地看看上面的代码以及我们在做什么……

代码的第一部分创建 CancellableQuery 对象并为其分配各种属性。

获取列列表

接下来的几行处理将列分配给我们要使用的 DataViewGrid 控件。我将我的命名为 gvMain

Query.GetColumns += (sender, args) =>
{
   foreach(QueryColumn Column in args.Columns)
   {
      if (gvMain.InvokeRequired)
      {
         MethodInvoker Invoker = () =>
         {
              gvMain.Columns.Add(Column.HeaderText, Column.HeaderText);
         };
         gvMain.Invoke(Invoker);
      }
      else gvMain.Columns.Add(Column.HeaderText, Column.HeaderText); 
    }
};

GetColumnsEvent 的类型是 EventsAndDelegates.cs 中的 QueryGetColumnsEventDelegate,它允许我们访问从查询中检索到的底层列。args.ColumnsQueryColumns 的集合,通过 HeaderText 和列的数据类型(通过 ColumnType 属性)可以访问列标题。

读取数据

获取列列表并将其添加到网格后,我们需要将数据读入网格,这可以通过附加到 QueryChunkCompleted 事件和 QueryCompleted 事件来完成。

Query.QueryChunkCompleted += (sender, args) =>
{
    foreach (Object[] Values in args.Results)
    {
       if (gvMain.InvokeRequired)
       {
          MethodInvoker Invoker = () =>
          {
             gvMain.Rows.Add(Values);
          };
          gvMain.Invoke(Invoker);
       }
       else
       {
          gvMain.Rows.Add(Values);
       }
     }
     lblRowsReturned.Text = string.Format("{0} rows returned", gvMain.Rows.Count);
};

每次读取的行数等于 QueryChunkSize 属性的值时,都会触发此事件,前提是 ReturnDataInChunks 属性设置为 true。

QueryChunkCompleted 事件也是一个自定义委托,类型为 QueryChunkCompletedEventDelegate,它通过 args.Results 参数为我们提供已读取的每一行的值列表。

我最初打算将 DataTable 用作 args.Results 属性的类型,但是与对象列表相比,它的开销太大,尤其考虑到理论上您可以查询数百万条记录。

DataGridView 为我们提供了一个非常方便的函数,可以通过 Rows.Add() 方法添加对象值列表作为行。

查询完成

查询成功完成后,我们需要响应 QueryCompleted 事件。

Query.QueryCompleted += (sender, args) =>
{
    foreach (Object[] Values in args.Results)
    {
       if (gvMain.InvokeRequired)
       {
           MethodInvoker Invoker = () =>
           {
               gvMain.Rows.Add(Values);
           };
           gvMain.Invoke(Invoker);
       }
       else
       {
           gvMain.Rows.Add(Values);
       }
    }
    lblQueryStatus.Text = "Query Completed";
    btnStopQuery.Enabled = false;
    btnRunQuery.Enabled = true;
};

如果我们没有通过 ReturnDataInChunks 属性设置启用数据分块返回,则 args 参数将在 args.Results 属性中包含所有数据。如果我们已将 ReturnDataInChunks 属性设置为 true,则 args.Results 属性将包含未达到 QueryChunkSize 限制的剩余记录。

这也是一个自定义事件委托,类型为 QueryCompletedEventDelegate,位于 EventsAndDelegates.cs 中。

查询错误

如果在执行底层查询时发生错误,我们需要附加到 OnQueryError 事件,该事件通过 args.ex 属性为我们提供了响应底层 Exception 对象的能力。

Query.OnQueryError += (sender, args) =>
{
   MessageBox.Show(args.ex.ToString());
   lblQueryStatus.Text = "Query Error";
};

当触发此事件时,IsRunning 属性将被设置为 false。

最后 - 运行或取消查询

设置完所有必需的事件后,只需调用 StartQueryExecution() 方法即可调用查询。

要取消查询,请调用 CancelQuery() 方法,该方法将触发 QueryCancelled 事件。

总结

这是我为 CodeProject 撰写的第一个文章,所以首先,我乐于接受所有评论,无论好坏,并在有人指出我的明显疏忽时相应地更新文章。

实际上,所有针对数据源(无论是 SQL Server 还是其他)的查询都应该运行在自己的线程中,以确保主 GUI 保持响应。但是,能够取消它们取决于您的业务需求。

在我的下一篇文章中,我计划介绍我构建的 Microsoft Access \ SSMS 风格的查询生成器。

历史

  • 2012 年 7 月 23 日:首次发布到 CodeProject.com。
  • 2012 年 7 月 23 日:根据 Mika Wendelius 的一些非常有效的评论更新了文章。
  • 2012 年 7 月 24 日:添加了 ClearObjectsOnChunkCompleted 属性,并确保在取消查询时关闭了 SqlDataReaderSqlConnection 对象。
© . All rights reserved.