65.9K
CodeProject 正在变化。 阅读更多。
Home

PostgreSQL 同步工具。

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.11/5 (4投票s)

2009年8月19日

CDDL

3分钟阅读

viewsIcon

43159

downloadIcon

704

一个简单易用的数据库同步工具的思路和实现。

引言

本文介绍了一种同步两个 PostgreSQL 数据库的方法。虽然这看起来是一项简单的任务,但在 maps.bremen.de 项目中,没有任何产品(slony、londiste 等)真正满足需求。要么它们有特殊的先决条件,不适用于我们的问题,要么它们不支持同步大型对象。

大型对象用于在 PostgreSQL 中存储街道/航拍地图的瓦片。我的 GIS 服务器查询数据库并获取瓦片。通过使用这种结构,我们可以获得一个灵活的基础设施来更新和维护不同版本的地图。

一切都运行良好,直到需要将服务分布在三台服务器上。我们如何轻松地同步数据库?我真的没有找到一个干净易用的解决方案。

psync

由于我经常使用 rsync,我想为什么没有一个数据库工具,可以简单地*按需*同步两个数据库——没有复杂的触发器或数据库的额外更改。*psync* 是一个简单的应用程序,用于同步两个 PostgreSQL 数据库。该概念可以被改编甚至封装,以便也可以使用其他数据库。

psync 的想法是使用两个数据库连接并按如下方式更新表及其内容

  1. 获取源数据库和目标数据库中的所有表
  2. 对于源中的每个表,执行
    1. 如果目标数据库中不存在表,则创建表
    2. 同步表的内容
  3. 对于目标数据库中的每个表,执行
    1. 如果源数据库中不存在表,则删除表

尽管该算法看起来很简单,但仍有许多问题需要克服。

  1. 复制表
  2. 处理特殊列,例如,大型对象的 *oid*
  3. 表上的约束
  4. 扩展的特殊结构

复制表

为了复制一个表,我们需要找出存在哪些列以及我们拥有哪些主键等等。有了这些信息,我们可以组装一个 CREATE TABLE 命令来创建表。在 PostgreSQL 中,该信息存储在一个名为 *information_schema.columns* 的表中:column_name、is_nullable 和 data_type。主键稍微复杂一些,因为我们需要两个表 *information_schema.table_constraints* 和 *information_schema.key_column_usage*。有关完整的 SQL 语句,请参阅 PSync.PrimaryKeys

大型对象标识符

当前 psync 的实现侧重于使用 oid 数据类型来处理大型对象。大型对象存储二进制数据,并使用整数进行标识。具有 oid 列的表仅存储对大型对象的引用。大多数复制解决方案无法处理大型对象。

当然,我们不能只是将标识符从源复制到目标,我们需要比较数据本身。始终复制整个对象不是一个解决方案,因为它们可能很大,并且当它们相等时会浪费网络带宽和时间。psync 使用的解决方案是*哈希值*(目前使用 MD5 算法)。为了保持低流量,这些哈希由数据库使用自定义函数计算,而无需传输任何数据。psync 比较这些值并按需复制对象。

用于计算 MD5 哈希值的 PostgreSQL 函数是 (原始来源)

CREATE OR REPLACE FUNCTION md5(id oid)
RETURNS text
as $$
DECLARE
 fd        integer;
 size      integer;
 hashval   text;
 INV_READ  constant integer := 262144; -- 0x40000 from libpq-fs.h
 SEEK_SET  constant integer := 0;
 SEEK_END  constant integer := 2;
BEGIN
 IF id is null THEN
   RETURN NULL;
 END IF;
 fd   := lo_open(id, INV_READ);
 size := lo_lseek(fd, 0, 2);
 PERFORM lo_lseek(fd, 0, 0);
 hashval := md5(loread(fd, size));
 PERFORM lo_close(fd);
 RETURN hashval;
END;
$$
language plpgsql stable strict;
comment on FUNCTION md5(id oid) is 'Calculates the md5 sum of a large object.';

由于一个表中可能存在多个 oid 列,因此该过程变得更加复杂。以下代码段显示了构建 SQL 查询以检索哈希值的过程

