65.9K
CodeProject 正在变化。 阅读更多。
Home

DRUM - C++ 实现的 Web 爬虫 URL 见测试

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.80/5 (16投票s)

2009年5月6日

MIT

12分钟阅读

viewsIcon

41556

downloadIcon

523

DRUM(带更新管理的磁盘存储库)的 C++ 实现 - 键/值对存储和异步检查/更新操作

引言和动机 

几周前,我决定构建一个 Web 爬虫。不是一个简单的爬虫,它会忽略当前互联网的规模和问题。毕竟,这些才是这项任务中最有趣的挑战。 

Web 爬虫的一个非常常见的特性是URL 已见测试。顾名思义,它负责将从文档中提取的链接分类为唯一重复。在通常情况下,后者会被丢弃,而前者有资格进行进一步处理,并最终被添加到下载队列中。当然,过滤或连续爬取等因素也可能影响该过程,但在这里与它们无关。

起初,我认为 URL 已见实现的简单直接。我所需要的就是一个好的哈希表和一个好的哈希函数。嗯……我错了。除非你有无限的资源,否则这不是一个高效且可扩展的解决方案。一旦达到数百万甚至数十亿的 URL,就会占用太多内存。请自己计算一下。

那时,我清楚我必须将数据持久化到磁盘。然而,每次检查 URL 都这样做成本太高。因此,我转向了某种异步批量操作。然后,我稍微搜索了一下这个主题,发现了一篇发表在 WWW 会议 2008 上的论文:《IRLbot: 扩展到 60 亿页面及更远》。

这篇论文讨论了用于 URL 已见测试的已知方法,并提出了一种新技术,该技术能达到非常大数据集近乎最佳的理论结果。这种称为DRUM带更新管理的磁盘存储库)的技术,基于 RAM 和磁盘的结合以及存储桶排序策略。

在本文中,尽管我简要概述了 DRUM 的工作原理,但我并不解释其背后的理论。我特别关注实现细节,这在原始论文中并未提及。事实上,即使是 DRUM 的描述也只占其中的一小部分。因此,我分发的代码和我的评论反映了我自己的理解和解释。

我的实现是泛型的,与 STL(标准模板库)的含义相同。只要您使用的数据类型满足某些要求,您就可以使用它。代码应该是可移植的,尽管我只在 Ubuntu/GCC 4.2.4 上运行过它。我使用标准 C++ 和几个开源库:BoostBerkeley DB - 如果您想使用该代码,则需要下载并安装它们。我还应该明确一点,我没有进行任何基准测试或性能评估。我还没有在压力条件下使用过它。(我计划在完成我的爬虫时进行所有这些。)当然,没有保修,但如果您发现任何错误或有建设性的反馈,我非常希望能通知我。

如果您认为文章需要改进,请使用页面底部的留言板。但请记住,我期望您了解 C++(特别是与模板相关的知识)以及算法/数据结构(哈希、树等)。重点在于分发源代码(希望它对其他人有用)并解释设计决策。

理解 DRUM

DRUM 的所有数学模型都已在《IRLbot: 扩展到 60 亿页面及更远》中介绍。但是,没有低级细节可用。而且,从理论到实践的过渡仍然存在一个需要程序员的想象力和适应性来填补的差距。由于本文是我实现 DRUM 的“大图景”,我建议读者参考原论文以获得更长的介绍。

DRUM(带更新管理的磁盘存储库)是一种高效存储键/值对和进行异步检查/更新操作的技术。除了许多 Web 爬虫中存在的 URL 已见测试外,它还可以用于robots.txt 合规性和 DNS 缓存。支持三种操作:

  1. 检查 - 将键与磁盘存储库中的键进行比较,并将其分类为唯一或重复。如果键是重复的,则可以检索其关联值。
  2. 更新 - 将键及其关联值合并到磁盘存储库中。如果键已存在,则只更新其值。
  3. 检查+更新 - 前两者的组合。将键分类为唯一或重复,并执行合并。

对于所有操作,都可能与键/值对一起提供一个辅助数据。这是必需的,因为 DRUM 是异步的。例如,当您获得检查操作的结果时,您可能不再处于请求它时的相同上下文中。

