65.9K
CodeProject 正在变化。 阅读更多。
Home

SQLoogle - 第 1 部分,共 2 部分

starIconstarIconstarIconstarIconstarIcon

5.00/5 (22投票s)

2013年5月16日

CPOL

19分钟阅读

viewsIcon

42590

用 Google 搜索您的 SQL。

目录

  1. 引言
  2. 配置
  3. SQLoogle 流程
  4. 总结

引言

有一天,我突然想到,我应该能够在我公司全部 SQL 目录中进行毫秒级的搜索。我需要一个 SQL 版的 Google。类似这样的东西。

SQLoogle 抓取、索引并搜索您的 T-SQL 对象。以下是它的一些功能:

  • 以闪电般的速度抓取几乎所有的 SQL 数据库对象(例如表、索引、存储过程等)。
  • 抓取 SQL Agent 作业步骤。
  • 抓取 Reporting Services 命令。
  • 抓取即席查询(已过滤和压缩)。
  • 抓取准备好的语句。
  • 捕获使用统计信息(执行次数、查找次数、扫描次数和最后使用日期)。
  • 使用 Lucene.NET 执行多字段搜索。
  • 提供基于 Web 的单页 AJAX 搜索界面。
  • 提供 SQL 定义的下载、格式化查看和比较。

如果您和我一样,每天都会查找 SQL 对象。当处理许多 SQL Server 和数据库时,这可能很麻烦。大多数人使用 SQL Server Management Studio 来完成此操作。但是,这意味着大量的点击、滚动、右键单击、腕管综合征等。

导航(或浏览)查找某物有其存在的意义。但是,当需要浏览的内容很多时,搜索功能会非常有帮助。尤其是在您赶时间,或者不确定从哪里开始查找时(对我来说,几乎总是这样)。给几个词,搜索引擎就能立即返回结果。如今,我们对此能力习以为常。因此,SQLoogle 试图提供这种能力。

这是解释 SQLoogle 如何工作的系列文章的第一篇。本文介绍了后端。后端主要是提取、转换和加载(ETL)。第二部分介绍了前端,包括使用 ASP.Net MVC 3 开发的 Web 服务和网站,以及由 jQueryKnockout.js 驱动的 AJAX 搜索界面。以下是 SQLoogle 的基本计划:

  • 从 SQL Server 提取 SQL。
  • 转换和聚合 SQL。
  • 将 SQL 加载到 Lucene.NET。
  • 搜索 SQL!

这是我使用 draw.io 制作的图表。

配置

SQLoogle 的配置使用了 .NET 2.0 的配置 API。在 4GuysFromRolla 网站上有一篇关于此的精彩文章。以下是 App.Config 中 sqloogleBot 部分的示例:

<sqloogleBot searchIndexPath="c:\Sqloogle\SearchIndex\"
    <servers>
      <add name="localhost" 
        connectionString="Server=localhost;Database=master;Trusted_Connection=True;" />
    </servers>
    <skips>
      <add name="master"></add>
      <add name="model"></add>
      <add name="tempdb"></add>
      <add name="msdb"></add>
      <add name="SharePoint_" operator="StartsWith"></add>
    </skips>
</sqloogleBot>

有两个集合:serversskips。在 servers 元素中,为要抓取的每个服务器添加名称和连接字符串。我上面只有一个,但您可以根据需要添加任意数量。在 skips 元素中,添加您不关心的任何数据库。也许您不需要这个,但我发现有些数据库对象我并不关心。为了方便起见,skip 元素有一个 operator 属性,支持 equalsstartsWithendsWith。这允许您用一个条目筛选出多个数据库。因此,以上面的示例为例,SQLoogle 抓取本地 SQL Server,并跳过以下数据库:master、model、tempdb、msdb,以及任何以 SharePoint_ 开头的数据库。

ETL

SQLoogle 大约 75% 是 ETL。当您意识到您的程序主要是 ETL 时,您需要找到一个 ETL 工具。对于 Microsoft 生态系统,您可能会想到 SQL Server Integration Services (SSIS)。对于 Java 生态系统,您可能会想到 Talend。但是,如果您不想引入 SSIS 或 Talend 这样的依赖项,并且宁愿不使用 GUI 工具进行拖放和右键单击来解决 ETL 问题,那么您需要找到一个替代方案。

对于 SQLoogle,我选择了一个名为 Rhino ETL开发友好的 ETL 框架。它由 Ayende 用 .NET 开发。它是开源的,并且可以使用 NuGet 轻松地将其包含在您的项目中。

