SqlDependency 与 Entity Framework 5.0






4.98/5 (28投票s)
本文将向您展示如何使用 Entity Framework 5.0 订阅数据更改。
引言
本文将向您展示如何使用 Entity Framework 5.0 订阅数据更改。您将能够使用此方法,在查询结果发生更改时通知您的应用程序,而无需持续轮询数据库。在此过程中,我们将学习一些关于反射的概念以及 Entity Framework 内部的一些知识。
背景
本文假设您熟悉 Entity Framework 以及如何创建基本的 Code-First 应用程序。它还假设您对 反射 以及 它在 .Net 框架中的工作方式 有所了解。
使用代码
在本文结束时,您应该能够使用以下方法,在名称为“Lamp”的产品被创建或删除时通知您的应用程序。基本上,只要查询结果发生变化,就会通知。
using (var notifer = new EntityChangeNotifier<Product, StoreDbContext>(p => p.Name == "Lamp")) { notifer.Changed += (sender, e) => { Console.WriteLine(e.Results.Count()); foreach (var p in e.Results) { Console.WriteLine(" {0}", p.Name); } }; Console.WriteLine("Press any key to stop..."); Console.ReadKey(true); }
警告
此代码使用了 Entity Framework 中未公开的部分,因此 EF 团队可能会在不通知的情况下更改这些部分,导致任何使用此方法的应用程序崩溃,甚至可能非常严重。在任何环境中使用此方法时,务必了解此风险。
SqlDependency
此方法的依据是 SqlDependency
类。此类的作用是表示应用程序与 SQL Server 实例之间的查询通知。此类的基本操作如下:首先,您需要通知 SQL Server 您将使用查询通知;然后我们将注册我们想要获取通知的查询;最后,我们通知 SQL Server 我们不再需要通知,以清理所有内容。注意:对于可以注册进行通知的查询类型有严格的规则。请参阅 MSDN 上 为通知创建查询 以获取这些规则的列表。
通知 SQL Server 我们将使用查询通知
string connectionString = ConfigurationManager.ConnectionStrings["StoreDbContext"].ConnectionString;
SqlDependency.Start(connectionString);
SqlDependency.Start
是用于通知 SQL Server 我们将使用查询通知的方法。这会在数据库上创建 Service Broker 队列,并启动当前 AppDomain 的监听器,以接收来自 SQL Server 的依赖项通知。
注册我们的查询
static void RegisterNotification() { string connectionString = ConfigurationManager.ConnectionStrings["StoreDbContext"].ConnectionString; string commandText = @" Select dbo.Products.Id, dbo.Products.Name, dbo.Products.LastEditDate From dbo.Products Where dbo.Products.Name = 'Lamp' "; using (SqlConnection connection = new SqlConnection(connectionString)) { using (SqlCommand command = new SqlCommand(commandText, connection)) { connection.Open(); var sqlDependency = new SqlDependency(command); sqlDependency.OnChange += new OnChangeEventHandler(sqlDependency_OnChange); // NOTE: You have to execute the command, or the notification will never fire. using (SqlDataReader reader = command.ExecuteReader()) { } } } }
我们的查询,在正常执行时,只会返回所有名称为“Lamp”的产品。然而,作为查询通知,它告诉 SQL Server,只要此查询的结果发生变化,我们都希望得到通知。这意味着:
- 任何时候插入新的名称为“Lamp”的产品;
- 任何时候名称为“Lamp”的现有产品将其名称更改为“Lamp”以外的名称;
- 任何时候名称为“Lamp”的现有产品被删除;
- 任何时候名称为“Lamp”的产品的 SELECT 语句中某个列的值发生变化。
查询通知不会在以下情况触发:
- 任何名称不等于“Lamp”的产品被修改、插入、删除;
- SELECT 语句中未指定的任何列被修改。
对通知作出反应
static void sqlDependency_OnChange(object sender, SqlNotificationEventArgs e) { Console.WriteLine("Info: {0}; Source: {1}; Type: {2}", e.Info, e.Source, e.Type); RegisterNotification(); }
只要查询结果发生变化或注册查询时发生错误,就会调用此方法。请注意,我们再次调用了 `RegisterNotification`。如果我们不这样做,我们将永远不会收到另一次更新。SQL Server 每次注册只发送一个通知。`SqlNotificationEventArgs` 包含几个有用的属性,用于调试问题和理解更改通知的性质。`Info` 是一个 `SqlNotificationInfo` 枚举。它告诉我们导致通知的原因,例如 `Insert`、`Update`、`Delete`、`Invalid` 等。`Source` 是一个 `SqlNotificationSource` 枚举。它告诉我们通知源是 `Data`、`Timeout`、`Database` 等。如果您的语句不是 为通知创建查询 中规定的有效通知语句,则此值将为 `Statement`。`Type` 是一个 `SqlNotificationType` 枚举。如果数据在服务器上被修改,则此值为 `Change`;如果创建通知订阅失败,则此值为 `Subscribe`。请注意,这不会包含数据库中任何数据的任何结果集。您需要自行重新获取数据。
我的 Entity Framework 在哪里?!?
到目前为止,我们只介绍了 SqlDependency
的基本实现,而没有说明如何使其与 Entity Framework 协同工作。正如您从上面的示例中看到的,我们只需将我们的 Entity Framework 表达式转换为一个 SqlCommand
,然后就可以将其注册到 SqlDependency
。那么,我们如何转换
db.Products.Where(x => x.Name == "Lamp")
into
Select
dbo.Products.Id,
dbo.Products.Name,
dbo.Products.LastEditDate
From
dbo.Products
Where
dbo.Products.Name = 'Lamp'
当我们执行 `db.Products.Where(x => x.Name == "Lamp")` 这样的操作时,我们实际上可以将其转换为 `DbQuery
var db = new StoreDbContext(); var query = db.Products.Where(x => x.Name == "Lamp") as DbQuery<Product>; Console.WriteLine(query.ToString());
这实际上返回以下内容
SELECT
[Extent1].[Id] AS [Id],
[Extent1].[Name] AS [Name],
[Extent1].[LastEditDate] AS [LastEditDate]
FROM [dbo].[Products] AS [Extent1]
WHERE N'Lamp' = [Extent1].[Name]
此查询符合 为通知创建查询 文档中指定的所有准则,因此它会运行良好。这也意味着我们可以将 RegisterNotification 方法更改为以下内容
static void RegisterNotification() { string connectionString = ConfigurationManager.ConnectionStrings["StoreDbContext"].ConnectionString; string commandText = null; using (var db = new StoreDbContext()) { var query = db.Products.Where(x => x.Name == "Lamp") as DbQuery<Product>; commandText = query.ToString(); } using (SqlConnection connection = new SqlConnection(connectionString)) { using (SqlCommand command = new SqlCommand(commandText, connection)) { connection.Open(); var sqlDependency = new SqlDependency(command); sqlDependency.OnChange += new OnChangeEventHandler(sqlDependency_OnChange); // NOTE: You have to execute the command, or the notification will never fire. using (SqlDataReader reader = command.ExecuteReader()) { } } } }
现在请注意,我们不再为 SQL 命令硬编码字符串。相反,我们使用 `DbContext` 对象上的 lambda 表达式 `x => x.Name == "Lamp"` 为我们生成命令。但是,如果我们想监视除灯以外的产品怎么办?让我们更改 `RegisterNotification` 方法以接受产品名称参数。
static void RegisterNotification(string productName) { string connectionString = ConfigurationManager.ConnectionStrings["StoreDbContext"].ConnectionString; string commandText = null; using (var db = new StoreDbContext()) { var query = db.Products.Where(x => x.Name == productName) as DbQuery<Product>; commandText = query.ToString(); } using (SqlConnection connection = new SqlConnection(connectionString)) { using (SqlCommand command = new SqlCommand(commandText, connection)) { connection.Open(); var sqlDependency = new SqlDependency(command); sqlDependency.OnChange += new OnChangeEventHandler(sqlDependency_OnChange); // NOTE: You have to execute the command, or the notification will never fire. using (SqlDataReader reader = command.ExecuteReader()) { } } } }
哦,不。现在当我们运行示例时,我们收到了一个错误!
Unhandled Exception: System.Data.SqlClient.SqlException: Must declare the scalar variable "@p__linq__0".
这是怎么回事?原来,Entity Framework 足够智能,知道当您在表达式中指定硬编码字符串时,例如 `x => x.Name == "Lamp"`,该值永远不会改变,因此它会生成一个硬编码字符串的查询。但是,当您指定变量时,例如 `x => x.Name == productName`,它会通过创建参数化查询来利用 SQL 执行计划缓存,这样即使 `productName` 的值发生变化,查询计划也可以重复使用。那么,让我们来看看现在从 `DbQuery
SELECT
[Extent1].[Id] AS [Id],
[Extent1].[Name] AS [Name],
[Extent1].[LastEditDate] AS [LastEditDate]
FROM [dbo].[Products] AS [Extent1]
WHERE [Extent1].[Name] = @p__linq__0
正如我们所料,它现在使用的是参数化查询而不是硬编码值。不幸的是,`DbQuery
不幸的是,这就是警告出现的地方。以下内容 Entity Framework 不以任何方式支持,并且可能随时中断。
事实证明,在内部,`DbQuery
首先,我们获取 `InternalDbQuery
var internalQuery = query.GetType() .GetFields(BindingFlags.NonPublic | BindingFlags.Instance) .Where(field => field.Name == "_internalQuery") .Select(field => field.GetValue(query)) .First();
然后是 `ObjectQuery
var objectQuery = internalQuery.GetType() .GetFields(BindingFlags.NonPublic | BindingFlags.Instance) .Where(field => field.Name == "_objectQuery") .Select(field => field.GetValue(internalQuery)) .Cast<ObjectQuery<T>>() .First();
使用这个 `ObjectQuery` 实例,我们就可以遍历参数,将它们添加到 `SqlCommand` 中。所以现在我们的 `RegisterNotification` 方法看起来像这样:
static void RegisterNotification(string productName) { string connectionString = ConfigurationManager.ConnectionStrings["StoreDbContext"].ConnectionString; string commandText = null; var parameters = new SqlParameter[0]; using (var db = new StoreDbContext()) { var query = db.Products.Where(x => x.Name == productName) as DbQuery<Product>; commandText = query.ToString(); var internalQuery = query.GetType() .GetFields(BindingFlags.NonPublic | BindingFlags.Instance) .Where(field => field.Name == "_internalQuery") .Select(field => field.GetValue(query)) .First(); var objectQuery = internalQuery.GetType() .GetFields(BindingFlags.NonPublic | BindingFlags.Instance) .Where(field => field.Name == "_objectQuery") .Select(field => field.GetValue(internalQuery)) .Cast<ObjectQuery<Product>>() .First(); parameters = objectQuery.Parameters.Select(x => new SqlParameter(x.Name, x.Value)).ToArray(); } using (SqlConnection connection = new SqlConnection(connectionString)) { using (SqlCommand command = new SqlCommand(commandText, connection)) { connection.Open(); command.Parameters.AddRange(parameters); var sqlDependency = new SqlDependency(command); sqlDependency.OnChange += new OnChangeEventHandler(sqlDependency_OnChange); // NOTE: You have to execute the command, or the notification will never fire. using (SqlDataReader reader = command.ExecuteReader()) { } } } }
现在我们运行应用程序,一切都按计划进行。但是,我认为我们可以清理代码,使其更通用,并允许我们将其用于其他查询。让我们首先为 `DbQuery
public static class DbQueryExtension { public static ObjectQuery<T> ToObjectQuery<T>(this DbQuery<T> query) { var internalQuery = query.GetType() .GetFields(BindingFlags.NonPublic | BindingFlags.Instance) .Where(field => field.Name == "_internalQuery") .Select(field => field.GetValue(query)) .First(); var objectQuery = internalQuery.GetType() .GetFields(BindingFlags.NonPublic | BindingFlags.Instance) .Where(field => field.Name == "_objectQuery") .Select(field => field.GetValue(internalQuery)) .Cast<ObjectQuery<T>>() .First(); return objectQuery; } }
现在,我们 `RegisterNotification` 方法的相关部分可以变成以下内容:
using (var db = new StoreDbContext()) { var query = db.Products.Where(x => x.Name == productName) as DbQuery<Product>; commandText = query.ToString(); var objectQuery = query.ToObjectQuery(); parameters = objectQuery.Parameters.Select(x => new SqlParameter(x.Name, x.Value)).ToArray(); }
但我认为我们现在可以更进一步,创建一个新的扩展方法,将 `DbQuery
public static SqlCommand ToSqlCommand<T>(this DbQuery<T> query) { SqlCommand command = new SqlCommand(); command.CommandText = query.ToString(); var objectQuery = query.ToObjectQuery(); foreach (var param in objectQuery.Parameters) { command.Parameters.AddWithValue(param.Name, param.Value); } return command; }
这会将我们的 `RegisterNotification` 方法转换为
static void RegisterNotification(string productName) { string connectionString = ConfigurationManager.ConnectionStrings["StoreDbContext"].ConnectionString; SqlCommand command; using (var db = new StoreDbContext()) { var query = db.Products.Where(x => x.Name == productName) as DbQuery<Product>; command = query.ToSqlCommand(); } using (SqlConnection connection = new SqlConnection(connectionString)) { using (command) { command.Connection = connection; connection.Open(); var sqlDependency = new SqlDependency(command); sqlDependency.OnChange += new OnChangeEventHandler(sqlDependency_OnChange); // NOTE: You have to execute the command, or the notification will never fire. using (SqlDataReader reader = command.ExecuteReader()) { } } } }
这很棒,但我们仍然在 `DbContext` 上对 Products `DbSet` 进行硬编码查询。为了解决这个问题,让我们创建一个新的泛型类,我们可以用它来注册查询以获取通知。
EntityChangeNotifier
所以,这是我们将上述所有内容移入自己的类的第一步
public class EntityChangeNotifier<TEntity, TDbContext> : IDisposable where TEntity : class where TDbContext : DbContext, new() { private DbContext _context; private Expression<Func<TEntity, bool>> _query; private string _connectionString; public EntityChangeNotifier(Expression<Func<TEntity, bool>> query) { _context = new TDbContext(); _query = query; _connectionString = _context.Database.Connection.ConnectionString; SqlDependency.Start(_connectionString); RegisterNotification(); } private void RegisterNotification() { _context = new TDbContext(); using (SqlConnection connection = new SqlConnection(_connectionString)) { using (SqlCommand command = GetCommand()) { command.Connection = connection; connection.Open(); var sqlDependency = new SqlDependency(command); sqlDependency.OnChange += new OnChangeEventHandler(_sqlDependency_OnChange); // NOTE: You have to execute the command, or the notification will never fire. using (SqlDataReader reader = command.ExecuteReader()) { } } } } private SqlCommand GetCommand() { var q = GetCurrent(); return q.ToSqlCommand(); } private DbQuery<TEntity> GetCurrent() { var query = _context.Set<TEntity>().Where(_query) as DbQuery<TEntity>; return query; } private void _sqlDependency_OnChange(object sender, SqlNotificationEventArgs e) { Console.WriteLine("Info: {0}; Source: {1}; Type: {2}", e.Info, e.Source, e.Type); RegisterNotification(); } public void Dispose() { SqlDependency.Stop(_connectionString); } }
如您所见,我们的 `EntityChangeNotifier` 类接受两个泛型参数,第一个是实体类型,第二个是 `DbContext` 类型。我们还使用 `IDisposable` 允许我们使用 `using` 语法来启动和停止连接字符串的 `SqlDependency`。这允许我们编写以下代码
using (var notifer = new EntityChangeNotifier<Product, StoreDbContext>(x => x.Name == "Lamp")) { Console.WriteLine("Press any key to stop listening for changes..."); Console.ReadKey(true); }
这很棒,但现在我们需要处理更改通知发生时触发的事件。让我们在 `EntityChangeNotifier` 类上创建一些我们自己的事件。首先,是 `EventArgs` 类
public class EntityChangeEventArgs<T> : EventArgs { public IEnumerable<T> Results { get; set; } public bool ContinueListening { get; set; } } public class NotifierErrorEventArgs : EventArgs { public string Sql { get; set; } public SqlNotificationEventArgs Reason { get; set; } }
然后是我们的事件声明
public event EventHandler<EntityChangeEventArgs<TEntity>> Changed; public event EventHandler<NotifierErrorEventArgs> Error;
然后是我们的 `OnEvent` 方法
protected virtual void OnChanged(EntityChangeEventArgs<TEntity> e) { if (Changed != null) { Changed(this, e); } } protected virtual void OnError(NotifierErrorEventArgs e) { if (Error != null) { Error(this, e); } }
现在我们可以将 `SqlDependency` 事件处理程序更改为
private void _sqlDependency_OnChange(object sender, SqlNotificationEventArgs e) { if (e.Type == SqlNotificationType.Subscribe || e.Info == SqlNotificationInfo.Error) { var args = new NotifierErrorEventArgs { Reason = e, Sql = GetCurrent().ToString() }; OnError(args); } else { var args = new EntityChangeEventArgs<TEntity> { Results = GetCurrent(), ContinueListening = true }; OnChanged(args); if (args.ContinueListening) { RegisterNotification(); } } }
现在我们正在检查事件是否因错误或更改而触发。如果发生错误,我们会触发我们自己的 `Error` 事件。如果发生更改,我们会触发我们的 `Changed` 事件。我们甚至在 `EventArgs` 类中包含了查询的 `IEnumerable
using (var notifer = new EntityChangeNotifier<Product, StoreDbContext>(x => x.Name == "Lamp")) { notifer.Changed += (sender, e) => { Console.WriteLine(e.Results.Count()); }; Console.WriteLine("Press any key to stop listening for changes..."); Console.ReadKey(true); }
万岁!看起来一切都很顺利!嗯……几乎是。让我们看看当我们执行以下操作时会发生什么
using (var notifer = new EntityChangeNotifier<Product, StoreDbContext>(x => x.Name == "Lamp")) { notifer.Changed += (sender, e) => { Console.WriteLine(e.Results.Count()); }; using (var otherNotifier = new EntityChangeNotifier<Product, StoreDbContext>(x => x.Name == "Desk")) { otherNotifier.Changed += (sender, e) => { Console.WriteLine(e.Results.Count()); }; Console.WriteLine("Press any key to stop listening for changes..."); Console.ReadKey(true); } Console.WriteLine("Press any key to stop listening for changes..."); Console.ReadKey(true); }
如果您现在运行应用程序,您应该会看到每当您在数据库中对某种产品进行更改时,它都会打印出该产品类型的总数。请直接在数据库上运行一些查询,例如
INSERT INTO Products (Name, LastEditDate) Values ('Lamp', GetDate())
INSERT INTO Products (Name, LastEditDate) Values ('Desk', GetDate())
现在按下任意键关闭“Desk”通知器的监控。现在再次运行 desk 的插入语句。发生了什么?我们仍然收到了通知!这告诉我们一些重要的事情。具体来说,在内部,`SqlDependency` 会跟踪所有调用 `Start` 方法的地方(可能只是一个计数器),并且只有当 `Stop` 被调用了相同次数时才会停止监听。我们可以通过在 `EntityChangeNotification` 类本身内部维护和检查状态来解决这个问题。让我们更改 `Dispose` 方法,不仅调用 `SqlDependency.Stop` 方法,还处置 `DbContext` 实例。
public void Dispose() { SqlDependency.Stop(_connectionString); if (_context != null) { _context.Dispose(); _context = null; } }
现在我们可以在 `SqlDependency` 事件处理程序中检查 `_context` 是否为 `null`,如果是,则直接返回。
private void _sqlDependency_OnChange(object sender, SqlNotificationEventArgs e) { if (_context == null) return; . . . }
现在,让我们再试一次。在关闭“Desk”通知器后,当“Desk”行发生更改时,我们不再收到通知。由于我们没有重新注册通知,所以它只发生一次。
关注点
这种方法的好处是,当数据库中的数据发生更改时,我们可以即时收到应用程序的通知,而无需每隔 X 秒轮询数据库。它不适用于所有查询,并且 SQL Server 团队对哪些查询可以触发通知施加的限制是严格的,但当可以使用时,它为您的应用程序响应数据更改提供了一种比仅仅依赖轮询更好的方式。您还应该阅读 MSDN 关于 SqlDependency 类的文档,包括关于其预期用途的说明。特别是关于其意图的说明(我的强调):
...旨在用于 ASP.NET 或中间层服务,其中有相对少量服务器对数据库具有活动依赖项。它不适用于客户端应用程序,其中数百或数千台客户端计算机将为单个数据库服务器设置 `SqlDependency` 对象。如果您正在开发一个需要数据更改时可靠的亚秒级通知的应用程序,请查阅 SQL Server 联机丛书中的《规划高效查询通知策略》和《查询通知的替代方案》部分。
在决定 SqlDependency 是否适合您的应用程序时,请务必牢记这些注意事项。
源代码
所有源代码都可以在 GitHub 此处 获取。
历史
2012 年 11 月 - 首次发布