PostgreSQL 同步工具。
一个简单易用的数据库同步工具的思路和实现。
引言
本文介绍了一种同步两个 PostgreSQL 数据库的方法。虽然这看起来是一项简单的任务,但在 maps.bremen.de 项目中,没有任何产品(slony、londiste 等)真正满足需求。要么它们有特殊的先决条件,不适用于我们的问题,要么它们不支持同步大型对象。
大型对象用于在 PostgreSQL 中存储街道/航拍地图的瓦片。我的 GIS 服务器查询数据库并获取瓦片。通过使用这种结构,我们可以获得一个灵活的基础设施来更新和维护不同版本的地图。
一切都运行良好,直到需要将服务分布在三台服务器上。我们如何轻松地同步数据库?我真的没有找到一个干净易用的解决方案。
psync
由于我经常使用 rsync,我想为什么没有一个数据库工具,可以简单地*按需*同步两个数据库——没有复杂的触发器或数据库的额外更改。*psync* 是一个简单的应用程序,用于同步两个 PostgreSQL 数据库。该概念可以被改编甚至封装,以便也可以使用其他数据库。
psync 的想法是使用两个数据库连接并按如下方式更新表及其内容
- 获取源数据库和目标数据库中的所有表
- 对于源中的每个表,执行
- 如果目标数据库中不存在表,则创建表
- 同步表的内容
- 对于目标数据库中的每个表,执行
- 如果源数据库中不存在表,则删除表
尽管该算法看起来很简单,但仍有许多问题需要克服。
- 复制表
- 处理特殊列,例如,大型对象的 *oid*
- 表上的约束
- 扩展的特殊结构
复制表
为了复制一个表,我们需要找出存在哪些列以及我们拥有哪些主键等等。有了这些信息,我们可以组装一个 CREATE TABLE
命令来创建表。在 PostgreSQL 中,该信息存储在一个名为 *information_schema.columns* 的表中:column_name、is_nullable 和 data_type。主键稍微复杂一些,因为我们需要两个表 *information_schema.table_constraints* 和 *information_schema.key_column_usage*。有关完整的 SQL 语句,请参阅 PSync.PrimaryKeys
。
大型对象标识符
当前 psync 的实现侧重于使用 oid
数据类型来处理大型对象。大型对象存储二进制数据,并使用整数进行标识。具有 oid
列的表仅存储对大型对象的引用。大多数复制解决方案无法处理大型对象。
当然,我们不能只是将标识符从源复制到目标,我们需要比较数据本身。始终复制整个对象不是一个解决方案,因为它们可能很大,并且当它们相等时会浪费网络带宽和时间。psync 使用的解决方案是*哈希值*(目前使用 MD5 算法)。为了保持低流量,这些哈希由数据库使用自定义函数计算,而无需传输任何数据。psync 比较这些值并按需复制对象。
用于计算 MD5 哈希值的 PostgreSQL 函数是 (原始来源)
CREATE OR REPLACE FUNCTION md5(id oid)
RETURNS text
as $$
DECLARE
fd integer;
size integer;
hashval text;
INV_READ constant integer := 262144; -- 0x40000 from libpq-fs.h
SEEK_SET constant integer := 0;
SEEK_END constant integer := 2;
BEGIN
IF id is null THEN
RETURN NULL;
END IF;
fd := lo_open(id, INV_READ);
size := lo_lseek(fd, 0, 2);
PERFORM lo_lseek(fd, 0, 0);
hashval := md5(loread(fd, size));
PERFORM lo_close(fd);
RETURN hashval;
END;
$$
language plpgsql stable strict;
comment on FUNCTION md5(id oid) is 'Calculates the md5 sum of a large object.';
由于一个表中可能存在多个 oid
列,因此该过程变得更加复杂。以下代码段显示了构建 SQL 查询以检索哈希值的过程
bool first;
string _select = "select";
string _from = " from";
string _where = " where";
// Compose the select block
first = true;
foreach(int oid in oids) {
if (!first)
_select += ",";
// Get the md5 hash value for the oid-column
_select += string.Format(" md5({0})", result.ColumnName(oid));
first = false;
}
// Data from the table we are currently working on
_from += table;
// The row identified by the primary keys of the table
first = true;
foreach(string key in primaryKeys) {
if (!first)
_where += " and";
_where += string.Format(" {0}='{1}'", key, result[row, result.ColumnIndex(key)]);
first = false;
}
command = _select + _from + _where;
构建 SQL 命令后,将比较结果
Result r1, r2;
try {
r1 = dst.Exec(command);
if (r1.Rows > 0) {
r2 = src.Exec(command);
// Compare large objects
for(int c=0; c<r1.Columns; c++) {
if (r1[0, c] != r2[0, c]) {
// md5 hashes are different
int id = CopyOid(result.GetInt(row, oids[c]));
command = string.Format("update {0} set {1}='{2}'{3}",
table, result.ColumnName(oids[c]), id, _where);
dst.Exec(command).Dispose();
}
}
r1.Dispose();
r2.Dispose();
}
else {
// Row missing on destination
r1.Dispose();
command = string.Format("insert into {0} values(", table);
first = true;
for(int col=0; col<result.Columns; col++) {
if (!first)
command += ",";
if (oids.Contains(col)) {
int id = CopyOid(result.GetInt(row, oid));
command += string.Format(" '{0}'", id);
}
else
command += string.Format(" '{0}'", result[row, col]);
first = false;
}
command += ")";
dst.Exec(command).Dispose();
}
} catch(PostgreSQLException e) {
// Large object id exist on destination but does not reference an object
foreach(int oid in oids) {
int id = CopyOid(result.GetInt(row, oid));
command = string.Format("update {0} set {1}='{2}'{3}",
table, result.ColumnName(oid), id, _where);
dst.Exec(command).Dispose();
}
}
Using the Code
该代码分两部分实现。npq
命名空间实现了 libpq(类 PG
)周围的低级包装器以及更易于使用的高级接口。
库的示例用法
PostgreSQL db = new PostgreSQL("hostaddr='127.0.0.1' port='5432' requiressl='1' " +
"user='XXX' password='XXX' dbname='XXX'");
Console.WriteLine("Server {0} Protocol {1}", db.Version, db.Protocol);
Result result = db.Exec("select * from some_table");
Console.WriteLine("Result {0}", result.Valid);
Console.Write("{0,5} ", "Nr");
for(int f=0; f<result.Columns; f++)
Console.Write("| {0,15}:{1,8}", result.ColumnName(f), result.ColumnType(f));
Console.WriteLine();
for(int n=0; n<result.Rows; n++) {
Console.Write("{0,5} ", n);
for(int f=0; f<result.Columns; f++)
Console.Write("| {0,24}", result[n, f]);
}
// You'll need to explicitly call Dispose to free associated ressources
result.Dispose();
db.Dispose();
大型对象的处理是这样完成的
LargeObject lo = new LargeObject(db);
lo.Open(007);
byte[] tmp = new byte[1024];
int s = 0;
FileStream fs = new FileStream(result[n, 0]+".jpg", FileMode.Create);
while ((s = lo.Read(tmp, 1024)) > 0)
fs.Write(tmp, 0, s);
fs.Close();
lo.Close();
该代码是在 Linux 下使用 Mono 和 MonoDevelop 开发的。它也应该在 Windows 下工作,但未经测试。Linux 用户必须添加以下配置 (npq.dll.config) 才能正确映射库。
<configuration>
<dllmap dll="libpq.dll" target="libpq.so" os="!windows" />
</configuration>
历史
- 2009年8月19日:初始版本。