Rhino ETL 对开发人员友好,因为它是代码。您可以在 Visual Studio 中使用面向对象的 C# 组合 ETL 流程。您可以使用任何日志框架和/或单元测试框架。您可以使用 ReSharper 进行重构,ReSharper 通过高亮显示您需要实现的那些方法,使其真正大放异彩。它附带了许多不错的操作(组件),当然,您还可以构建一个可重用库,包含您自己的执行各种惊人操作的操作;毕竟,您拥有整个 .NET 框架来工作。

Rhino ETL 允许我们将操作和/或部分流程注册到一个多线程数据管道中。

  • 一个 Operation 是一个单独的提取、转换或加载。一个操作大致相当于基于 GUI 的 ETL 工具中的组件。
  • 一个 PartialProcessOperation 由多个操作组成。

两者实现相同的接口,因此它们在管道中是可互换的。SQLoogle 使用和/或实现许多 Rhino ETL 的操作,包括:

  • InputCommandOperation
  • AbstractOperation
  • JoinOperation,包括左连接和全外连接。
  • AbstractAggregationOperation

SQLoogle 流程 

这是从控制台应用程序执行的主要 SQLoogle 进程。

public class SqloogleProcess : SqloogleEtlProcess {
 
    protected override void Initialize() {

        var config = (SqloogleBotConfiguration)ConfigurationManager.GetSection("sqloogleBot");

        if (!Directory.Exists(config.SearchIndexPath))
            Directory.CreateDirectory(config.SearchIndexPath);

        Register(new ParallelUnionAllOperation(config.ServerCrawlProcesses()));
        Register(new SqloogleAggregate());
        Register(new SqloogleTransform());
        Register(new SqloogleCompare().Right(new LuceneExtract(config.SearchIndexPath)));
        RegisterLast(new LuceneLoad(config.SearchIndexPath));
    }
}

SqloogleProcess 实现 SqloogleEtlProcess,该类实现 Rhino ETL 的主要 EtlProcess 并增加了一些日志记录。第 10 行会为配置文件中定义的每个服务器注册一个 ServerCrawlProcessParallelUnionAllOperation 并行执行每个服务器进程。

Rhino ETL 没有提供 ParallelUnionAllOperation,当时我也不确定如何创建一个。所以,我给 Ayende 发了封邮件问他。因为他知道 SQLoogle 的重要性,第二天他发给我一个指向他博客文章的链接。他把一个原型贴在了他的博客上。他的一个读者在这里发布了另一个。还有人建议可以使用并行 LINQ 来实现,这是一个很好的观点。不用说,非常感谢这些开发人员在我进行并行编程方面提供的帮助。

现在让我们看看 ServerCrawlProcess 都做了什么。

服务器抓取流程

public class ServerCrawlProcess : PartialProcessOperation {
 
    public ServerCrawlProcess(string connectionString, string server) {

        var union = new ParallelUnionAllOperation(
            new DefinitionProcess(connectionString),
            new CachedSqlProcess(connectionString),
            new SqlAgentJobExtract(connectionString),
            new ReportingServicesProcess(connectionString)
        );

        Register(union);
        RegisterLast(new AppendToRowOperation("server", server));
    }
}

ServerCrawlProcess 是一个 PartialProcessOperation,正如我之前提到的,它由多个操作组成。它从配置中获取连接字符串和服务器名称,并并行地从 SQL Server 的不同部分收集 SQL 对象。第一个进程是 DefinitionProcess

定义流程

public class DefinitionProcess : PartialProcessOperation {
 
    public DefinitionProcess(string connectionString) {
        Register(new DatabaseExtract(connectionString));
        Register(new DatabaseFilter());
        Register(new DefinitionExtract());
        Register(new CachedObjectStatsJoin().Right(new CachedObjectStatsExtract(connectionString)));
        Register(new TableStatsJoin().Right(new TableStatsExtract(connectionString)));
        RegisterLast(new IndexStatsJoin().Right(new IndexStatsExtract(connectionString)));
    }
}

DefinitionProcess 是另一个 PartialProcessOperation。它负责从适当的数据库收集 SQL 定义并将使用统计信息添加到其中。让我们看看我们的第一个单操作:DatabaseExtract

数据库提取

DatabaseExtract 是一个 InputOperationInputOperation 继承自 Rhino ETL 提供的 InputCommandOperation,它需要实现两个方法:PrepareCommandCreateRowFromReader 方法:

protected override void PrepareCommand(IDbCommand cmd) {
    cmd.CommandText = @"/* SQLoogle */
        USE master;

        SELECT
            database_id AS databaseid
            ,[Name] AS [database]
            ,Compatibility_Level AS compatibilitylevel
        FROM sys.databases WITH (NOLOCK)
        WHERE [state] = 0
        AND [user_access] = 0
        AND [is_in_standby] = 0
        AND compatibility_level >= 80
        ORDER BY [name] ASC;
    ";
}

protected override Row CreateRowFromReader(IDataReader reader) {
    var row = Row.FromReader(reader);
    row["connectionstring"] = GetDatabaseSpecificConnectionString(row);
    return row;
}

PrepareCommand 允许您设置数据库 SQL(或存储过程)命令。正如您在 WHERE 子句中看到的,此 SQL 查询避免提取无法访问或不兼容的数据库。CreateRowFromReader 方法提供了一个机会来捕获查询结果,并将它们以 Rhino ETL 的 Row 对象的形式传递出去。Row 就像一个字典,只是它在尝试访问不存在的键时不会抛出错误,并且它提供了一些有用的方法。如果需要查询返回的所有列,它有一个方便的 FromReader 方法。此方法引用 ADO.NET 的 IDataReader 对象。我在这里使用它将 databaseid、database 和 compatibilitylevel 放入 row 变量中。然后,我添加了一个名为“connectionstring”的附加键。connectionstring 是每个数据库的特定于数据库的连接字符串。该连接字符串稍后在管道中由负责提取 SQL 定义的库使用。

数据库筛选

下一个操作是 DatabaseFilter,它向我们介绍了 AbstractOperation。Rhino ETL 的 AbstractOperaton 非常有用。Execute 方法为我们提供了一个机会,可以在行通过管道时对其进行转换或筛选。它接受并返回一个 IEnumerable<Row>。这使得使用 LINQ 或 ForEach 循环(在可用时产生结果)操作行变得非常容易。一旦我们完成了行的操作,它们就会被传递到管道中的下一个操作。

DatabaseFilter 的目的是过滤掉配置文件中定义的skips。以下是 DatabaseFilterExecute 实现:

public override IEnumerable<Row> Execute(IEnumerable<Row> rows) {
    return rows.Where(row => !_config.Skips.Match(row["database"].ToString()));
}

在这种情况下,我向 SkipElementCollection (Skips) 添加了一个 Match 方法,以检查正在通过的每个“数据库”。如果没有匹配的跳过项,数据库就可以通过;否则,就会被跳过!(或者过滤掉,您随意)。

给个建议……我发现很容易将大量转换和/或过滤添加到 AbstractOperation 中。毕竟,您就在那里,可以很快添加。我的代码中会看到这方面的证据。但是,我建议您尽量将每个操作限制为单一职责。长远来看,这将有助于使代码更易于管理 Smile | <img src=。  

定义提取

接下来是 DefinitionExtract,这是另一个 AbstractOperation。它适配了负责生成 T-SQL 定义的 Open DB Diff 项目库。这是我使用的第三个(也是最快的一个)SQL 生成库。它使用了在 DatabaseExtract 操作中创建的“connectionstring”。

连接操作   

在为每个数据库提取定义后,它们通过一系列三个连接,从 DefinitionProcess 的第 7 行开始。您只看到连接的 Right() 部分,因为左侧默认是管道中的内容。在这种情况下,来自上一个操作的定义在左侧,而使用情况在右侧。

什么是使用情况?使用情况是对对象被使用了多少次以及最后一次使用的时间(自服务器上次重启以来)的估计。三个连接中的每一个都包含不同类型的使用情况。首先是 CachedObjectStatsExtract,它会提取对象使用情况(例如,存储过程、视图等)。其次是 TableStatsExtract,它会提取与表相关的已使用的统计信息。最后是 IndexStatsExtract 使用情况。让我们看一个连接操作来说明 Rhino ETL 的连接操作是如何工作的:

缓存对象统计信息连接

public class CachedObjectStatsJoin : JoinOperation {
 
    protected override Row MergeRows(Row leftRow, Row rightRow) {
        var row = leftRow.Clone();
        row["use"] = rightRow["use"];
        row["lastused"] = DateTime.Now;
        return row;
    }

    protected override void SetupJoinConditions() {
        LeftJoin.Left("database", "objectid").Right("database", "objectid");
    }
}