DRUM 可以分为三个主要组件。其中一个是一组数组,它们通过基于键值的存储桶策略进行组织。有一个数组子集负责存储键/值对,另一个子集负责存储辅助数据。通过 DRUM 接口接收的输入被插入到数组中。

一旦任何一个数组被填满,它们的值就会被传输到相应的磁盘存储桶——同样,键/值对和辅助数据被写入不同的存储桶子集。现在为空的数组继续接收输入,并且该过程一直进行,直到其中一个磁盘存储桶达到预定的尺寸阈值。这表明是时候进行合并了。

最后一个组件是一个排序的持久存储库。在合并过程中,键/值存储桶被读入单独的缓冲区并进行排序。其内容与持久存储库的内容同步。此时会发生检查和更新。之后,缓冲区会被重新排序回其原始顺序,以便键/值对再次与相应的辅助存储桶匹配。然后,一个分派机制会将键、值和辅助数据以及操作结果转发以供进一步处理。这个过程对所有存储桶依次重复。

有一点我想非常清楚地说明。数组集和磁盘存储桶集都组织在由键值标识且内部未排序的存储桶中。排序仅在将特定磁盘存储桶读入内存之前进行合并阶段。这使得所有存储桶与(已排序的)持久存储库的同步只通过一次遍历即可高效完成。下图说明了 DRUM 的组件。

drum2.jpg - Click to enlarge image

实现设计

DRUM 接口的主要部分从上面的介绍中显而易见。至少需要三个方法:检查、更新和检查+更新。但它们操作的是什么类型?是的,模板非常强大,是 C++ 的一个重要特性。因此,DRUM 是通过一个类模板实现的。它的名字是Drum

当然,三个模板参数很容易推断:一个用于键类型;另一个用于值类型;第三个用于辅助类型。此外,我还通过使用非类型模板参数强制执行了 DRUM 的其他特性。特别是,我指的是存储桶的数量(记住每个存储桶都有一个相应的数组)、数组中元素的最大数量以及磁盘存储桶的字节大小。还有一个模板参数留待以后介绍。到目前为止,Drum 类模板看起来是这样的:

template <
  class key_t,  
  class value_t, 
  class aux_t,  
  std::size_t num_buckets_nt,
  std::size_t bucket_buff_elem_size_nt,
  std::size_t bucket_byte_size_nt,
  //...
  >
class Drum
{
  //...
};

key_tvalue_taux_t 的模板参数必须满足某些要求。我没有花时间正式指定一个概念供它们建模,但想法很简单。key_t 必须是可小于比较的。(有人可能会问为什么不像 std::map 那样参数化比较。这是因为我看不出在这种实现中有任何不同于 std::less 的行为可以用。)所有三个类型都必须是默认可构造的可赋值的。它们还必须支持序列化和反序列化,以便能够写入磁盘存储桶并从中读取。为此,我期望以下类模板的有效且正确的模板实例化存在:

template <class element_t>
struct ElementIO
{
  static void Serialize(element_t const& element, 
		std::size_t & size, char const* & serial)
  {
    size = sizeof(element_t);
    serial = reinterpret_cast<char const*>(&element);
  }

  static void Deserialize(element_t & element, std::size_t size, char const* serial)
  {
    element = reinterpret_cast<element_t const&>(*serial);
  }
};

如果上述定义不适合您的类型,则需要 ElementIO 的特化。这是 std::string 的一个示例。

template <>
struct ElementIO<std::string>
{
  static void Serialize(std::string const& element, 
		std::size_t & size, char const* & serial)
  {
    size = element.size();
    serial = element.c_str();
  }

  static void Deserialize(std::string & element, std::size_t size, char const* serial)
  {
    element.assign(serial, serial + size);
  }
};

数组和磁盘存储桶由从 0num_buckets_nt - 1 的数字标识。DRUM 需要知道将数据存储到哪个数组,并将数据从数组写入哪个磁盘存储桶。您有责任将键值映射到 ID。实现无法自行完成。鉴于键类型和存储桶数量都是模板参数,存在太多可能性。因此,我使用与序列化/反序列化相同的策略。必须存在 BucketIdentififer 的有效且正确的模板实例化。区别在于这次我只声明了类模板而没有定义它。毕竟,它有什么通用形式呢?

