SQLoogle - 第 1 部分,共 2 部分





5.00/5 (22投票s)
用 Google 搜索您的 SQL。
目录
引言
有一天,我突然想到,我应该能够在我公司全部 SQL 目录中进行毫秒级的搜索。我需要一个 SQL 版的 Google。类似这样的东西。
SQLoogle 抓取、索引并搜索您的 T-SQL 对象。以下是它的一些功能:
- 以闪电般的速度抓取几乎所有的 SQL 数据库对象(例如表、索引、存储过程等)。
- 抓取 SQL Agent 作业步骤。
- 抓取 Reporting Services 命令。
- 抓取即席查询(已过滤和压缩)。
- 抓取准备好的语句。
- 捕获使用统计信息(执行次数、查找次数、扫描次数和最后使用日期)。
- 使用 Lucene.NET 执行多字段搜索。
- 提供基于 Web 的单页 AJAX 搜索界面。
- 提供 SQL 定义的下载、格式化查看和比较。
如果您和我一样,每天都会查找 SQL 对象。当处理许多 SQL Server 和数据库时,这可能很麻烦。大多数人使用 SQL Server Management Studio 来完成此操作。但是,这意味着大量的点击、滚动、右键单击、腕管综合征等。
导航(或浏览)查找某物有其存在的意义。但是,当需要浏览的内容很多时,搜索功能会非常有帮助。尤其是在您赶时间,或者不确定从哪里开始查找时(对我来说,几乎总是这样)。给几个词,搜索引擎就能立即返回结果。如今,我们对此能力习以为常。因此,SQLoogle 试图提供这种能力。
这是解释 SQLoogle 如何工作的系列文章的第一篇。本文介绍了后端。后端主要是提取、转换和加载(ETL)。第二部分介绍了前端,包括使用 ASP.Net MVC 3 开发的 Web 服务和网站,以及由 jQuery 和 Knockout.js 驱动的 AJAX 搜索界面。以下是 SQLoogle 的基本计划:
- 从 SQL Server 提取 SQL。
- 转换和聚合 SQL。
- 将 SQL 加载到 Lucene.NET。
- 搜索 SQL!
这是我使用 draw.io 制作的图表。
配置
SQLoogle 的配置使用了 .NET 2.0 的配置 API。在 4GuysFromRolla 网站上有一篇关于此的精彩文章。以下是 App.Config 中 sqloogleBot 部分的示例:
<sqloogleBot searchIndexPath="c:\Sqloogle\SearchIndex\"
<servers>
<add name="localhost"
connectionString="Server=localhost;Database=master;Trusted_Connection=True;" />
</servers>
<skips>
<add name="master"></add>
<add name="model"></add>
<add name="tempdb"></add>
<add name="msdb"></add>
<add name="SharePoint_" operator="StartsWith"></add>
</skips>
</sqloogleBot>
有两个集合:servers
和 skips
。在 servers 元素中,为要抓取的每个服务器添加名称和连接字符串。我上面只有一个,但您可以根据需要添加任意数量。在 skips 元素中,添加您不关心的任何数据库。也许您不需要这个,但我发现有些数据库对象我并不关心。为了方便起见,skip 元素有一个 operator 属性,支持 equals
、startsWith
和 endsWith
。这允许您用一个条目筛选出多个数据库。因此,以上面的示例为例,SQLoogle 抓取本地 SQL Server,并跳过以下数据库:master、model、tempdb、msdb,以及任何以 SharePoint_ 开头的数据库。
ETL
SQLoogle 大约 75% 是 ETL。当您意识到您的程序主要是 ETL 时,您需要找到一个 ETL 工具。对于 Microsoft 生态系统,您可能会想到 SQL Server Integration Services (SSIS)。对于 Java 生态系统,您可能会想到 Talend。但是,如果您不想引入 SSIS 或 Talend 这样的依赖项,并且宁愿不使用 GUI 工具进行拖放和右键单击来解决 ETL 问题,那么您需要找到一个替代方案。
对于 SQLoogle,我选择了一个名为 Rhino ETL 的开发友好的 ETL 框架。它由 Ayende 用 .NET 开发。它是开源的,并且可以使用 NuGet 轻松地将其包含在您的项目中。
Rhino ETL 对开发人员友好,因为它是代码。您可以在 Visual Studio 中使用面向对象的 C# 组合 ETL 流程。您可以使用任何日志框架和/或单元测试框架。您可以使用 ReSharper 进行重构,ReSharper 通过高亮显示您需要实现的那些方法,使其真正大放异彩。它附带了许多不错的操作(组件),当然,您还可以构建一个可重用库,包含您自己的执行各种惊人操作的操作;毕竟,您拥有整个 .NET 框架来工作。
Rhino ETL 允许我们将操作和/或部分流程注册到一个多线程数据管道中。
- 一个
Operation
是一个单独的提取、转换或加载。一个操作大致相当于基于 GUI 的 ETL 工具中的组件。 - 一个
PartialProcessOperation
由多个操作组成。
两者实现相同的接口,因此它们在管道中是可互换的。SQLoogle 使用和/或实现许多 Rhino ETL 的操作,包括:
InputCommandOperation
AbstractOperation
JoinOperation
,包括左连接和全外连接。AbstractAggregationOperation
SQLoogle 流程
这是从控制台应用程序执行的主要 SQLoogle 进程。
public class SqloogleProcess : SqloogleEtlProcess {
protected override void Initialize() {
var config = (SqloogleBotConfiguration)ConfigurationManager.GetSection("sqloogleBot");
if (!Directory.Exists(config.SearchIndexPath))
Directory.CreateDirectory(config.SearchIndexPath);
Register(new ParallelUnionAllOperation(config.ServerCrawlProcesses()));
Register(new SqloogleAggregate());
Register(new SqloogleTransform());
Register(new SqloogleCompare().Right(new LuceneExtract(config.SearchIndexPath)));
RegisterLast(new LuceneLoad(config.SearchIndexPath));
}
}
SqloogleProcess
实现 SqloogleEtlProcess
,该类实现 Rhino ETL 的主要 EtlProcess 并增加了一些日志记录。第 10 行会为配置文件中定义的每个服务器注册一个 ServerCrawlProcess
。ParallelUnionAllOperation
并行执行每个服务器进程。
Rhino ETL 没有提供 ParallelUnionAllOperation
,当时我也不确定如何创建一个。所以,我给 Ayende 发了封邮件问他。因为他知道 SQLoogle 的重要性,第二天他发给我一个指向他博客文章的链接。他把一个原型贴在了他的博客上。他的一个读者在这里发布了另一个。还有人建议可以使用并行 LINQ 来实现,这是一个很好的观点。不用说,非常感谢这些开发人员在我进行并行编程方面提供的帮助。
现在让我们看看 ServerCrawlProcess
都做了什么。
服务器抓取流程
public class ServerCrawlProcess : PartialProcessOperation {
public ServerCrawlProcess(string connectionString, string server) {
var union = new ParallelUnionAllOperation(
new DefinitionProcess(connectionString),
new CachedSqlProcess(connectionString),
new SqlAgentJobExtract(connectionString),
new ReportingServicesProcess(connectionString)
);
Register(union);
RegisterLast(new AppendToRowOperation("server", server));
}
}
ServerCrawlProcess
是一个 PartialProcessOperation
,正如我之前提到的,它由多个操作组成。它从配置中获取连接字符串和服务器名称,并并行地从 SQL Server 的不同部分收集 SQL 对象。第一个进程是 DefinitionProcess
。
定义流程
public class DefinitionProcess : PartialProcessOperation {
public DefinitionProcess(string connectionString) {
Register(new DatabaseExtract(connectionString));
Register(new DatabaseFilter());
Register(new DefinitionExtract());
Register(new CachedObjectStatsJoin().Right(new CachedObjectStatsExtract(connectionString)));
Register(new TableStatsJoin().Right(new TableStatsExtract(connectionString)));
RegisterLast(new IndexStatsJoin().Right(new IndexStatsExtract(connectionString)));
}
}
DefinitionProcess
是另一个 PartialProcessOperation
。它负责从适当的数据库收集 SQL 定义并将使用统计信息添加到其中。让我们看看我们的第一个单操作:DatabaseExtract
。
数据库提取
DatabaseExtract
是一个 InputOperation
。InputOperation
继承自 Rhino ETL 提供的 InputCommandOperation
,它需要实现两个方法:PrepareCommand
和 CreateRowFromReader
方法:
protected override void PrepareCommand(IDbCommand cmd) {
cmd.CommandText = @"/* SQLoogle */
USE master;
SELECT
database_id AS databaseid
,[Name] AS [database]
,Compatibility_Level AS compatibilitylevel
FROM sys.databases WITH (NOLOCK)
WHERE [state] = 0
AND [user_access] = 0
AND [is_in_standby] = 0
AND compatibility_level >= 80
ORDER BY [name] ASC;
";
}
protected override Row CreateRowFromReader(IDataReader reader) {
var row = Row.FromReader(reader);
row["connectionstring"] = GetDatabaseSpecificConnectionString(row);
return row;
}
PrepareCommand
允许您设置数据库 SQL(或存储过程)命令。正如您在 WHERE
子句中看到的,此 SQL 查询避免提取无法访问或不兼容的数据库。CreateRowFromReader
方法提供了一个机会来捕获查询结果,并将它们以 Rhino ETL 的 Row
对象的形式传递出去。Row
就像一个字典,只是它在尝试访问不存在的键时不会抛出错误,并且它提供了一些有用的方法。如果需要查询返回的所有列,它有一个方便的 FromReader
方法。此方法引用 ADO.NET 的 IDataReader
对象。我在这里使用它将 databaseid、database 和 compatibilitylevel 放入 row
变量中。然后,我添加了一个名为“connectionstring”的附加键。connectionstring 是每个数据库的特定于数据库的连接字符串。该连接字符串稍后在管道中由负责提取 SQL 定义的库使用。
数据库筛选
下一个操作是 DatabaseFilter
,它向我们介绍了 AbstractOperation
。Rhino ETL 的 AbstractOperaton
非常有用。Execute
方法为我们提供了一个机会,可以在行通过管道时对其进行转换或筛选。它接受并返回一个 IEnumerable<Row>
。这使得使用 LINQ 或 ForEach
循环(在可用时产生结果)操作行变得非常容易。一旦我们完成了行的操作,它们就会被传递到管道中的下一个操作。
DatabaseFilter
的目的是过滤掉配置文件中定义的skips。以下是 DatabaseFilter
的 Execute
实现:
public override IEnumerable<Row> Execute(IEnumerable<Row> rows) {
return rows.Where(row => !_config.Skips.Match(row["database"].ToString()));
}
在这种情况下,我向 SkipElementCollection
(Skips) 添加了一个 Match
方法,以检查正在通过的每个“数据库”。如果没有匹配的跳过项,数据库就可以通过;否则,就会被跳过!(或者过滤掉,您随意)。
给个建议……我发现很容易将大量转换和/或过滤添加到 AbstractOperation
中。毕竟,您就在那里,可以很快添加。我的代码中会看到这方面的证据。但是,我建议您尽量将每个操作限制为单一职责。长远来看,这将有助于使代码更易于管理 。
定义提取
接下来是 DefinitionExtract
,这是另一个 AbstractOperation
。它适配了负责生成 T-SQL 定义的 Open DB Diff 项目库。这是我使用的第三个(也是最快的一个)SQL 生成库。它使用了在 DatabaseExtract
操作中创建的“connectionstring”。
连接操作
在为每个数据库提取定义后,它们通过一系列三个连接,从 DefinitionProcess
的第 7 行开始。您只看到连接的 Right()
部分,因为左侧默认是管道中的内容。在这种情况下,来自上一个操作的定义在左侧,而使用情况在右侧。
什么是使用情况?使用情况是对对象被使用了多少次以及最后一次使用的时间(自服务器上次重启以来)的估计。三个连接中的每一个都包含不同类型的使用情况。首先是 CachedObjectStatsExtract
,它会提取对象使用情况(例如,存储过程、视图等)。其次是 TableStatsExtract
,它会提取与表相关的已使用的统计信息。最后是 IndexStatsExtract
使用情况。让我们看一个连接操作来说明 Rhino ETL 的连接操作是如何工作的:
缓存对象统计信息连接
public class CachedObjectStatsJoin : JoinOperation {
protected override Row MergeRows(Row leftRow, Row rightRow) {
var row = leftRow.Clone();
row["use"] = rightRow["use"];
row["lastused"] = DateTime.Now;
return row;
}
protected override void SetupJoinConditions() {
LeftJoin.Left("database", "objectid").Right("database", "objectid");
}
}
Rhino ETL 的 JoinOperation
要求您重写两个方法。它们是 SetupJoinConditions
和 MergeRows
。设置连接需要一个连接类型,以及用于连接的键(或字段)。Rhino ETL 支持内连接、左外连接、右外连接和全外连接。第 11 行表示一个左连接,因为我希望定义能够通过,即使我没有其使用统计信息(也就是说,即使右侧没有匹配项)。
在 MergeRows
中,我们需要将两个行合并为一个;一个左行和一个右行。Row
上的另一个有用的方法是 Clone()
方法。它允许我们获取一个行中的所有内容,并添加另一个行的特定信息。在这种情况下,第 4 行克隆了左行(定义)中的所有内容,并添加了右行中的“use”。对于缓存的对象,我将“lastused”设置为 DateTime.Now
。我的想法是,如果它在缓存中(这些统计信息来自这里),那么它基本上就是当前正在使用的。
回顾接下来的两个连接将没有实际意义。它们与 CachedObjectStatsJoin
非常相似。现在我们必须回到 ServerCrawlProcess
查看接下来是什么。
缓存 SQL 流程
DefinitionProcess 只是 ServerCrawlProcess
中四项并行运行的工作之一。CachedSqlProcess
也在运行。
public class CachedSqlProcess : PartialProcessOperation {
public CachedSqlProcess(string connectionString) {
Register(new CachedSqlExtract(connectionString));
Register(new CachedSqlPreTransform());
Register(new CachedSqlAggregate());
Register(new CachedSqlPostTransform());
}
}
这个 PartialProcessOperation
关注的是没有存储定义的查询。即席查询和准备好的查询对 SQLoogle 来说很重要,因为它们代表了对 SQL 对象的依赖。我们将介绍此进程中的前三个操作。
缓存 SQL 提取
CachedSqlExtract
是另一个 InputOperation
。当我第一次检查查询结果时,我惊讶于我看到的重复项。嗯,它们并不完全是重复项。虽然它们看起来像重复项,但它们在参数、空格和/或标点符号方面存在细微差别。为了说明这一点,这里是常见重复缓存计划的一些屏幕截图:
上面有 14 个即席查询,但 SQLoogle 必须将其减少到 1 个查询,使用次数为 14。如果不进行减少,搜索结果将充斥着这些非常相似的即席查询,引入过多的噪音并使结果无用。为了避免这种情况,SQLoogle 必须将不同的参数替换为占位符,并且只存储“SELECT * FROM UserArea WHERE UserArea.UserKey = @Parameter
。”这是在 CachedSqlPreTransform
操作中完成的。
缓存 SQL 预处理
这是一个实现了 Execute
方法的 AbstractOperation
。
public override IEnumerable<Row> Execute(IEnumerable<Row> rows) {
foreach (var row in rows) {
var sql = Trim(row["sqlscript"]);
if (sql.StartsWith("/* SQLoogle */")) continue;
if (!IsOfInterest(sql)) continue;
sql = ReplaceParameters(sql);
row["sqlscript"] = RemoveOptionalDetails(sql);
yield return row;
}
}
第 6 行过滤掉任何以 /* SQLoogle */ 开头的内容,因为以该开头即席查询是 SQLoogle 本身。ReplaceParameters
方法使用编译的正则表达式将文字参数转换为 @Parameter
占位符。我在 Expresso 中花费了大量时间来使这个正则表达式正常工作。我不会直接展示它并努力解释正则表达式语法,而是向您展示单元测试。
[Test]
public void TestReplaceSimpleParameters() {
const string sql = @"
select 0 from t where c = @z;
select 1 from t where c = 1
select 2 from t where c = 15.1
select 3 from t where c = 1.
select 4 from t where c = .15
select 5 from t where c=2
select 6 from t where c > 1
select 7 from t where c < 1
select 8 from t where c <> 1
select 9 from t where c != 1
select 10 from t where c != 1;
select 11 from t where c1 = 1 and c2 = 2;
select 12 from t where c1 = 'dale' and c2 = 3.0
select 13 from t where c = 'Newman'
select 14 from t where c1 = '' and c2 = 3
select 15 from t where c1 = 'stuff' and c2 = '3'
select 17 from t where c like 'something%'
select 18 from t where c not like 'something%'
select 19 from t where c = 'dale''s'
select 20 from t where c1 = 'dale''s' and c2 = 'x'
select 21 from t where c = -1
select 22 from t where c LIKE '%something%'
select 23 from t where ""t"".""c""='0000089158'
select 24 from t1 inner join t2 on (t1.field = t2.field) where t2.field = 'x';
select 25 from t where c = @Parameter
select 26 from t where c = N'Something'
select 27 from t1 inner join t2 on
(""t1"".""field""=""t2"".""field"")
where double_quotes = 'no'
select 28 from t where [c]='brackets!';
select 29 from t where x = 0x261BE3E63BBFD8439B5CE2971D70A5DE;
";
const string expected = @"
select 0 from t where c = @z;
select 1 from t where c = @Parameter
select 2 from t where c = @Parameter
select 3 from t where c = @Parameter
select 4 from t where c = @Parameter
select 5 from t where c= @Parameter
select 6 from t where c > @Parameter
select 7 from t where c < @Parameter
select 8 from t where c <> @Parameter
select 9 from t where c != @Parameter
select 10 from t where c != @Parameter;
select 11 from t where c1 = @Parameter and c2 = @Parameter;
select 12 from t where c1 = @Parameter and c2 = @Parameter
select 13 from t where c = @Parameter
select 14 from t where c1 = @Parameter and c2 = @Parameter
select 15 from t where c1 = @Parameter and c2 = @Parameter
select 17 from t where c like @Parameter
select 18 from t where c not like @Parameter
select 19 from t where c = @Parameter
select 20 from t where c1 = @Parameter and c2 = @Parameter
select 21 from t where c = @Parameter
select 22 from t where c LIKE @Parameter
select 23 from t where ""t"".""c""= @Parameter
select 24 from t1 inner join t2 on (t1.field = t2.field) where t2.field = @Parameter;
select 25 from t where c = @Parameter
select 26 from t where c = @Parameter
select 27 from t1 inner join t2 on
(""t1"".""field""=""t2"".""field"")
where double_quotes = @Parameter
select 28 from t where [c]= @Parameter;
select 29 from t where x = @Parameter;
";
var actual = CachedSqlPreTransform.ReplaceParameters(sql);
Assert.AreEqual(expected, actual);
}
sql
变量试图模拟可能来自缓存查询计划的各种运算符和文字参数组合。expected
变量是运行 ReplaceParameters
方法后 SQL 应有的样子。此单元测试减轻了我对重构正则表达式的恐惧。这也是一个在我不断重新运行测试以确保我没有弄乱它之前,我几乎无法完成的情况!
缓存 SQL 聚合
在 CachedSqlPreTransform
将许多相似的查询转换为完全重复项后,CachedSqlAggregate
用于消除(或分组)重复的查询。此操作继承自 AbstractAggregationOperation
。因此,它将向我们介绍 Rhino ETL 的聚合功能。
此类需要实现两个方法:GetColumnsToGroupBy
和 Accumulate
。让我们来看一下。
public class CachedSqlAggregate : AbstractAggregationOperation {
protected override string[] GetColumnsToGroupBy() {
return new[] { "sqlscript" };
}
protected override void Accumulate(Row row, Row aggregate) {
// init
if (aggregate["sqlscript"] == null)
aggregate["sqlscript"] = row["sqlscript"];
if (aggregate["type"] == null)
aggregate["type"] = row["type"];
if (aggregate["database"] == null) {
aggregate["database"] = new Object[0];
}
if (aggregate["use"] == null) {
aggregate["use"] = 0;
}
//aggregate
if (row["database"] != null) {
var existing = new List<Object>((Object[])aggregate["database"]);
if (!existing.Contains(row["database"])) {
existing.Add(row["database"]);
aggregate["database"] = existing.ToArray();
}
}
aggregate["use"] = ((int)aggregate["use"]) + ((int)row["use"]);
}
}
我在 GetColumnsToGroupBy
方法中将“sqlscript”指定为分组字段。Rhino ETL 会处理分组,但您需要告诉它如何在 Accumulate
方法中聚合数据。在 Accumulate
方法中,您实际上只需要做两件事;初始化,然后聚合。
初始化发生在第一行进入操作时。聚合的 Row
中没有任何键,所以您应该将其初始化为传入行的值,或默认值。初始化还需要包括分组列,所以不要忘记。
之后,当您确定 aggregate
变量具有键和初始值后,就可以决定如何聚合了。上面,真正需要聚合的唯一字段是database 和 use。我将数据库存储在数组中,并将 use 相加(或求和),这就完成了。它将上面我举例的(14 条相似的)缓存 SQL 记录减少为 1 条记录,使用次数为 14。
AbstractAggregationOperation
只是 Rhino ETL 中聚合的一种方式。如果您愿意,也可以在 AbstractOperation
中使用 LINQ group by 运算符。只要时刻记住,整个 .NET 框架都可以供您使用。
这就是我们从 ServerCrawlProcess
需要了解的全部内容。现在是时候跳回最顶层,即 SqloogleProcess
。我将跳过 SqloogleAggregate
,因为如果您看过一个聚合,您就都看过了,不是吗?
SQLoogle 转换
SqloogleTransform
操作紧随其后。它是一个 AbstractOperation
,用于准备数据以供 Lucene 使用。以下是 Execute
方法:
public override IEnumerable<Row> Execute(IEnumerable<Row> rows) {
foreach (var row in rows) {
row["id"] = row["sqlscript"].GetHashCode().ToString(
CultureInfo.InvariantCulture).Replace("-","X");
row["sql"] = row["sqlscript"] + " " + SqlTransform(row["sqlscript"]);
row["created"] = DateTransform(row["created"], DateTime.Today);
row["modified"] = DateTransform(row["modified"], DateTime.Today);
row["lastused"] = DateTransform(row["lastused"], DateTime.MinValue);
row["server"] = ListTransform(row["server"]);
row["database"] = ListTransform(row["database"]);
row["schema"] = ListTransform(row["schema"]);
row["name"] = ListTransform(row["name"]);
row["use"] = Strings.UseBucket(row["use"]);
row["count"] = row["count"].ToString().PadLeft(10,'0');
row["dropped"] = false;
yield return row;
}
}
id 字段对于每个 Lucene 文档都应该是唯一的。我依赖 GetHashCode
方法。我用“X”替换“-”,因为“-”字符是 Lucene 查询语法中的禁止操作符。
sql 字段包含 sqlscript 字段中的所有内容,以及其他转换后的 SQL。SqlTransform
方法返回一个由标题大小写分隔的独立单词字符串。按标题大小写分割有助于 Lucene 在搜索“Get”和/或“Order”时找到“GetOrder”。
created、modified 和 lastused 字段都是日期。为了支持 Lucene 中的范围查询和排序,我将日期转换为 YYYYMMDD 格式。DateTransform
的第二个参数是默认日期,以防该字段为 null。
server、database、schema 和 name 字段是数组。我将它们连接成一个分隔字符串。
use 和 count 字段是数字。与日期一样,它们用零进行左填充以支持范围查询和排序。此外,UseBucket
方法将除最高有效数字之外的所有数字替换为零。这样做是为了减少对 Lucene 索引的更新次数。毕竟,我只关心使用量的概念,而不是确切的数字。
dropped 字段是一个布尔值。显然,ServerCrawlProcess
找到的任何内容都没有被删除,所以它被设置为 false。
SQLoogle 比较
现在,抓取的数据已准备好加载到 Lucene 中,它需要通过 SqloogleCompare
来确定如何处理。这是一个 JoinOperation
。它是一个全外连接,将抓取的数据与 Lucene 索引中已有的数据进行比较。它将确定对连接两侧的每一行要执行的操作。以下是 MergeRows()
方法的实现:
protected override Row MergeRows(Row newRow, Row oldRow) {
Row row;
// if the old row doesn't exist, then newRow is new, and it should be created
if (oldRow["id"] == null) {
row = newRow.Clone();
row["action"] = "Create";
return row;
}
// if the new row doesn't exist, then oldRow has been
// dropped, and it should be marked as dropped and updated
if (newRow["id"] == null) {
row = oldRow.Clone();
row["dropped"] = true;
row["action"] = "Update";
return row;
}
// if new and old rows are the same, then we have nothing to do.
if (Equal(newRow, oldRow)) {
row = oldRow.Clone();
row["action"] = "None";
return row;
}
// if we end up here, the sql is the same but other properties have been updated, so we should update it.
row = newRow.Clone();
row["action"] = "Update";
row["modified"] = _today;
return row;
}
希望 MergeRows
方法中的注释有所帮助。我已经将 MergeRow
参数从 leftRow
和 rightRow
重命名为 newRow
和 oldRow
。newRow
代表 SQLoogle 刚刚抓取并准备好的 SQL。oldRow
代表已在 SQLoogle 的 Lucene 索引中的 SQL。
在第 6 行,如果 Lucene 索引中的旧行不存在,那么 SQLoogle 在新行中找到了新的 SQL 定义。因此,newRow
的操作被设置为“Create”,并返回。
如果在第 13 行情况相反;也就是说,存在旧行但没有匹配的新行,那么旧行的 SQL 定义已被删除(或暂时丢失、离线等)。我可以从索引中删除它,但我选择将其 dropped 字段更新为 true 并将其操作设置为“Update”。默认情况下,它将在搜索结果中被过滤掉。但是,如果将 dropped:true
添加到 Lucene 查询中,它可能会包含在结果中。当某项内容被意外删除时,这有时会派上用场。
第 21 行仅在旧行和新行都存在时运行。如果存在,Equal()
方法会检查其他字段的相等性。如果它们都相同,则操作为“None”,SQL 对象可以安然无恙地放在 Lucene 索引中;未被修改。
第 28 行开始了一个万能情况。任何能走到这一步的内容意味着 SQL 相同,但 1+ 个属性已更改,因此它是一个“Update”。为了记录更新,我将“modified”字段设置为今天。
现在每个记录都应该准备好进入 Lucene,并且具有 Create、Update 或 None 的操作键。此操作将在 LuceneLoad
中使用,这是我们将要介绍的最后一个 ETL 操作。
Lucene 加载
最后,管道中的数据已准备好用于 Lucene。LuceneLoad
继承自 AbstractLuceneLoad
。我们将从具体类开始,然后进入抽象类。
public class LuceneLoad : AbstractLuceneLoad {
public LuceneLoad(string folder) : base(folder) {}
public override void PrepareSchema() {
Schema["id"] = new LuceneFieldSettings(Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS);
Schema["use"] = new LuceneFieldSettings(Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS);
Schema["count"] = new LuceneFieldSettings(Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS);
Schema["score"] = new LuceneFieldSettings(Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS);
Schema["created"] = new LuceneFieldSettings(Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS);
Schema["modified"] = new LuceneFieldSettings(Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS);
Schema["lastused"] = new LuceneFieldSettings(Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS);
Schema["lastneeded"] = new LuceneFieldSettings(Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS);
Schema["sql"] = new LuceneFieldSettings(Field.Store.NO, Field.Index.ANALYZED);
Schema["sqlscript"] = new LuceneFieldSettings(Field.Store.YES, Field.Index.NO);
}
}
构造函数需要一个 folder
。这将是存储 Lucene.NET 索引的文件夹。PrepareSchema
方法为我们提供了一个机会来设置特定字段的 Lucene 存储和索引选项。如果我们不为字段指定 LuceneFieldSettings
,则使用默认设置。
您可以选择存储和/或索引 Lucene Document
中的每个字段。为了在搜索结果中获得某些内容,您必须存储它(Field.Store.YES
)。为了能够搜索一个字段,您必须对其进行索引(使用 Field.Index.NO
以外的其他值)。我在 PrepareSchema 中做了三件事:
- 存储可预测的字段,并使用最少的索引策略(即
Field.Index.NOT_ANALYZED_NO_NORMS
)。这类字段的示例是我的日期;它们格式始终相同。它们不是真正的全文,所以不需要特殊的分析。 - 不存储转换后的“sql”字段,而是使用最大索引策略(即
Field.Index.ANALYZED
)。转换后的 SQL 用于增强搜索结果,因此需要全文分析。但是,它永远不会在搜索结果中返回,因此没有必要存储它(以免浪费空间)。 - 存储“sqlscript”字段(精确的 SQL 脚本),但完全不索引它(即
Index.NO
)。这将在搜索结果中返回,因为 SQLoogle 需要提供有效的 SQL 脚本。但是,我不需要对其进行索引,因为转换后的 SQL 包含有效的 SQL 脚本,并且更多。
抽象 Lucene 加载
public abstract class AbstractLuceneLoad : AbstractOperation {
private readonly FSDirectory _indexDirectory;
private readonly StandardAnalyzer _standardAnalyzer;
private readonly IndexWriter _indexWriter;
private readonly Dictionary<string,> _counters = new Dictionary<string,>();
public Dictionary<string,> Schema { get; set; }
protected AbstractLuceneLoad(string folder, bool clean = false) {
_indexDirectory = FSDirectory.Open(new DirectoryInfo(folder));
_standardAnalyzer = new StandardAnalyzer(Version.LUCENE_30);
_indexWriter = new IndexWriter(_indexDirectory,
_standardAnalyzer, IndexWriter.MaxFieldLength.UNLIMITED);
_counters.Add("None", 0);
_counters.Add("Create", 0);
_counters.Add("Update", 0);
Schema = new Dictionary<string,>();
}
public abstract void PrepareSchema();
public override IEnumerable<row> Execute(IEnumerable<row> rows) {
PrepareSchema();
foreach (var row in rows) {
if (row["action"] == null) {
throw new InvalidOperationException("There is no action column." +
" A valid action is None, Create, or Update!");
}
var action = row["action"].ToString();
row.Remove("action");
_counters[action] += 1;
switch (action) {
case "None":
continue;
case "Create":
_indexWriter.AddDocument(RowToDoc(row));
continue;
case "Delete":
_indexWriter.DeleteDocuments(new Term("id", row["id"].ToString()));
continue;
case "Update":
_indexWriter.DeleteDocuments(new Term("id", row["id"].ToString()));
_indexWriter.AddDocument(RowToDoc(row));
continue;
}
}
yield break;
}
private Document RowToDoc(Row row) {
var doc = new Document();
foreach (var column in row.Columns) {
if (Schema.ContainsKey(column)) {
doc.Add(new Field(column.ToLower(), row[column].ToString(),
Schema[column].Store, Schema[column].Index));
} else {
doc.Add(new Field(column.ToLower(), row[column].ToString(),
Field.Store.YES, Field.Index.ANALYZED));
}
}
return doc;
}
public sealed override void Dispose() {
Info("Lucene Create: {0}, Update: {1}, and None: {2}.", _counters["Create"],
_counters["Update"], _counters["None"]);
Info("Lucene Optimizing.");
_indexWriter.Optimize();
Info("Lucene Committing.");
_indexWriter.Commit();
_indexWriter.Dispose();
_indexDirectory.Dispose();
_standardAnalyzer.Close();
_standardAnalyzer.Dispose();
base.Dispose();
}
}
AbstractLuceneLoad
继承自 AbstractOperation
。因此,我们必须像往常一样实现 Execute()
方法。此外,我们需要一个设置和拆卸 Lucene.NET 管道的地方。我使用构造函数进行设置,并使用 Dispose()
方法进行拆卸。
构造函数准备一个索引写入器。写入器需要一个目录和一个分析器。一旦有了它们,它就可以操作索引了。通过操作,我的意思是它可以添加文档、删除文档,甚至优化索引。它不更新文档,因此更新是通过删除然后再次添加更新后的文档来执行的。
要使任何操作最终生效,您必须调用 Commit()
方法。它有点像数据库事务;您必须 Commit()
或 Rollback()
您的更改。
Execute
调用我们在 LuceneLoad
中定义的 PrepareSchema()
方法。这将设置我们的存储和索引策略。现在我们准备处理行了。首先,我们必须检查“action”字段是否存在。如果不存在,则意味着有问题,所以我抛出一个异常,因为我想知道它。然后,因为我不想在 Lucene 索引中存储“action”字段,所以我使用方便的 Remove()
方法将其从 Row
实例中删除。
switch
语句负责在 Lucene 索引上执行操作。正如您所见,Lucene API 非常直接,有一个用于添加的 AddDocument()
方法,还有一个用于删除的 DeleteDocuments()
方法。将 Row
实例转换为 Lucene Document
实例很容易。RowToDoc
方法只需循环遍历行中的列并将它们添加到文档中。这是 Schema
可以覆盖默认存储和索引选择的地方。
Dispose()
方法清理所有内容。Rhino ETL 将为我们调用它。我添加了各种漂亮的日志记录,以便了解发生了什么。
总结
此时,您已经看到了 SQLoogle 如何从多个服务器抓取和索引 T-SQL。在大多数情况下,这都是使用 Rhino ETL 的练习。最终结果是一个已准备好进行搜索的 Lucene 索引。由于 Lucene.NET 索引与基于 Java 的 Lucene 索引兼容,如果您想玩转该索引,可以下载 Luke。
下一篇文章将介绍 Web 服务、网站以及由 jQuery 和 Knockout.js 驱动的搜索界面。没有这些部分,您就无法方便地搜索和使用您的所有 SQL。您不必等到下一篇文章,如果您愿意,可以前往 SQLoogle 项目站点查看。
如果您花时间阅读了本文,谢谢 。