Rhino ETL 的 JoinOperation 要求您重写两个方法。它们是 SetupJoinConditionsMergeRows。设置连接需要一个连接类型,以及用于连接的键(或字段)。Rhino ETL 支持内连接、左外连接、右外连接和全外连接。第 11 行表示一个左连接,因为我希望定义能够通过,即使我没有其使用统计信息(也就是说,即使右侧没有匹配项)。

MergeRows 中,我们需要将两个行合并为一个;一个左行和一个右行。Row 上的另一个有用的方法是 Clone() 方法。它允许我们获取一个行中的所有内容,并添加另一个行的特定信息。在这种情况下,第 4 行克隆了左行(定义)中的所有内容,并添加了右行中的“use”。对于缓存的对象,我将“lastused”设置为 DateTime.Now。我的想法是,如果它在缓存中(这些统计信息来自这里),那么它基本上就是当前正在使用的。

回顾接下来的两个连接将没有实际意义。它们与 CachedObjectStatsJoin 非常相似。现在我们必须回到 ServerCrawlProcess 查看接下来是什么。

缓存 SQL 流程

DefinitionProcess 只是 ServerCrawlProcess 中四项并行运行的工作之一。CachedSqlProcess 也在运行。

public class CachedSqlProcess : PartialProcessOperation {
    public CachedSqlProcess(string connectionString) {
        Register(new CachedSqlExtract(connectionString));
        Register(new CachedSqlPreTransform());
        Register(new CachedSqlAggregate());
        Register(new CachedSqlPostTransform());
    }
}

这个 PartialProcessOperation 关注的是没有存储定义的查询。即席查询和准备好的查询对 SQLoogle 来说很重要,因为它们代表了对 SQL 对象的依赖。我们将介绍此进程中的前三个操作。

缓存 SQL 提取

CachedSqlExtract 是另一个 InputOperation。当我第一次检查查询结果时,我惊讶于我看到的重复项。嗯,它们并不完全是重复项。虽然它们看起来像重复项,但它们在参数、空格和/或标点符号方面存在细微差别。为了说明这一点,这里是常见重复缓存计划的一些屏幕截图:

上面有 14 个即席查询,但 SQLoogle 必须将其减少到 1 个查询,使用次数为 14。如果不进行减少,搜索结果将充斥着这些非常相似的即席查询,引入过多的噪音并使结果无用。为了避免这种情况,SQLoogle 必须将不同的参数替换为占位符,并且只存储“SELECT * FROM UserArea WHERE UserArea.UserKey = @Parameter。”这是在 CachedSqlPreTransform 操作中完成的。

缓存 SQL 预处理

这是一个实现了 Execute 方法的 AbstractOperation

public override IEnumerable<Row> Execute(IEnumerable<Row> rows) {
    foreach (var row in rows) {

        var sql = Trim(row["sqlscript"]);

        if (sql.StartsWith("/* SQLoogle */")) continue;
        if (!IsOfInterest(sql)) continue;

        sql = ReplaceParameters(sql);
        row["sqlscript"] = RemoveOptionalDetails(sql);
        yield return row;
    }
}

第 6 行过滤掉任何以 /* SQLoogle */ 开头的内容,因为以该开头即席查询是 SQLoogle 本身。ReplaceParameters 方法使用编译的正则表达式将文字参数转换为 @Parameter 占位符。我在 Expresso 中花费了大量时间来使这个正则表达式正常工作。我不会直接展示它并努力解释正则表达式语法,而是向您展示单元测试。

