从 PostgreSQL 数据库实现实时 Web 更新
将 PostgreSQL 逻辑复制功能与 dotNetify 结合,将数据更改实时广播到您的网站
我在学习 PostgreSQL 时,注意到一个名为逻辑复制的有趣功能。文档解释说,逻辑复制是一种
“基于其复制标识(通常是主键)复制数据对象及其更改的方法”。
换句话说,这是一种通过让数据库实时向订阅者节点发布逻辑数据更改来确保数据库副本始终同步的方法。
复制订阅者通常是作为主数据库副本的另一个数据库,但它不限于此,可以是任何东西。如果我们将其变成一个 Web 服务呢?然后该服务可以将这些更改推送到 Web 应用程序的多个客户端,以便它们始终实时同步其数据。
实际上,PostgreSQL 还有一个名为 NOTIFY
的命令,用于生成通知,但我们需要为我们感兴趣的每个表编写数据库触发器,并且 string
有效负载大小限制为 8000 字节。逻辑复制没有此限制,可以监听数据库中的所有表,这使其更具可扩展性。
因此,我编写了一个小型开源库,该库结合了 dotNetify 和 PostgreSQL 逻辑复制。您可以使用它来构建一个 ASP.NET 5 Web 应用程序,该应用程序可以响应用户提交到 PostgreSQL 数据库的任何 insert
、update
和 delete
操作。
使用我命名的 DotNetify.Postgres
库(源代码),您的项目将无需轮询数据库、设置复杂的发布/订阅系统,也无需编写大量代码。接下来的博客将带您构建一个演示应用程序。
PostgreSQL 设置
要启用 PostgreSQL 数据库中的逻辑复制,请找到 postgresql.conf 配置文件,将 wal_level
参数更改为 logical
,并将 max_wal_senders
和 max_replication_slots
都设置为至少 1。更改将在服务重启后生效。
您也可以使用 SQL 命令进行更改
ALTER SYSTEM SET wal_level='logical';
ALTER SYSTEM SET max_wal_senders='10';
ALTER SYSTEM SET max_replication_slots='10';
下一步是创建一个发布(publication)
CREATE PUBLICATION my_pub FOR ALL TABLES;
我们将其设置为发布所有表的数据更改,但您可以选择仅限制特定表。
当 PostgreSQL 发布复制记录(也称为预写日志或 WAL)时,它会使用一种称为复制槽(replication slots)的机制来确保在订阅者接收到记录之前,这些记录不会被删除。
复制槽非常有用,因为它们允许订阅者暂时离线,并在重新连接后继续从上次中断的地方恢复。但是有一个警告:在长时间断开连接的情况下,WAL 记录可能会堆积起来,导致磁盘空间不足并崩溃数据库,因此需要监控这些槽。
这是我们创建复制槽的方法
SELECT * FROM pg_create_logical_replication_slot('my_slot', 'pgoutput');
_pgoutput_
是 PostgreSQL 标准的逻辑解码插件,用于将 WAL 中的更改转换为逻辑复制协议。
为了演示,让我们创建一个简单的表。我们还将创建一个新用户,我们的 Web 服务将使用该用户连接到数据库。
CREATE TABLE IF NOT EXISTS businesses (
business_id serial PRIMARY KEY,
business_name VARCHAR ( 50 ) UNIQUE NOT NULL,
rating integer
);
CREATE USER my_user WITH PASSWORD 'my_pwd';
ALTER ROLE my_user WITH REPLICATION;
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO my_user;
GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA public TO my_user;
请注意,我们为用户分配了 REPLICATION 角色。要订阅复制槽,需要拥有此角色。
Postgres 数据库现在已配置为逻辑复制。下一步是创建一个 Web 服务来订阅我们创建的复制槽并将更改推送到网站。
Web 服务设置
附带的源代码是一个 ASP.NET 项目,带有 React/Typescript 前端和 Webpack。安装 npm 包后,您可以从 Visual Studio 或 dotnet CLI 运行该项目。
代码中包含演示表的实体类
[Table("businesses")]
public class Business
{
[Column("business_id")]
[Key]
public long Id { get; set; }
[Column("business_name")]
public string Name { get; set; }
[Column("rating")]
public int Rating { get; set; }
}
有一个简单的 React 客户端,用于显示表的内容和关联的 _dotNetify
_ 视图模型。视图模型通过 **DotNetify.Postgres
** 库提供的 API 订阅表上的 PostgreSQL 数据更改事件,并实时将状态更新推送到客户端。
public class BusinessesVM : BaseVM
{
private IDisposable _subs;
// Real-time list; see: https://dotnetify.net/core/api/crud.
[ItemKey(nameof(Business.Id))]
public List<Business> Businesses { get; set; }
public BusinessesVM(IDbChangeObserver dbChangeObserver)
{
Businesses = new List<Business>();
_subs = dbChangeObserver
.Observe<Business>()
.Subscribe(e =>
{
if (e is DbInsertEvent<Business>)
{
var row = (e as DbInsertEvent<Business>).Row;
this.AddList(nameof(Businesses), row);
}
else if (e is DbUpdateEvent<Business>)
{
var row = (e as DbUpdateEvent<Business>).NewRow;
this.UpdateList(nameof(Businesses), row);
}
else if (e is DbDeleteEvent<Business>)
{
var key = (e as DbDeleteEvent<Business>).Row.Id;
this.RemoveList(nameof(Businesses), key);
}
PushUpdates();
});
}
public override void Dispose() => _subs.Dispose();
}
连接到 PostgreSQL 数据库的连接字符串以及发布和复制槽的名称在服务启动类中配置。
public void ConfigureServices(IServiceCollection services)
{
services.AddSignalR();
services.AddDotNetify();
services.AddDotNetifyPostgres(new PostgresConfiguration
{
ConnectionString = Configuration.GetConnectionString("Postgres"),
PublicationName = "my_pub",
ReplicationSlotName = "my_slot"
});
}
仅此而已!这是应用程序响应数据更改的方式。
通过 EF Core 实现完整的 CRUD 操作
到目前为止,我们已经有一个能够响应数据库数据更改的网页。让我们更进一步,使其也能够执行 CRUD 操作。为此,我们将使用 EF Core 和 Npgsql 库。
让我们为演示表添加一个 DbContext
类。
public class BusinessDbContext : DbContext
{
public DbSet<Business> Businesses { get; set; }
public BusinessDbContext(DbContextOptions<BusinessDbContext> options) : base(options) {}
}
出于长期生命周期的考虑,DotNetify 视图模型必须使用工厂来创建新的 DbContext
。因此,我们在启动类中配置 DbContextFactory
服务。
public void ConfigureServices(IServiceCollection services)
{
...
services.AddDbContextFactory<BusinessDbContext>(options =>
options.UseNpgsql(Configuration.GetConnectionString("Postgres")));
}
最后一步是在视图模型上实现 CRUD 方法。
public class BusinessesVM : BaseVM
{
private readonly IDbContextFactory<BusinessDbContext> _contextFactory;
[ItemKey(nameof(Business.Id))]
public List<Business> Businesses { get; set; }
public BusinessesVM(
IDbContextFactory<BusinessDbContext> dbContextFactory,
IDbChangeObserver dbChangeObserver)
{
_contextFactory = dbContextFactory;
using var dbContext = _contextFactory.CreateDbContext();
Businesses = dbContext.Businesses.OrderBy(x => x.Id).ToList();
this.ObserveList<Business>(nameof(Businesses), dbChangeObserver);
}
public void Add(Business businessInfo)
{
using var dbContext = _contextFactory.CreateDbContext();
dbContext.Businesses.Add(businessInfo);
dbContext.SaveChanges();
}
public void Update(Business businessInfo)
{
using var dbContext = _contextFactory.CreateDbContext();
var business = dbContext.Businesses.Find(businessInfo.Id);
if (business != null)
{
business.Name = businessInfo.Name;
business.Rating = businessInfo.Rating;
dbContext.SaveChanges();
}
}
public void Remove(Business businessInfo)
{
using var dbContext = _contextFactory.CreateDbContext();
var business = dbContext.Businesses.Find(businessInfo.Id);
if (business != null)
{
dbContext.Businesses.Remove(business);
dbContext.SaveChanges();
}
}
}
您可能会注意到,之前用于订阅和处理数据更改事件的代码已简化为调用 ObserveList
扩展方法。
这是调用这些 CRUD 方法的 React 客户端的摘录。
export const Businesses = () => {
const { vm, state } = useConnect<State>("BusinessesVM", this);
const [newName, setNewName] = useState<string>("");
const addBusiness = (name: string) => {
vm.$dispatch({ Add: new Business(0, name) });
setNewName("");
};
const updateBusiness = (id: number, name: string, rating: number) => {
vm.$dispatch({ Update: new Business(id, name, rating) });
};
const removeBusiness = (id: number) => {
vm.$dispatch({ Remove: new Business(id) });
};
...
这样,您可以看到演示中两个浏览器实例同步运行,同时我们进行了一些 CRUD 操作。
希望这对您有所帮助!我很想听听您的想法。请回复我的 推文 告诉我。并欢迎转发!