不过,您仍然需要知道 BucketIdentifier 的外观和行为。例如,我展示了类型为 boost::uint64_t 和存储桶数量为 2 的幂的特化。成员函数 Calculate 返回存储桶 ID(也即数组 ID)。

//Only a declaration provided.
template <class key_t, std::size_t num_buckets_nt>
struct BucketIdentififer;

...

//Specialization for boost::uint64_t and two buckets.
template <>
struct BucketIdentififer<boost::uint64_t, 2>
{
  static std::size_t Calculate(boost::uint64_t const& key)
  {
    boost::uint64_t mask = 0x8000000000000000LL;
    boost::uint64_t bucket = mask & key;
    return static_cast<std::size_t>(bucket >> 63);
  }
};

//Specialization for boost::uint64_t and four buckets.
template <>
struct BucketIdentififer<boost::uint64_t, 4>
{
  static std::size_t Calculate(boost::uint64_t const& key)
  {
    boost::uint64_t mask = 0xC000000000000000LL;
    boost::uint64_t bucket = mask & key;
    return static_cast<std::size_t>(bucket >> 62);
  }
};

//Specialization for boost::uint64_t and eight buckets.
template <>
struct BucketIdentififer<boost::uint64_t, 8>
{

  static std::size_t Calculate(boost::uint64_t const& key)
  {
    boost::uint64_t mask = 0xE000000000000000LL;
    boost::uint64_t bucket = mask & key;
    return static_cast<std::size_t>(bucket >> 61);
  }
};

//Specialization for boost::uint64_t and sixteen buckets.
template <>
struct BucketIdentififer<boost::uint64_t, 16>
{
  static std::size_t Calculate(boost::uint64_t const& key)
  {
    boost::uint64_t mask = 0xF000000000000000LL;
    boost::uint64_t bucket = mask & key;
    return static_cast<std::size_t>(bucket >> 60);
  }
};

现在,一个非常重要的观察。持久存储库实现为带有 BTree 访问的 Berkeley DB 数据库。默认情况下,Berkeley DB 使用原始字节字符串作为 BTree 比较函数的键。有时,这不是期望的行为。在 Drum 这样的通用实现中,有必要提供一个与计算存储桶编号的方式一致的比较函数。在合并过程中,从 0num_buckets_nt - 1 的所有存储桶都与 Berkeley DB 数据库中的键的自然迭代顺序匹配。为了高效,这些存储桶的键的迭代应该匹配 Berkeley DB 数据库中键的自然迭代顺序。

编写 BTree 比较函数很容易。您只需要了解一点 Berkeley DB 接口和一个规则:如果 a 大于 b,函数必须返回正数;如果 a 小于 b,则返回负数;如果它们相等,则返回零。例如:

int Compare(Db* db, Dbt const* a, Dbt const* b)
{
  int ai;
  int bi;
  std::memcpy(&ai, a->get_data(), sizeof(int));
  std::memcpy(&bi, b->get_data(), sizeof(int));
  return (ai - bi);
}

请注意,上面的 BucketIdentififer 的特化为较小的键分配较小的存储桶编号,为较大的键分配较大的存储桶编号。例如,如果您使用 4 个存储桶参数化 Drum,编译器将选择 BucketIdentififer<boost::uint64_t, 4> 的特化。在这种情况下,较小的键被放在存储桶 0 中,而较大的键被放在存储桶 3 中(其他键分别放在存储桶 1 和 2 中)。当遍历存储桶并与持久存储库同步时,提交给 Berkeley DB 的查询将具有高度的局部性。这意味着由于 Berkeley DB 为常用键提供内存中的透明缓存,因此访问速度很快, I/O 量不大。

如何配置 DRUM 使用您的比较函数?再次,答案依赖于模板特化。事实上,它与其他情况非常相似。这次您需要提供 BTreeCompare 类模板的特化。这是 boost::uint64_t 的代码。