[Test]
public void TestReplaceSimpleParameters() {

    const string sql = @"
        select 0 from t where c = @z;
        select 1 from t where c = 1
        select 2 from t where c = 15.1
        select 3 from t where c = 1.
        select 4 from t where c = .15
        select 5 from t where c=2
        select 6 from t where c > 1
        select 7 from t where c < 1
        select 8 from t where c <> 1
        select 9 from t where c != 1
        select 10 from t where c != 1;
        select 11 from t where c1 = 1 and c2 = 2;
        select 12 from t where c1 = 'dale' and c2 = 3.0
        select 13 from t where c = 'Newman'
        select 14 from t where c1 = '' and c2 = 3
        select 15 from t where c1 = 'stuff' and c2 = '3'
        select 17 from t where c like 'something%'
        select 18 from t where c not like 'something%'
        select 19 from t where c = 'dale''s'
        select 20 from t where c1 = 'dale''s' and c2 = 'x'
        select 21 from t where c = -1
        select 22 from t where c LIKE '%something%'
        select 23 from t where ""t"".""c""='0000089158'
        select 24 from t1 inner join t2 on (t1.field = t2.field) where t2.field = 'x';
        select 25 from t where c = @Parameter
        select 26 from t where c = N'Something'
        select 27 from t1 inner join t2 on 
          (""t1"".""field""=""t2"".""field"") 
          where double_quotes = 'no'
        select 28 from t where [c]='brackets!';
        select 29 from t where x = 0x261BE3E63BBFD8439B5CE2971D70A5DE;
    ";

    const string expected = @"
        select 0 from t where c = @z;
        select 1 from t where c = @Parameter
        select 2 from t where c = @Parameter
        select 3 from t where c = @Parameter
        select 4 from t where c = @Parameter
        select 5 from t where c= @Parameter
        select 6 from t where c > @Parameter
        select 7 from t where c < @Parameter
        select 8 from t where c <> @Parameter
        select 9 from t where c != @Parameter
        select 10 from t where c != @Parameter;
        select 11 from t where c1 = @Parameter and c2 = @Parameter;
        select 12 from t where c1 = @Parameter and c2 = @Parameter
        select 13 from t where c = @Parameter
        select 14 from t where c1 = @Parameter and c2 = @Parameter
        select 15 from t where c1 = @Parameter and c2 = @Parameter
        select 17 from t where c like @Parameter
        select 18 from t where c not like @Parameter
        select 19 from t where c = @Parameter
        select 20 from t where c1 = @Parameter and c2 = @Parameter
        select 21 from t where c = @Parameter
        select 22 from t where c LIKE @Parameter
        select 23 from t where ""t"".""c""= @Parameter
        select 24 from t1 inner join t2 on (t1.field = t2.field) where t2.field = @Parameter;
        select 25 from t where c = @Parameter
        select 26 from t where c = @Parameter
        select 27 from t1 inner join t2 on 
           (""t1"".""field""=""t2"".""field"") 
            where double_quotes = @Parameter
        select 28 from t where [c]= @Parameter;
        select 29 from t where x = @Parameter;
    ";

    var actual = CachedSqlPreTransform.ReplaceParameters(sql);
    Assert.AreEqual(expected, actual);
}

sql 变量试图模拟可能来自缓存查询计划的各种运算符和文字参数组合。expected 变量是运行 ReplaceParameters 方法后 SQL 应有的样子。此单元测试减轻了我对重构正则表达式的恐惧。这也是一个在我不断重新运行测试以确保我没有弄乱它之前,我几乎无法完成的情况!

缓存 SQL 聚合

CachedSqlPreTransform 将许多相似的查询转换为完全重复项后,CachedSqlAggregate 用于消除(或分组)重复的查询。此操作继承自 AbstractAggregationOperation。因此,它将向我们介绍 Rhino ETL 的聚合功能。

此类需要实现两个方法:GetColumnsToGroupByAccumulate。让我们来看一下。

public class CachedSqlAggregate : AbstractAggregationOperation {
 
    protected override string[] GetColumnsToGroupBy() {
        return new[] { "sqlscript" };
    }

    protected override void Accumulate(Row row, Row aggregate) {

        // init
        if (aggregate["sqlscript"] == null)
            aggregate["sqlscript"] = row["sqlscript"];

        if (aggregate["type"] == null)
            aggregate["type"] = row["type"];

        if (aggregate["database"] == null) {
            aggregate["database"] = new Object[0];
        }

        if (aggregate["use"] == null) {
            aggregate["use"] = 0;
        }

        //aggregate
        if (row["database"] != null) {
            var existing = new List<Object>((Object[])aggregate["database"]);
            if (!existing.Contains(row["database"])) {
                existing.Add(row["database"]);
                aggregate["database"] = existing.ToArray();
            }
        }

        aggregate["use"] = ((int)aggregate["use"]) + ((int)row["use"]);
    }
}

 
我在 GetColumnsToGroupBy 方法中将“sqlscript”指定为分组字段。Rhino ETL 会处理分组,但您需要告诉它如何在 Accumulate 方法中聚合数据。在 Accumulate 方法中,您实际上只需要做两件事;初始化,然后聚合。

初始化发生在第一行进入操作时。聚合的 Row 中没有任何键,所以您应该将其初始化为传入行的值,或默认值。初始化还需要包括分组列,所以不要忘记。

之后,当您确定 aggregate 变量具有键和初始值后,就可以决定如何聚合了。上面,真正需要聚合的唯一字段是databaseuse。我将数据库存储在数组中,并将 use 相加(或求和),这就完成了。它将上面我举例的(14 条相似的)缓存 SQL 记录减少为 1 条记录,使用次数为 14。

