65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 Emitter、SQLite 和 Vue.js 构建的协作待办事项列表

starIconstarIconstarIconstarIconstarIcon

5.00/5 (4投票s)

2017 年 2 月 4 日

CPOL

8分钟阅读

viewsIcon

22052

解释如何使用 Emitter 和数据库来存储协作待办事项列表的内容。

引言

在本文中,我将向您展示如何使用 Emitter 在客户端之间进行通信来创建一个协作待办事项列表。

Emitter 是一个开源、安全且极其易于使用的分布式发布-订阅平台。如果您想先了解 Emitter 的基础知识,可以查看这个 入门 视频,这篇 CodeProject 上的另一篇文章,当然还有官方网站:emitter.io

技术选型

Vue.js 易于设置、易于使用,并且其占用的空间比 Angular.js 等其他框架要小。这个待办事项列表的界面最初是 Vue.js 官方网站上托管的一个示例。它很棒,但它只将您的待办事项列表存储在浏览器的本地存储中。您的列表无法远程访问。我想能够共享该待办事项列表,并希望人们能够同时阅读和修改它。为此,列表将存储在服务器上的数据库中,而不是存储在浏览器的本地存储中。为了简单起见,我将使用 Node.js 用 JavaScript 编写服务器,并使用 SQLite 作为数据库管理系统。对于不熟悉 SQLite 的人来说,这个数据库管理系统只是将所有数据存储在一个文本文件中。它足够高效,可用于许多专业应用程序,并且其占用的空间使其成为嵌入式系统的绝佳选择。单击 此处 了解 SQLite 何时是良好选择的详细说明。

服务器

初始化

您可以使用这个 npm 命令为 Node 安装 SQLite

npm install sqlite3 --save 

就是这样!它已准备好使用。以下两行代码足以打开数据库

var sqlite3 = require("sqlite3");
var db = new sqlite3.Database("todos.db", sqlite3.OPEN_READWRITE | sqlite3.OPEN_CREATE);

如果数据库不存在,刚刚创建了一个新的空 todos.db 文件。此时,我们应该检查数据库是否为空,如果为空则创建我们的架构

db.get("SELECT name FROM sqlite_master WHERE type='table' 
        AND name='todos'", function(err, data)
{
    if (data)
    {
        startListening();
    }
    else
    {
        db.serialize(function()
        {
            db.run("CREATE TABLE todos (id INTEGER PRIMARY KEY AUTOINCREMENT, 
            completed BOOLEAN DEFAULT 0, title TEXT, version INTEGER DEFAULT 0)");
            db.run("INSERT INTO todos (title) VALUES (\"Buy ketchup\")");
            db.run("INSERT INTO todos (title) VALUES (\"Pay internet bill\")");
            db.run("INSERT INTO todos (title) VALUES (\"Anna's birthday present\")", 
                    function()
            {
                startListening();
            });
        });
    }
});

startListening 函数的目的是实例化 Emitter 对象,建立连接,最后在连接就绪时订阅 todo 频道

var emitterKey = "9SN1Xg1DjvmeiSdpnS0WdKkrxlz0koBH";
var channel = "todo";
var emitter = undefined;

function startListening()
{
    emitter = require('emitter-io').connect(null, function()
    {
        console.log('emitter: connected');
        emitter.subscribe({
            key: emitterKey,
            channel: channel + "/cmd",
        });
        emitter.on('message', function(msg)
        {
            console.log('emitter: received ' + msg.asString());
            msg = msg.asObject();
            handle[msg.cmd](msg);
        });         
    });      
}

发送到服务器的消息应始终包含一个 cmd 属性 - 一个描述请求目的的动词 - 以及执行请求所需的数据。Emitter 的消息事件处理程序仅解析消息以将其转换为对象(msg = msg.asObject()),并根据请求中的动词调用正确的处理程序(handle[msg.cmd](msg)),前提是 handle

var handle = {
    "getall": handleGetAll,
    "add": handleAdd,
    "delete": handleDelete,
    "removeCompleted": handleRemoveCompleted,
    "complete": handleComplete,
    "edit": handleEdit
}; 

现在让我们回顾一下最有趣的处理程序的代码。

使用 SQLite 处理请求

在接下来的代码片段中,我将使用一个辅助函数通过 Emitter 发布消息

function publish(recipient, msg)
{
    emitter.publish({
        key: emitterKey,
        channel: channel + "/" + recipient,
        message: JSON.stringify(msg)
    });
}