bool first;
string _select = "select";
string _from = " from";
string _where = " where";

// Compose the select block
first = true;
foreach(int oid in oids) {
  if (!first)
    _select += ",";
  // Get the md5 hash value for the oid-column
  _select += string.Format(" md5({0})", result.ColumnName(oid));
  first = false;
}

// Data from the table we are currently working on
_from += table;

// The row identified by the primary keys of the table
first = true;
foreach(string key in primaryKeys) {
  if (!first)
    _where += " and";
  _where += string.Format(" {0}='{1}'", key, result[row, result.ColumnIndex(key)]);
  first = false;
}
command = _select + _from + _where;

构建 SQL 命令后,将比较结果

Result r1, r2;
try {
  r1 = dst.Exec(command);
  if (r1.Rows > 0) {
    r2 = src.Exec(command);

    // Compare large objects
    for(int c=0; c<r1.Columns; c++) {
      if (r1[0, c] != r2[0, c]) {
        // md5 hashes are different
        int id = CopyOid(result.GetInt(row, oids[c]));
        command = string.Format("update {0} set {1}='{2}'{3}", 
                                table, result.ColumnName(oids[c]), id, _where);
        dst.Exec(command).Dispose();
      }
    }

    r1.Dispose();
    r2.Dispose();
  }
  else {
    // Row missing on destination
    r1.Dispose();
    command = string.Format("insert into {0} values(", table);
    first = true;
    for(int col=0; col<result.Columns; col++) {
      if (!first)
        command += ",";
      if (oids.Contains(col)) {
        int id = CopyOid(result.GetInt(row, oid));
        command += string.Format(" '{0}'", id);
      }
      else 
        command += string.Format(" '{0}'", result[row, col]);
      first = false;
    }
    command += ")";
    dst.Exec(command).Dispose();
  }
} catch(PostgreSQLException e) {
  // Large object id exist on destination but does not reference an object
  foreach(int oid in oids) {
    int id = CopyOid(result.GetInt(row, oid));
    command = string.Format("update {0} set {1}='{2}'{3}", 
                            table, result.ColumnName(oid), id, _where);
    dst.Exec(command).Dispose();
  }
}

Using the Code

该代码分两部分实现。npq 命名空间实现了 libpq(类 PG)周围的低级包装器以及更易于使用的高级接口。

库的示例用法

PostgreSQL db = new PostgreSQL("hostaddr='127.0.0.1' port='5432' requiressl='1' " + 
                               "user='XXX' password='XXX' dbname='XXX'");
Console.WriteLine("Server {0} Protocol {1}", db.Version, db.Protocol);

Result result = db.Exec("select * from some_table");
Console.WriteLine("Result {0}", result.Valid);
Console.Write("{0,5} ", "Nr");
for(int f=0; f<result.Columns; f++)
  Console.Write("| {0,15}:{1,8}", result.ColumnName(f), result.ColumnType(f));
Console.WriteLine();
for(int n=0; n<result.Rows; n++) {
  Console.Write("{0,5} ", n);
  for(int f=0; f<result.Columns; f++)
    Console.Write("| {0,24}", result[n, f]);
}
// You'll need to explicitly call Dispose to free associated ressources
result.Dispose();
db.Dispose();

大型对象的处理是这样完成的

LargeObject lo = new LargeObject(db);
lo.Open(007);
byte[] tmp = new byte[1024];
int s = 0;
FileStream fs = new FileStream(result[n, 0]+".jpg", FileMode.Create);
while ((s = lo.Read(tmp, 1024)) > 0)
  fs.Write(tmp, 0, s);
fs.Close();
lo.Close();

该代码是在 Linux 下使用 Mono 和 MonoDevelop 开发的。它也应该在 Windows 下工作,但未经测试。Linux 用户必须添加以下配置 (npq.dll.config) 才能正确映射库。

<configuration>
  <dllmap dll="libpq.dll" target="libpq.so" os="!windows" />
</configuration>

历史

  • 2009年8月19日:初始版本。
© . All rights reserved.