可移动的全球分布式系统与数据库






4.95/5 (12投票s)
分布式系统架构,全球移动设备,免费构建您的全球应用
背景
设想一家只有三个员工的小公司,他们雄心勃勃,经营着一项全球业务,于是他们将员工派往纽约、伦敦和香港,每人配备一部智能手机,新手机连接电视和键盘时可以输出PC屏幕。这家公司刚刚起步,员工们不常来办公室,他们更常出现在机场、高速公路、咖啡馆、海滩,或者在村庄里谈生意。他们常常没有稳定的网络,但他们需要随时随地获取数据,我们应该在哪里放置或购买服务器呢?
他们的智能手机是最值得信赖的数据存储地。手机能成为软件系统的一部分吗?
免费,全球分布式架构
您眼前看到的是一部手机、一封电子邮件和整个世界。在本文的其余部分,我们将讨论如何用软件将它们连接成一个整体。我们将使用两个组件:NoSQL数据库iBoxDB来存储数据,电子邮件客户端MailKit来发送数据,以及Xamarin Forms来构建App UI。
放大这个架构,会看到很多方框。
数据将被打包成方框,并利用全球已部署的电子邮件服务器进行传递。电子邮件服务器的质量参差不齐,传递顺序和时间无法保证。如何确保我们能够正确解包数据。
消息ID
自互联网出现以来,节点之间一直在通信,ID设计变得和字体设计一样重要。我们有很多解决方案,这里介绍一种称为“依赖ID
”的解决方案,它的意思是,如果你想处理这条消息,你首先需要处理上一条消息,如果消息丢失了,就要求重新发送。
如上图所示,传统的点对点消息有一个有序的ID,很容易知道哪条丢失了。分布式消息也有有序ID,但没有ID服务器为所有节点创建ID,不容易弄清楚哪个ID丢失了,当我们添加了一个依赖ID后,事情就变得更容易了。
实现
要实现这个架构,我们需要注册四个电子邮件地址,分别来自全球四个城市的四个电子邮件服务提供商。为简化起见,我们在本地安装一个电子邮件服务器,并创建四个域来代表四个地点。
独立移动应用
在进行分布式部署之前,我们先构建一个独立的应用程序。启动一个Xamarin Forms解决方案,并在Android项目中添加两个组件。
Install-Package iBoxDB
Install-Package MailKit
Xamarin Forms 使用了很多绑定技术,我们遵循约定,添加一个Box
属性和一个PropertyChanged
事件。在iBoxDB中,数据被封装在Boxes中。
public class BoxSpace : INotifyPropertyChanged
{
Box box = null;
public Box Box
{
get { return box; }
set
{
box?.Dispose();
box = value;
PropertyChanged?.Invoke(this, new PropertyChangedEventArgs("Box"));
}
}
public event PropertyChangedEventHandler PropertyChanged;
}
当Box
属性改变时,它会触发Collection
从数据库重新加载记录。
public class ObservableTableCollection<T> : ObservableCollection<T> where T : class, new()
{
BoxSpace boxSpace;
string tableName;
public ObservableTableCollection(BoxSpace boxSpace, string tableName)
{
Init();
boxSpace.PropertyChanged += BoxSpace_PropertyChanged;
}
private void BoxSpace_PropertyChanged
(object sender, System.ComponentModel.PropertyChangedEventArgs e)
{
Init();
}
private IEnumerator<T> objects;
private void Init()
{
Clear();
objects = null;
objects = boxSpace.Box?.Select<T>("from " + tableName).GetEnumerator();
}
}
这个Collection
与UI绑定
public ObservableTableCollection<Item> GetItemTable()
{
return new ObservableTableCollection<Item>(BoxSpace, "Item");
}
public partial class ItemsPage : ContentPage
{
public ObservableTableCollection<Item> Items { get; set; }
public ItemsPage()
{
InitializeComponent();
Items = DataStore.Current.GetItemTable();
BindingContext = this;
}
}
当我们想更新UI时,我们只需要更新Box
属性
public void Update()
{
BoxSpace.Box = Auto.Cube();
}
我们不想一次性将所有数据加载到UI。为LoadNext
()方法添加一个限制,每次加载10条记录。
public int LoadNext()
{
...
int startIndex = Count;
List<T> list = new List<T>();
while (objects.MoveNext())
{
Items.Add(objects.Current);
list.Add(objects.Current);
if (list.Count >= 10)
{
break;
}
}
if (list.Count > 0)
OnCollectionChanged(new NotifyCollectionChangedEventArgs
(NotifyCollectionChangedAction.Add, list, startIndex));
}
当UI出现或滚动到List
末尾时,将调用此方法。
void OnAppearing(object sender, ItemVisibilityEventArgs e)
{
if (Items.Count == 0)
Items.LoadNext();
}
void OnItemAppearing(object sender, ItemVisibilityEventArgs e)
{
if (Items != null && e.Item == Items[Items.Count - 1])
{
Items.LoadNext();
}
}
主数据库定义
在本例中,我们只存储Item的数据,为此创建了一个数据库表。以下是定义:
DB db = new DB(...);
db.GetConfig().EnsureTable<Item>("Item", "Id", "DatabaseId");
db.GetConfig().EnsureTable<Confirmed>("Confirmed", "DatabaseId");
Auto = db.Open();
对于独立的应用程序,使用ID作为数据库的Key就足够了,但我们已经准备好进行分布式部署,系统将有许多数据库,Key将由Id和DatabaseId
组成,那么Confirmed
表的作用是什么呢?
对于分布式应用程序,网络可能会变得不稳定,就像在没有GPS的森林里一样,简单地确保我们走过哪条路的方法是做一个标记。我们稍后将使用Confirmed表。先来看看类。
public class GlobalObject
{
public long Id { get; set; }
public Guid DatabaseId { get; set; }
public DateTime Time { get; set; } = DateTime.UtcNow;
}
public class Item : GlobalObject
{
public string Text { get; set; }
public string Description { get; set; }
public decimal Price { get; set; }
}
public class Confirmed : GlobalObject
{
}
现在,我们可以将Item
的数据保存到数据库了。
public bool AddItem(Item item)
{
using (var box = Auto.Cube())
{
item.Id = Tick.GetNextTick();
item.DatabaseId = DatabaseId();
box["Item"].Insert(item);
CommitResult cr = box.Commit();
Update();
return cr == CommitResult.OK;
}
}
DatabaseId
是一个Guid
(全局唯一标识符),它在创建新数据库时生成。主数据库的Table Id是从Time
生成的,而不是sequenceId
。生成器的详细信息在SourceCode
中。
启动应用程序。
电子邮件服务能做什么
现在每个人都有一个独立的应用程序,记录着世界各地的价格。他们如何互相了解价格,通过手工发送电子邮件。回想一下我们是如何存储数据的。
using (var box = Auto.Cube())
{
box["Item"].Insert(item);
CommitResult cr = box.Commit();
}
我们将Item
放入一个盒子,能否通过软件将盒子寄往其他地方?答案是肯定的,这就是本文试图做到的。
电子邮件服务是第三方服务,好消息是它们已经标准化。您可以轻松地从一个提供商切换到另一个提供商,您可以选择服务质量最好的提供商,而无需更改任何代码。我们使用多个电子邮件地址来构建一个网络来传递Boxes。
DataStore.Current.AccountSpace.Account = new Account
{
Network = new string[] { "andy@newyork.net", "kelly@london.net",
"kevin@hongkong.net", "backup@backup.net" },
}
public static void SendEmail(MimeMessage message)
{
var account = DataStore.Current.AccountSpace.Account;
using (var client = new SmtpClient())
{
...
message.From.Add(new MailboxAddress(account.UserName, account.UserName));
foreach (var toName in account.Network)
{
message.To.Add(new MailboxAddress(toName, toName));
}
client.Send(message);
}
}
如上面的代码所示,我们向所有地址发送电子邮件。在本例中,我们只使用电子邮件传递消息,消息通过两个简单的规则在应用程序内部进行过滤:是我自己还是其他人。
if (log.DatabaseId == DBVar.DatabaseId)
{
...
}
else
{
...
}
下一个问题是,在哪里收集Box
?
日志数据库设计
当Box
提交到数据库时,它会触发一个OnReceived()
方法,但我们不能直接发送Box
,因为我们使用的是第三方服务,不知道网络是否稳定。首先,我们将Box
es放入数据库日志中,等待连接电子邮件服务器。在本例中,我们将日志存储在另一个iBoxDB
数据库中,为了与主数据库区分开,我们称之为日志数据库。
以下是日志数据库的定义:
public AutoBox Auto { get; private set; }
public Variable DBVar { get; private set; }
public DataStoreSync(long logAddr)
{
DB db = new DB(logAddr);
//Key={Id,DatabaseId}, Id before DatabaseId for faster Search.
db.GetConfig().EnsureTable<Log>("Log", "Id", "DatabaseId");
//Downloaded from Email Service, delete row after processed
db.GetConfig().EnsureTable<Log>("RemoteLog", "Id", "DatabaseId");
//Waiting for sending to others, clear all after synchronized
db.GetConfig().EnsureTable<Log>("WaitingSendLog", "Id", "DatabaseId");
//Confirmed Log-Id for all databases in the Network, count(*)==databases.length
db.GetConfig().EnsureTable<Confirmed>("Confirmed", "DatabaseId");
//database's variables, only one record, Id==0L
db.GetConfig().EnsureTable<Variable>("DBVar", "Id");
Auto = db.Open();
using (var box = Auto.Cube())
{
if (box["DBVar", 0L].Select<Object>() == null)
{
// this Table only have one record, Id==0L.
box["DBVar"].Insert(new Variable
{
Id = 0L,
DatabaseId = Guid.NewGuid(),//new databaseId
SentId = 0, //which Log from Log-Table had sent
ReceivedId = 0, //email sequenceId had downloaded
});
box.Commit().Assert();
}
}
using (var box = Auto.Cube())
{
DBVar = box["DBVar", 0L].Select<Variable>();
}
}
Log
表存储提交时收集的Box
es。RemoteLog
表存储通过电子邮件服务从远程应用程序下载的Box
es。WaitingSendLog
表存储在途中丢失的Box
es,等待重新发送。
RemoteLog
表和WaitingSendLog
表是缓冲区,在处理完成后将被清空。我们将Variable.ReceivedId
设置为零,应用程序从头开始下载电子邮件,如果电子邮件帐户不为空,您可以设置另一个值。
日志记录
所有盒子都以Log
格式记录。
class GlobalObject {
long Id { get; set; }
Guid DatabaseId { get; set; }
}
public class Log : GlobalObject
{
public const byte IncGroup = 1;
public long DependentId;
public Guid DependentDatabaseId;
public Guid GlobalId;
public MemoryStream MBox; //Binary Box
}
Log中的Id是一个序列号。日志有两个依赖日志,显式的日志记录在两个字段DependentId和DependentDatabaseId
中。隐式的日志是前一个日志,即Id减1
。
每个Log
总是有一个前一个Log
,但只有由远程Box
触发的Log
才具有显式依赖Log
。以下是我们如何将Box
写入Log
:
public void OnReceived(Socket socket, BoxData outBox, bool normal)
{
if (socket.Actions == 0) { return; }
using (var box = Auto.Cube())
{
if (!normal)
{
//limit 0,1, only check the last, descending order
foreach (var lastLog in box.Select<Log>("from Log limit 0,1"))
{
//had logged
if (socket.ID.Equals(lastLog.GlobalId))
{
return;
}
}
}
Log log = new Log()
{
Id = box.NewId(Log.IncGroup, 1),
DatabaseId = DBVar.DatabaseId,
GlobalId = socket.ID
};
if (socket.DestAddress != FromRemoteAddress)
{
//Current user operates, no remote dependency, local dependency is Log.Id-1L
log.DependentId = 0;
log.DependentDatabaseId = Guid.Empty;
log.MBox = new MemoryStream(outBox.ToBytes());
}
else
{
//Replicates from remote user, set dependency
Confirmed confirmed = DB.To<Confirmed>(new MemoryStream(socket.Tag));
log.DependentId = confirmed.Id;
log.DependentDatabaseId = confirmed.DatabaseId;
//Current database doesn't store the remote Box's data Again.
log.MBox = null;
}
box["Log"].Insert(log);
box.Commit().Assert();
}
}
Socket.ID
是每个盒子的唯一ID,首先我们通过socket.ID
检查这个Box
是否已被记录。什么是“FromRemoteAddress
”?再次回想一下我们是如何在box中存储数据的。
using (var box = Auto.Cube()) {
box["Item"].Insert(item);
CommitResult cr = box.Commit();
}
Cube()
方法的参数为空,表示在本例中来自本地用户。那么来自远程用户呢?
var confirmed = new Confirmed()
{
DatabaseId = log.DatabaseId,
Id = log.Id
};
if (log.MBox != null)
{
using (var mainBox =
DataStore.Current.Auto.GetDatabase()
.Cube(FromRemoteAddress, DB.From(confirmed).ToArray()))
{
var lastConfirmedId = mainBox["Confirmed", log.DatabaseId].Select<Confirmed>()?.Id;
if (lastConfirmedId == null || lastConfirmedId < log.Id)
{
BoxReplication.MasterReplicate(mainBox, new BoxData(log.MBox.ToArray()));
//this Replace(confirmed) records which remote Log had replicated to current DB,
//also trigger OnReceived() event to log it by using DependentId
mainBox["Confirmed"].Replace(confirmed);
mainBox.Commit().Assert();
}
}
}
box["Confirmed"].Replace(confirmed);
box.Commit().Assert();
它使用FromRemoteAddress
作为参数,并在通过BoxReplication.MasterReplicate
将远程数据复制到本地后,更新Confirmed
表以记录此来自远程数据库的Log已被处理。Confirmed表如下所示:
databaseId(主键) | Id(已确认) |
Guid(5F854E71-50B8-490A-BA24-0499A7D355D4) | 8 |
Guid(70403B66-A463-45FE-A692-F4CD2839E501) | 1 |
Guid(F7F2464F-8D07-4EF0-9A9E-07063672E691) | 21 |
在进行数据复制之前,我们进行一些依赖性检查。
if (box["Confirmed", log.DatabaseId].Select<Confirmed>()?.Id >= log.Id)
{
return true;
}
if (log.Id == 1L || box["Confirmed",
log.DatabaseId].Select<Confirmed>()?.Id >= (log.Id - 1L))
if (log.DependentDatabaseId == Guid.Empty ||
log.DependentDatabaseId == DBVar.DatabaseId ||
log.DependentId == 0L || box["Confirmed",
log.DependentDatabaseId].Select<Confirmed>()?.Id >= log.DependentId)
{
//Do Relication
}
}
我们使用一个循环扫描RemoteLog
缓冲区,直到没有更多的Log
可以被处理,这使得消息的接收顺序
变得不重要。
while (count != logs.Count)
{
count = logs.Count;
foreach (var remoteLog in logs)
{
bool success = ProcessRemoteLog(remoteLog);
if (success)
{
result++;
Auto.Delete("RemoteLog", remoteLog.Id, remoteLog.DatabaseId);
}
}
logs = Auto.Select<Log>("from RemoteLog order by Id");
}
将日志打包在一个消息中
在完成上述操作后,我们有很多日志,而不是逐个发送,我们通过电子邮件的附件将日志打包在一起。
var message = new MimeMessage();
message.Subject = CreateLogEmailTitle();
var builder = new BodyBuilder();
foreach (var log in all)
{
string fileName = $"{++num}_{log.Id}_{log.DatabaseId}.bin";
fileName = (log.DatabaseId == DBVar.DatabaseId ? "LOG_" : "ASK_") + fileName;
builder.Attachments.Add(fileName, DB.From(log),
ContentType.Parse("application/octet-stream"));
}
message.Body = builder.ToMessageBody();
EmailService.SendEmail(message);
SourceCode的其余部分是关于如何处理丢失。如今电子邮件服务不会丢失消息,这段代码可能永远不会被执行。您可以从SourceCode中阅读。
现在您可以同步世界各地的应用程序了。
锁系统
如果应用程序是关于共享信息的,没有两个用户同时编辑/删除同一篇文章,我们就不需要锁系统。如果您想添加一个,一个简单的解决方案是向所有节点发送一个Lock
消息,等待所有节点接受。Lock
消息可以要求锁定整个系统或仅锁定几个ID。
改进
在这个演示中,我们没有使用backup@backup.net地址。那是为节点关闭或旧日志被清理时设计的,我们可以将备份数据复制到新节点。
摘要
在本例中,我们使用iBoxDB存储数据,并通过电子邮件服务将Box回收给另一个应用程序。这种架构简单而清晰,我们只需添加几行代码,一个独立的应用程序就变成了一个分布式应用程序。
我们使用全球电子邮件系统作为全球消息传递系统,因为它免费,并且有成千上万的服务器支持这个庞大的系统。实际上,您有很多选择,任何能够发送和接收消息的全球服务都可以选择,包括SMS等。
如果您正在准备一个大型酷炫的系统,在花费数百万购买服务器和网络之前,也许您应该尝试一下这个架构作为一项作业。您需要了解一些事情,如果每个组件的可用性是99.999%,当您把它们加起来时,结果不是1000%,硬件问题很难调试,没有100%可靠的设备。时间会让设备变得更弱,我们可以利用时间来使系统更强大。
参考文献
历史
- 版本 1.0
- 版本 2.0
- 版本 2.0.1
- 版本 3.0
- 打包
Logs
- 使用
Socket.Tag
传递Object
- 打包