这里没有什么可解释的。它只是调用 publish 函数,传递 emitterKey,构建频道名称,并将消息字符串化。

getAll 处理程序

当客户端首次显示待办事项列表时,它必须做的第一件事是发送一个 getAll 请求来检索完整的列表

function handleGetAll(msg)
{
    db.all("SELECT * FROM todos", function(err, rows){
        if (err)
            publish(msg.sender, {cmd: "err", err: err, request: msg}); 
        else
            publish(msg.sender, {cmd: "getall", todos: rows});
    }); 
}

这是最简单的处理程序。它在 db 上调用 all() 来获取一组行。如果出于任何原因,查询产生错误,此错误将与原始请求一起发送回客户端。否则,它会发布查询结果。

Add 处理程序

function handleAdd(msg)
{
    db.run("INSERT INTO todos (title) VALUES (?)", [msg.title], function(err)
    {
        if (err)
            publish(msg.sender, {cmd: "err", err: err, request: msg}); 
        else    
            publish("broadcast", {cmd: "add", todo: 
            {id: this.lastID, completed: false, title: msg.title }});
    });    
}

这里应用了相同的模式。但请注意

  • 参数通过占位符 ? 注入到查询中,SQLite 会处理传递在参数中的潜在恶意代码。
  • 查询结果不仅发送给发出请求的客户端,还通过在“broadcast”子频道中发布结果进行广播,所有客户端都必须订阅该频道。
  • 要获得要广播的完整记录,只缺少 id。通过 lastID 变量检索它。

Edit 处理程序

这是我将在这里介绍的最后一个处理程序。其余的处理程序对主题的贡献不大。

function handleEdit(msg)
{
    db.get("SELECT version FROM todos WHERE id = ?", [msg.id], function (err, row)
    {
        if (err)
        {
            console.log(err);
            publish(msg.sender, {cmd: "err", err: err, request: msg});
            return;
        }
        var newVersion = row.version + 1;
        db.run("UPDATE todos SET title = ?, version = ? WHERE id = ? AND 
        version = ?", [msg.title, newVersion, msg.id, row.version], function(err){
            if (err)
            {
                console.log(err);
                publish(msg.sender, {cmd: "err", err: err, request: msg});
                return;
            }
            if (this.changes)
                publish("broadcast", {cmd: "edit", todo: {id: msg.id, 
                         title: msg.title, version: newVersion}});
        });
    });
}

需要注意的最重要的一点是,每个记录都有一个版本号,每次更新都必须递增。所以,首先,我们需要检索记录,然后递增其版本号,最后请求更新行。但我们必须执行此更新,不仅按 id 过滤,还按版本号过滤。因为,一方面,所有这些请求都是异步执行的,另一方面,这种读写操作不是原子的。不能保证另一个请求不会在我们执行 select 查询和 update 查询之间更新该行。如果发生这种情况,就没有必要再更新该行了,因为我们正在处理的请求已经过时了。我们通过根据版本号选择行来确保我们不会用过时的数据更新行,这可能会产生无结果,因此处理就到此为止,不会向客户端广播更新消息。

客户端

客户端稍微复杂一些,因为它必须处理几种潜在的冲突更新情况。我将专注于冲突解决,而将与 Vue.js 相关的事项放在一边。

与服务器端类似,下面的代码将使用以下发布辅助函数

function publish(msg)
{
    emitter.publish({
        key: emitterKey,
        channel: channel + "/cmd",
        message: JSON.stringify(msg)
    });
}

与服务器端不同,这里的所有消息都针对单个接收者,即服务器。服务器正在监听的频道是 cmd。所有客户端都应该通过此频道发送命令。

请记住,每个请求都有一个 cmd 属性。好吧,命令的响应具有相同的 cmd 属性,并且遵循与服务器端相同的原则进行处理。

var handle = {
    "getall": handleGetAll,
    "add": handleAdd,
    "delete": handleDelete,
    "complete": handleComplete,
    "edit": handleEdit,
    "err": handleError
};

现在,让我们看一下主处理程序

emitter.on('message', function(msg){
    console.log('emitter: received ' + msg.asString() );
    msg = msg.asObject();
    
    // If this is the init phase, we need to stack any update received 
    // before the answer to the getall command.
    if (app.$data.todos === undefined && msg.cmd != "getall")
        app.$data.cmdToApply.push(msg);
    else
    {
        if (!handle[msg.cmd](msg))
            app.$data.cmdToApply.push(msg);
    }
});

