使用 Emitter、SQLite 和 Vue.js 构建的协作待办事项列表
解释如何使用 Emitter 和数据库来存储协作待办事项列表的内容。
- 下载源代码 @ https://github.com/Florimond/emitter-demo-todo
- 查看在线演示 @ https://emitter.io/develop/collaborative-todo
引言
在本文中,我将向您展示如何使用 Emitter 在客户端之间进行通信来创建一个协作待办事项列表。
Emitter 是一个开源、安全且极其易于使用的分布式发布-订阅平台。如果您想先了解 Emitter 的基础知识,可以查看这个 入门 视频,这篇 CodeProject 上的另一篇文章,当然还有官方网站:emitter.io。
技术选型
Vue.js 易于设置、易于使用,并且其占用的空间比 Angular.js 等其他框架要小。这个待办事项列表的界面最初是 Vue.js 官方网站上托管的一个示例。它很棒,但它只将您的待办事项列表存储在浏览器的本地存储中。您的列表无法远程访问。我想能够共享该待办事项列表,并希望人们能够同时阅读和修改它。为此,列表将存储在服务器上的数据库中,而不是存储在浏览器的本地存储中。为了简单起见,我将使用 Node.js 用 JavaScript 编写服务器,并使用 SQLite 作为数据库管理系统。对于不熟悉 SQLite 的人来说,这个数据库管理系统只是将所有数据存储在一个文本文件中。它足够高效,可用于许多专业应用程序,并且其占用的空间使其成为嵌入式系统的绝佳选择。单击 此处 了解 SQLite 何时是良好选择的详细说明。
服务器
初始化
您可以使用这个 npm
命令为 Node 安装 SQLite
npm install sqlite3 --save
就是这样!它已准备好使用。以下两行代码足以打开数据库
var sqlite3 = require("sqlite3");
var db = new sqlite3.Database("todos.db", sqlite3.OPEN_READWRITE | sqlite3.OPEN_CREATE);
如果数据库不存在,刚刚创建了一个新的空 todos.db 文件。此时,我们应该检查数据库是否为空,如果为空则创建我们的架构
db.get("SELECT name FROM sqlite_master WHERE type='table'
AND name='todos'", function(err, data)
{
if (data)
{
startListening();
}
else
{
db.serialize(function()
{
db.run("CREATE TABLE todos (id INTEGER PRIMARY KEY AUTOINCREMENT,
completed BOOLEAN DEFAULT 0, title TEXT, version INTEGER DEFAULT 0)");
db.run("INSERT INTO todos (title) VALUES (\"Buy ketchup\")");
db.run("INSERT INTO todos (title) VALUES (\"Pay internet bill\")");
db.run("INSERT INTO todos (title) VALUES (\"Anna's birthday present\")",
function()
{
startListening();
});
});
}
});
startListening
函数的目的是实例化 Emitter
对象,建立连接,最后在连接就绪时订阅 todo 频道
var emitterKey = "9SN1Xg1DjvmeiSdpnS0WdKkrxlz0koBH";
var channel = "todo";
var emitter = undefined;
function startListening()
{
emitter = require('emitter-io').connect(null, function()
{
console.log('emitter: connected');
emitter.subscribe({
key: emitterKey,
channel: channel + "/cmd",
});
emitter.on('message', function(msg)
{
console.log('emitter: received ' + msg.asString());
msg = msg.asObject();
handle[msg.cmd](msg);
});
});
}
发送到服务器的消息应始终包含一个 cmd
属性 - 一个描述请求目的的动词 - 以及执行请求所需的数据。Emitter 的消息事件处理程序仅解析消息以将其转换为对象(msg = msg.asObject()
),并根据请求中的动词调用正确的处理程序(handle[msg.cmd](msg)
),前提是 handle
是
var handle = {
"getall": handleGetAll,
"add": handleAdd,
"delete": handleDelete,
"removeCompleted": handleRemoveCompleted,
"complete": handleComplete,
"edit": handleEdit
};
现在让我们回顾一下最有趣的处理程序的代码。
使用 SQLite 处理请求
在接下来的代码片段中,我将使用一个辅助函数通过 Emitter 发布消息
function publish(recipient, msg)
{
emitter.publish({
key: emitterKey,
channel: channel + "/" + recipient,
message: JSON.stringify(msg)
});
}
这里没有什么可解释的。它只是调用 publish
函数,传递 emitterKey
,构建频道名称,并将消息字符串化。
getAll 处理程序
当客户端首次显示待办事项列表时,它必须做的第一件事是发送一个 getAll
请求来检索完整的列表
function handleGetAll(msg)
{
db.all("SELECT * FROM todos", function(err, rows){
if (err)
publish(msg.sender, {cmd: "err", err: err, request: msg});
else
publish(msg.sender, {cmd: "getall", todos: rows});
});
}
这是最简单的处理程序。它在 db 上调用 all()
来获取一组行。如果出于任何原因,查询产生错误,此错误将与原始请求一起发送回客户端。否则,它会发布查询结果。
Add 处理程序
function handleAdd(msg)
{
db.run("INSERT INTO todos (title) VALUES (?)", [msg.title], function(err)
{
if (err)
publish(msg.sender, {cmd: "err", err: err, request: msg});
else
publish("broadcast", {cmd: "add", todo:
{id: this.lastID, completed: false, title: msg.title }});
});
}
这里应用了相同的模式。但请注意
- 参数通过占位符
?
注入到查询中,SQLite 会处理传递在参数中的潜在恶意代码。 - 查询结果不仅发送给发出请求的客户端,还通过在“broadcast”子频道中发布结果进行广播,所有客户端都必须订阅该频道。
- 要获得要广播的完整记录,只缺少 id。通过
lastID
变量检索它。
Edit 处理程序
这是我将在这里介绍的最后一个处理程序。其余的处理程序对主题的贡献不大。
function handleEdit(msg)
{
db.get("SELECT version FROM todos WHERE id = ?", [msg.id], function (err, row)
{
if (err)
{
console.log(err);
publish(msg.sender, {cmd: "err", err: err, request: msg});
return;
}
var newVersion = row.version + 1;
db.run("UPDATE todos SET title = ?, version = ? WHERE id = ? AND
version = ?", [msg.title, newVersion, msg.id, row.version], function(err){
if (err)
{
console.log(err);
publish(msg.sender, {cmd: "err", err: err, request: msg});
return;
}
if (this.changes)
publish("broadcast", {cmd: "edit", todo: {id: msg.id,
title: msg.title, version: newVersion}});
});
});
}
需要注意的最重要的一点是,每个记录都有一个版本号,每次更新都必须递增。所以,首先,我们需要检索记录,然后递增其版本号,最后请求更新行。但我们必须执行此更新,不仅按 id 过滤,还按版本号过滤。因为,一方面,所有这些请求都是异步执行的,另一方面,这种读写操作不是原子的。不能保证另一个请求不会在我们执行 select
查询和 update
查询之间更新该行。如果发生这种情况,就没有必要再更新该行了,因为我们正在处理的请求已经过时了。我们通过根据版本号选择行来确保我们不会用过时的数据更新行,这可能会产生无结果,因此处理就到此为止,不会向客户端广播更新消息。
客户端
客户端稍微复杂一些,因为它必须处理几种潜在的冲突更新情况。我将专注于冲突解决,而将与 Vue.js 相关的事项放在一边。
与服务器端类似,下面的代码将使用以下发布辅助函数
function publish(msg)
{
emitter.publish({
key: emitterKey,
channel: channel + "/cmd",
message: JSON.stringify(msg)
});
}
与服务器端不同,这里的所有消息都针对单个接收者,即服务器。服务器正在监听的频道是 cmd
。所有客户端都应该通过此频道发送命令。
请记住,每个请求都有一个 cmd
属性。好吧,命令的响应具有相同的 cmd
属性,并且遵循与服务器端相同的原则进行处理。
var handle = {
"getall": handleGetAll,
"add": handleAdd,
"delete": handleDelete,
"complete": handleComplete,
"edit": handleEdit,
"err": handleError
};
现在,让我们看一下主处理程序
emitter.on('message', function(msg){
console.log('emitter: received ' + msg.asString() );
msg = msg.asObject();
// If this is the init phase, we need to stack any update received
// before the answer to the getall command.
if (app.$data.todos === undefined && msg.cmd != "getall")
app.$data.cmdToApply.push(msg);
else
{
if (!handle[msg.cmd](msg))
app.$data.cmdToApply.push(msg);
}
});
同样,客户端首先要做的是发送一个 getall
请求以检索完整的待办事项列表。但是不能保证此请求的响应将是客户端收到的第一条消息。这就是为什么,如果客户端正在等待 getall
请求的响应(即待办事项列表仍未定义),而它收到的更新消息不是 getall
响应,那么此消息应被存储以备后用。
否则,此更新应立即应用。每个处理程序都应返回一个布尔值,指示消息是否成功应用。如果没有,则此消息应再次存储以备后用。
function handleGetAll(msg)
{
app.$data.todos = msg.todos;
delayedApply();
return true;
}
一旦客户端最终收到 getall
请求的答复,它就必须初始化其 todo
列表,然后应用在此期间收到的所有更新。delayedApply()
函数只是遍历更新数组,并尝试将它们一个接一个地应用于我们待办事项列表的当前状态。
function delayedApply()
{
var remainingCommandsToApply = [];
for (var i = 0; i < app.$data.cmdToApply.length; ++i)
{
var msg = app.$data.cmdToApply[i];
var treated = handle[msg.cmd](msg);
if (!treated)
remainingCommandsToApply.push(msg);
}
app.$data.cmdToApply = remainingCommandsToApply;
}
当然,只有在成功应用更新后,才会将其从 cmdToApply
数组中删除。
接下来要检查的处理程序是 add
处理程序
function handleAdd(msg)
{
console.log("add");
// Let's check whether this todo was already deleted.
if (isBuried(msg.todo.id)) return true;
// Let's check whether, for whatever reason, this todo already was inserted.
for (var i = 0; i < app.$data.todos.length; ++i)
{
var todo = app.$data.todos[i];
if (todo.id == msg.todo.id) return true;
}
// Insert the todo...
app.$data.todos.push(msg.todo);
// ...and apply the stored potential updates related to this todo.
delayedApply();
return true;
}
此函数执行的第一件事是检查此待办事项是否已被“埋葬”。也就是说,这个待办事项是否被之前的更新删除(参见 Tombstone (data store))。确实,另一个客户端可能已经收到了此待办事项的添加更新,然后删除了此待办事项。然后,我们的客户端在这里可能在收到添加更新之前就收到了删除更新!理论上,这是一个我们必须考虑的可能性。
function isBuried(id)
{
return app.$data.cemetery.indexOf(id) == -1 ? false : true;
}
isBuried
函数只是检查待办事项的 id 是否在墓地里。
然后,处理程序检查出于任何原因,这个待办事项是否已经被插入。最后,处理程序将待办事项记录推入列表,并调用 delayedApply()
函数,该函数将确保所有与此新添加的待办事项相关的更新都已应用。
请注意,此处理程序始终返回 true
。待办事项可能刚刚成功添加,或者可能已被埋葬或更早添加。无论如何,客户端都不应再担心此更新消息了。
现在让我们看一下 edit
处理程序
function handleEdit(msg)
{
// Let's check whether this todo was already deleted.
if (isBuried(msg.todo.id)) return true;
for (var i = 0; i < app.$data.todos.length; ++i)
{
var todo = app.$data.todos[i];
if (todo.id == msg.todo.id)
{
if (todo.version >= msg.todo.version) return true;
todo.title = msg.todo.title;
todo.version = msg.todo.version;
return true;
}
}
/*
At this point, the todo item corresponding to the id passed
in the message was not found.
This could be a case of late "add" message.
*/
return false;
}
同样,第一件事是检查待办事项是否已被埋葬。
然后处理程序遍历待办事项列表以查找应该更新的待办事项。当找到此待办事项时,仅当其版本号低于请求中传递的版本号时才更新它。
如果当前列表中未找到该待办事项,并且也没有被埋葬,那么最有可能的是尚未收到请求插入该待办事项的更新消息。因此,处理程序返回 false,这将导致消息被推入稍后应用的更新列表中。
delete
和 complete
处理程序遵循相同的原则,检查墓地,并且仅在客户端的版本低于更新中提出的版本时才应用更新。
结论
这就是处理协作应用程序中潜在冲突的几个技巧。关于 Emitter 本身真的没有什么可写的,这正是因为它非常容易使用。
历史
- 2017年2月4日:初始版本