AbstractAggregationOperation 只是 Rhino ETL 中聚合的一种方式。如果您愿意,也可以在 AbstractOperation 中使用 LINQ group by 运算符。只要时刻记住,整个 .NET 框架都可以供您使用。

这就是我们从 ServerCrawlProcess 需要了解的全部内容。现在是时候跳回最顶层,即 SqloogleProcess。我将跳过 SqloogleAggregate,因为如果您看过一个聚合,您就都看过了,不是吗?

SQLoogle 转换

SqloogleTransform 操作紧随其后。它是一个 AbstractOperation,用于准备数据以供 Lucene 使用。以下是 Execute 方法:

public override IEnumerable<Row> Execute(IEnumerable<Row> rows) {
    foreach (var row in rows) {
        row["id"] = row["sqlscript"].GetHashCode().ToString(
          CultureInfo.InvariantCulture).Replace("-","X");
        row["sql"] = row["sqlscript"] + " " + SqlTransform(row["sqlscript"]);
        row["created"] = DateTransform(row["created"], DateTime.Today);
        row["modified"] = DateTransform(row["modified"], DateTime.Today);
        row["lastused"] = DateTransform(row["lastused"], DateTime.MinValue);
        row["server"] = ListTransform(row["server"]);
        row["database"] = ListTransform(row["database"]);
        row["schema"] = ListTransform(row["schema"]);
        row["name"] = ListTransform(row["name"]);
        row["use"] = Strings.UseBucket(row["use"]);
        row["count"] = row["count"].ToString().PadLeft(10,'0');
        row["dropped"] = false;
        yield return row;
    }
}

id 字段对于每个 Lucene 文档都应该是唯一的。我依赖 GetHashCode 方法。我用“X”替换“-”,因为“-”字符是 Lucene 查询语法中的禁止操作符。

sql 字段包含 sqlscript 字段中的所有内容,以及其他转换后的 SQL。SqlTransform 方法返回一个由标题大小写分隔的独立单词字符串。按标题大小写分割有助于 Lucene 在搜索“Get”和/或“Order”时找到“GetOrder”。

createdmodifiedlastused 字段都是日期。为了支持 Lucene 中的范围查询和排序,我将日期转换为 YYYYMMDD 格式。DateTransform 的第二个参数是默认日期,以防该字段为 null。

serverdatabaseschemaname 字段是数组。我将它们连接成一个分隔字符串。

usecount 字段是数字。与日期一样,它们用零进行左填充以支持范围查询和排序。此外,UseBucket 方法将除最高有效数字之外的所有数字替换为零。这样做是为了减少对 Lucene 索引的更新次数。毕竟,我只关心使用量的概念,而不是确切的数字。

dropped 字段是一个布尔值。显然,ServerCrawlProcess 找到的任何内容都没有被删除,所以它被设置为 false。

SQLoogle 比较

现在,抓取的数据已准备好加载到 Lucene 中,它需要通过 SqloogleCompare 来确定如何处理。这是一个 JoinOperation。它是一个全外连接,将抓取的数据与 Lucene 索引中已有的数据进行比较。它将确定对连接两侧的每一行要执行的操作。以下是 MergeRows() 方法的实现:

protected override Row MergeRows(Row newRow, Row oldRow) {

    Row row;

    // if the old row doesn't exist, then newRow is new, and it should be created
    if (oldRow["id"] == null) {
        row = newRow.Clone();
        row["action"] = "Create";
        return row;
    }

    // if the new row doesn't exist, then oldRow has been
    // dropped, and it should be marked as dropped and updated
    if (newRow["id"] == null) {
        row = oldRow.Clone();
        row["dropped"] = true;
        row["action"] = "Update";
        return row;
    }

    // if new and old rows are the same, then we have nothing to do.
    if (Equal(newRow, oldRow)) {
        row = oldRow.Clone();
        row["action"] = "None";
        return row;
    }

    // if we end up here, the sql is the same but other properties have been updated, so we should update it.
    row = newRow.Clone();
    row["action"] = "Update";
    row["modified"] = _today;
    return row;
}

希望 MergeRows 方法中的注释有所帮助。我已经将 MergeRow 参数从 leftRowrightRow 重命名为 newRowoldRownewRow 代表 SQLoogle 刚刚抓取并准备好的 SQL。oldRow 代表已在 SQLoogle 的 Lucene 索引中的 SQL。

在第 6 行,如果 Lucene 索引中的旧行不存在,那么 SQLoogle 在新行中找到了新的 SQL 定义。因此,newRow 的操作被设置为“Create”,并返回。