//Only declaration provided.
template <class key_t>
struct BTreeCompare;

...

//A specialization.
template <>
struct BTreeCompare<boost::uint64_t>
{
  static int Compare(Db* db, Dbt const* a, Dbt const* b)
  {
    boost::uint64_t ai;
    boost::uint64_t bi;
    std::memcpy(&ai, a->get_data(), sizeof(boost::uint64_t));
    std::memcpy(&bi, b->get_data(), sizeof(boost::uint64_t));
    if (ai > bi) return 1;
    if (ai < bi) return -1;
    return 0;
  }
};

是时候回到 Drum 的最后一个模板参数了——从技术上讲,它比您看到的要多。在某些时候,您需要知道操作的结果是什么。Drum 通过一种类似访问者的机制来实现这一点。我称之为 DRUM 分派器。它是一个类型,具有针对检查、更新和检查+更新操作的每种可能结果的成员函数。下面,我展示了一个示例,它打印键、值和辅助值。您可以将其视为Drum Dispatcher概念的原型。

template <class key_t, class value_t, class aux_t>
struct DummyDispatcher
{
  void UniqueKeyCheck(key_t const& k, aux_t const& a) const
  {std::cout << "UniqueKeyCheck: " << k << " - Aux: " << a << "\n";}


  void DuplicateKeyCheck(key_t const& k, value_t const& v, aux_t const& a) const
  {std::cout << "DuplicateKeyCheck: " << k << " - Value: " 
				<< v << " - Aux: " << a << "\n";}

  void UniqueKeyUpdate(key_t const& k, value_t const& v, aux_t const& a) const
  {std::cout << "UniqueKeyUpdate: " << k << " - Value: " 
				<< v << " - Aux: " << a << "\n";}

  void DuplicateKeyUpdate(key_t const& k, value_t const& v, aux_t const& a) const
  {std::cout << "DuplicateKeyUpdate: " << k << " - Value: " 
				<< v << " - Aux: " << a << "\n";}

  void Update(key_t const& k, value_t const& v, aux_t const& a) const
  {std::cout << "Update: " << k << " - Value: " << v << " - Aux: " << a << "\n";}
};

您可能已经注意到 DummyDispatcher 是一个类模板。它依赖于某些类型。因此,Drum 的最后一个模板参数实际上是一个模板模板参数。(如果您刚开始学习 C++,这里的“模板模板”不是错别字。)最后……

template <
  class key_t,
  class value_t,
  class aux_t,
  std::size_t num_buckets_nt,
  std::size_t bucket_buff_elem_size_nt,
  std::size_t bucket_byte_size_nt,
  template <class, class, class> dispatcher_t = NullDispatcher>
class Drum
{
  typedef dispatcher_t<key_t, value_t, aux_t> DispatcherType;
 
  //...

public:
  Drum(std::string const& name);
  Drum(std::string const& name, DispatcherType const& dispatcher);
  //...

  void Check(key_t const& key);
  void Check(key_t const& key, aux_t const& aux);
  void Update(key_t const& key, value_t const& value);
  void Update(key_t const& key, value_t const& value, aux_t const& aux);
  void CheckUpdate(key_t const& key, value_t const& value);
  void CheckUpdate(key_t const& key, value_t const& value, aux_t const& aux);

  //...
};

快速事实

  • Drum 不是线程安全的。在多线程环境中,您应该处理调用周围的并发。
  • 由于 Drum 是一种容器,因此建议 key_tvalueaux_t 易于复制。
  • 内部数组使用 boost::array 实现。
  • 键、值、辅助数据和其他信息存储在 boost::tuples::tuple 中。
  • 磁盘存储桶的文件必须在执行前存在。预先分配它们以获得合适的尺寸以提高性能。
  • 意外行为将导致 DrumExceptionstd::exception 被抛出。Drum 的析构函数不会抛出异常。
  • 所有三个操作都有一个接受 aux_t 数据的重载。
  • 有几个 Drum 构造函数接受特定于 Berkeley DB 的选项作为参数。有关其他自定义设置,请查阅 DB 文档。
  • Drumpublic 接口非常精简。如果您愿意,可以扩展它。
  • 确保您的编译器路径中包含 <boost/array.hpp><boost/tuple/tuple.hpp><boost/cstdint.hpp><db_cxx.h>。这些 Boost 库是头文件库,但 Berkeley DB 需要链接。