同样,客户端首先要做的是发送一个 getall 请求以检索完整的待办事项列表。但是不能保证此请求的响应将是客户端收到的第一条消息。这就是为什么,如果客户端正在等待 getall 请求的响应(即待办事项列表仍未定义),而它收到的更新消息不是 getall 响应,那么此消息应被存储以备后用。

否则,此更新应立即应用。每个处理程序都应返回一个布尔值,指示消息是否成功应用。如果没有,则此消息应再次存储以备后用。

function handleGetAll(msg)
{
    app.$data.todos = msg.todos;
    delayedApply();
    return true;
}

一旦客户端最终收到 getall 请求的答复,它就必须初始化其 todo 列表,然后应用在此期间收到的所有更新。delayedApply() 函数只是遍历更新数组,并尝试将它们一个接一个地应用于我们待办事项列表的当前状态。

function delayedApply()
{
    var remainingCommandsToApply = [];
    for (var i = 0; i < app.$data.cmdToApply.length; ++i)
    {
        var msg = app.$data.cmdToApply[i];
        var treated = handle[msg.cmd](msg);
        if (!treated)
            remainingCommandsToApply.push(msg);
    }
    app.$data.cmdToApply = remainingCommandsToApply;    
}

当然,只有在成功应用更新后,才会将其从 cmdToApply 数组中删除。

接下来要检查的处理程序是 add 处理程序

function handleAdd(msg)
{
    console.log("add");
    // Let's check whether this todo was already deleted.
    if (isBuried(msg.todo.id)) return true;
        
    // Let's check whether, for whatever reason, this todo already was inserted.
    for (var i = 0; i < app.$data.todos.length; ++i)
    {
        var todo = app.$data.todos[i];
        if (todo.id == msg.todo.id) return true;
    }
    // Insert the todo...
    app.$data.todos.push(msg.todo);
    // ...and apply the stored potential updates related to this todo. 
    delayedApply();
    return true;
}

此函数执行的第一件事是检查此待办事项是否已被“埋葬”。也就是说,这个待办事项是否被之前的更新删除(参见 Tombstone (data store))。确实,另一个客户端可能已经收到了此待办事项的添加更新,然后删除了此待办事项。然后,我们的客户端在这里可能在收到添加更新之前就收到了删除更新!理论上,这是一个我们必须考虑的可能性。

function isBuried(id)
{
    return app.$data.cemetery.indexOf(id) == -1 ? false : true;
}

isBuried 函数只是检查待办事项的 id 是否在墓地里。

然后,处理程序检查出于任何原因,这个待办事项是否已经被插入。最后,处理程序将待办事项记录推入列表,并调用 delayedApply() 函数,该函数将确保所有与此新添加的待办事项相关的更新都已应用。

请注意,此处理程序始终返回 true。待办事项可能刚刚成功添加,或者可能已被埋葬或更早添加。无论如何,客户端都不应再担心此更新消息了。

现在让我们看一下 edit 处理程序

function handleEdit(msg)
{
    // Let's check whether this todo was already deleted.
    if (isBuried(msg.todo.id)) return true;

    for (var i = 0; i < app.$data.todos.length; ++i)
    {
        var todo = app.$data.todos[i];
        if (todo.id == msg.todo.id)
        {
            if (todo.version >= msg.todo.version) return true;
            todo.title = msg.todo.title;
            todo.version = msg.todo.version;
            return true;
        }
    }
    /* 
        At this point, the todo item corresponding to the id passed 
        in the message was not found.
        This could be a case of late "add" message.
    */
    return false;
}

同样,第一件事是检查待办事项是否已被埋葬。

然后处理程序遍历待办事项列表以查找应该更新的待办事项。当找到此待办事项时,仅当其版本号低于请求中传递的版本号时才更新它。

如果当前列表中未找到该待办事项,并且也没有被埋葬,那么最有可能的是尚未收到请求插入该待办事项的更新消息。因此,处理程序返回 false,这将导致消息被推入稍后应用的更新列表中。

deletecomplete 处理程序遵循相同的原则,检查墓地,并且仅在客户端的版本低于更新中提出的版本时才应用更新。

结论

这就是处理协作应用程序中潜在冲突的几个技巧。关于 Emitter 本身真的没有什么可写的,这正是因为它非常容易使用。

历史

  • 2017年2月4日:初始版本
© . All rights reserved.