如果在第 13 行情况相反;也就是说,存在旧行但没有匹配的新行,那么旧行的 SQL 定义已被删除(或暂时丢失、离线等)。我可以从索引中删除它,但我选择将其 dropped 字段更新为 true 并将其操作设置为“Update”。默认情况下,它将在搜索结果中被过滤掉。但是,如果将 dropped:true 添加到 Lucene 查询中,它可能会包含在结果中。当某项内容被意外删除时,这有时会派上用场。

第 21 行仅在旧行和新行都存在时运行。如果存在,Equal() 方法会检查其他字段的相等性。如果它们都相同,则操作为“None”,SQL 对象可以安然无恙地放在 Lucene 索引中;未被修改。

第 28 行开始了一个万能情况。任何能走到这一步的内容意味着 SQL 相同,但 1+ 个属性已更改,因此它是一个“Update”。为了记录更新,我将“modified”字段设置为今天。

现在每个记录都应该准备好进入 Lucene,并且具有 Create、Update 或 None 的操作键。此操作将在 LuceneLoad 中使用,这是我们将要介绍的最后一个 ETL 操作。

Lucene 加载

最后,管道中的数据已准备好用于 Lucene。LuceneLoad 继承自 AbstractLuceneLoad。我们将从具体类开始,然后进入抽象类。

public class LuceneLoad : AbstractLuceneLoad {

    public LuceneLoad(string folder) : base(folder) {}

    public override void PrepareSchema() {
        Schema["id"] = new LuceneFieldSettings(Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS);
        Schema["use"] = new LuceneFieldSettings(Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS);
        Schema["count"] = new LuceneFieldSettings(Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS);
        Schema["score"] = new LuceneFieldSettings(Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS);
        Schema["created"] = new LuceneFieldSettings(Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS);
        Schema["modified"] = new LuceneFieldSettings(Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS);
        Schema["lastused"] = new LuceneFieldSettings(Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS);
        Schema["lastneeded"] = new LuceneFieldSettings(Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS);
        Schema["sql"] = new LuceneFieldSettings(Field.Store.NO, Field.Index.ANALYZED);
        Schema["sqlscript"] = new LuceneFieldSettings(Field.Store.YES, Field.Index.NO);
    }
}

构造函数需要一个 folder。这将是存储 Lucene.NET 索引的文件夹。PrepareSchema 方法为我们提供了一个机会来设置特定字段的 Lucene 存储和索引选项。如果我们不为字段指定 LuceneFieldSettings,则使用默认设置。

您可以选择存储和/或索引 Lucene Document 中的每个字段。为了在搜索结果中获得某些内容,您必须存储它(Field.Store.YES)。为了能够搜索一个字段,您必须对其进行索引(使用 Field.Index.NO 以外的其他值)。我在 PrepareSchema 中做了三件事:

  • 存储可预测的字段,并使用最少的索引策略(即 Field.Index.NOT_ANALYZED_NO_NORMS)。这类字段的示例是我的日期;它们格式始终相同。它们不是真正的全文,所以不需要特殊的分析。
  • 不存储转换后的“sql”字段,而是使用最大索引策略(即 Field.Index.ANALYZED)。转换后的 SQL 用于增强搜索结果,因此需要全文分析。但是,它永远不会在搜索结果中返回,因此没有必要存储它(以免浪费空间)。
  • 存储“sqlscript”字段(精确的 SQL 脚本),但完全不索引它(即 Index.NO)。这在搜索结果中返回,因为 SQLoogle 需要提供有效的 SQL 脚本。但是,我不需要对其进行索引,因为转换后的 SQL 包含有效的 SQL 脚本,并且更多。
通过这样做,SQLoogle 节省了空间,并保持了搜索索引的快速。现在让我们看一下抽象类。

抽象 Lucene 加载

public abstract class AbstractLuceneLoad : AbstractOperation {

    private readonly FSDirectory _indexDirectory;
    private readonly StandardAnalyzer _standardAnalyzer;
    private readonly IndexWriter _indexWriter;
    private readonly Dictionary<string,> _counters = new Dictionary<string,>();

    public Dictionary<string,> Schema { get; set; }

    protected AbstractLuceneLoad(string folder, bool clean = false) {
        _indexDirectory = FSDirectory.Open(new DirectoryInfo(folder));
        _standardAnalyzer = new StandardAnalyzer(Version.LUCENE_30);
        _indexWriter = new IndexWriter(_indexDirectory, 
          _standardAnalyzer, IndexWriter.MaxFieldLength.UNLIMITED);

        _counters.Add("None", 0);
        _counters.Add("Create", 0);
        _counters.Add("Update", 0);

        Schema = new Dictionary<string,>();
    }