实践编码

本节将说明如何使用该代码。首先,定义 Drum 的具体类型。键是 boost::uint64_t,值和辅助类型都是 std::string。它有 2 个键/值存储桶(以及 2 个关联的辅助存储桶),每个数组有 4 个元素,存储桶阈值大小为 64 字节。URLSeenDispatcher 仅将输出打印到屏幕。(请注意,URLSeenDispatcher 继承自 NullDispatcher,后者不提供任何虚函数。因此,您不应依赖基指针或引用来指向它。)实际场景自然需要不同的设置。

值得记住的是,当您完成 Drum 的使用后,可能仍有未完成的元素尚未进行磁盘合并。为了确保一切都已处理,您应该强制同步。然后,我建议调用 Dispose。如果您忘记这样做,Drum 的析构函数会为您完成。但是,如果抛出异常,它将不会传播。

对于 URL 已见测试,键是使用 Broder 描述的 Rabin 方法计算的 URL 指纹。URL 以辅助形式提供,并且实际上不存储与键关联的任何值。仅使用检查+更新操作。

#include "drum.hpp"
#include "bucket_identifier.hpp"
#include "db_compare.hpp"
#include "rabin_fingerprint.hpp"
#include <iostream>

using namespace drum;

template <class key_t, class value_t, class aux_t>
struct URLSeenDispatcher : NullDispatcher<key_t, value_t, aux_t>
{
  void UniqueKeyUpdate(key_t const& k, value_t const& v, aux_t const& a) const
  {std::cout << "UniqueKeyUpdate: " << k << " Data: " << v << " Aux: " << a << "\n";}

  void DuplicateKeyUpdate(key_t const& k, value_t const& v, aux_t const& a) const
  {std::cout << "DuplicateKeyUpdate: " << k << " Data: " << v << " Aux: " << a << "\n";}
};

int main()
{
  try
  {
    std::cout << "Example of Drum usage." << std::endl;

    typedef Drum<boost::uint64_t, std::string, std::string, 
				2, 4, 64, URLSeenDispatcher> DRUM;
    DRUM drum("url-seen.db"); 	//A file with this name is created. 
				//It's the Berkeley DB database.

    RabinFingerprint fp;

    std::string url0 = "https://codeproject.org.cn";
    boost::uint64_t key0 = fp.Compute(url0.c_str(), url0.size());
    drum.CheckUpdate(key0, "", url0);

    std::string url1 = 
	"http://www.oracle.com/technology/products/berkeley-db/index.html";
    boost::uint64_t key1 = fp.Compute(url1.c_str(), url1.size());
    drum.CheckUpdate(key1, "", url1);

    std::string url2 = "https://boost.ac.cn";
    boost::uint64_t key2 = fp.Compute(url2.c_str(), url2.size());
    drum.CheckUpdate(key2, "", url2);

    std::string url3 = "https://codeproject.org.cn";
    boost::uint64_t key3 = fp.Compute(url3.c_str(), url3.size());
    drum.CheckUpdate(key3, "", url3);

    //Synchronize and dispose.
    drum.Synchronize();
    drum.Dispose();

    std::cout << "Done!" << std::endl;
  }
  catch (DrumException & e)
  {
    std::cout << "Drum error: " << e.what() << 
	" Number: " << e.get_error_code() << std::endl;
  }
  catch (std::exception & e)
  {
    std::cout << "Drum error: " << e.what() << std::endl;
  }
  catch (...)
  {
    std::cout << "Something wrong..." << std::endl;
  }
  return 0;
}

最后说明 

如果您仍在阅读,非常感谢。我非常确定该实现存在许多可能的改进之处,它绝对不是最终解决方案。有些事情本可以做得不同,但该实现代表了我可用时间与我认为良好的起点之间的平衡。如果您有任何疑问、建议或更正,请随时与我联系。我非常希望听到您的意见。

© . All rights reserved.