    public abstract void PrepareSchema();

    public override IEnumerable<row> Execute(IEnumerable<row> rows) {

        PrepareSchema();

        foreach (var row in rows) {

            if (row["action"] == null) {
                throw new InvalidOperationException("There is no action column." + 
                 "  A valid action is None, Create, or Update!");
            }

            var action = row["action"].ToString();
            row.Remove("action");

            _counters[action] += 1;

            switch (action) {
                case "None":
                    continue;
                case "Create":
                    _indexWriter.AddDocument(RowToDoc(row));
                    continue;
                case "Delete":
                    _indexWriter.DeleteDocuments(new Term("id", row["id"].ToString()));
                    continue;
                case "Update":
                    _indexWriter.DeleteDocuments(new Term("id", row["id"].ToString()));
                    _indexWriter.AddDocument(RowToDoc(row));
                    continue;
            }
        }
        yield break;
    }

    private Document RowToDoc(Row row) {
        var doc = new Document();
        foreach (var column in row.Columns) {
            if (Schema.ContainsKey(column)) {
                doc.Add(new Field(column.ToLower(), row[column].ToString(), 
                        Schema[column].Store, Schema[column].Index));
            } else {
                doc.Add(new Field(column.ToLower(), row[column].ToString(), 
                        Field.Store.YES, Field.Index.ANALYZED));
            }
        }
        return doc;
    }

    public sealed override void Dispose() {
        Info("Lucene Create: {0}, Update: {1}, and None: {2}.", _counters["Create"], 
                   _counters["Update"], _counters["None"]);

        Info("Lucene Optimizing.");
        _indexWriter.Optimize();

        Info("Lucene Committing.");
        _indexWriter.Commit();

        _indexWriter.Dispose();
        _indexDirectory.Dispose();
        _standardAnalyzer.Close();
        _standardAnalyzer.Dispose();

        base.Dispose();
    }
}

AbstractLuceneLoad 继承自 AbstractOperation。因此,我们必须像往常一样实现 Execute() 方法。此外,我们需要一个设置和拆卸 Lucene.NET 管道的地方。我使用构造函数进行设置,并使用 Dispose() 方法进行拆卸。

构造函数准备一个索引写入器。写入器需要一个目录和一个分析器。一旦有了它们,它就可以操作索引了。通过操作,我的意思是它可以添加文档、删除文档,甚至优化索引。它不更新文档,因此更新是通过删除然后再次添加更新后的文档来执行的。

要使任何操作最终生效,您必须调用 Commit() 方法。它有点像数据库事务;您必须 Commit()Rollback() 您的更改。

Execute 调用我们在 LuceneLoad 中定义的 PrepareSchema() 方法。这将设置我们的存储和索引策略。现在我们准备处理行了。首先,我们必须检查“action”字段是否存在。如果不存在,则意味着有问题,所以我抛出一个异常,因为我想知道它。然后,因为我不想在 Lucene 索引中存储“action”字段,所以我使用方便的 Remove() 方法将其从 Row 实例中删除。

switch 语句负责在 Lucene 索引上执行操作。正如您所见,Lucene API 非常直接,有一个用于添加的 AddDocument() 方法,还有一个用于删除的 DeleteDocuments() 方法。将 Row 实例转换为 Lucene Document 实例很容易。RowToDoc 方法只需循环遍历行中的列并将它们添加到文档中。这是 Schema 可以覆盖默认存储和索引选择的地方。

Dispose() 方法清理所有内容。Rhino ETL 将为我们调用它。我添加了各种漂亮的日志记录,以便了解发生了什么。

总结

此时,您已经看到了 SQLoogle 如何从多个服务器抓取和索引 T-SQL。在大多数情况下,这都是使用 Rhino ETL 的练习。最终结果是一个已准备好进行搜索的 Lucene 索引。由于 Lucene.NET 索引与基于 Java 的 Lucene 索引兼容,如果您想玩转该索引,可以下载 Luke

下一篇文章将介绍 Web 服务、网站以及由 jQuery 和 Knockout.js 驱动的搜索界面。没有这些部分,您就无法方便地搜索和使用您的所有 SQL。您不必等到下一篇文章,如果您愿意,可以前往 SQLoogle 项目站点查看。

如果您花时间阅读了本文,谢谢 Smile | <img src=。 

© . All rights reserved.