65.9K
CodeProject 正在变化。 阅读更多。
Home

可移动的全球分布式系统与数据库

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.95/5 (12投票s)

2013年3月11日

CPOL

8分钟阅读

viewsIcon

32187

downloadIcon

486

分布式系统架构,全球移动设备,免费构建您的全球应用

背景

设想一家只有三个员工的小公司,他们雄心勃勃,经营着一项全球业务,于是他们将员工派往纽约、伦敦和香港,每人配备一部智能手机,新手机连接电视和键盘时可以输出PC屏幕。这家公司刚刚起步,员工们不常来办公室,他们更常出现在机场、高速公路、咖啡馆、海滩,或者在村庄里谈生意。他们常常没有稳定的网络,但他们需要随时随地获取数据,我们应该在哪里放置或购买服务器呢?

他们的智能手机是最值得信赖的数据存储地。手机能成为软件系统的一部分吗?

免费,全球分布式架构

您眼前看到的是一部手机、一封电子邮件和整个世界。在本文的其余部分,我们将讨论如何用软件将它们连接成一个整体。我们将使用两个组件:NoSQL数据库iBoxDB来存储数据,电子邮件客户端MailKit来发送数据,以及Xamarin Forms来构建App UI。

放大这个架构,会看到很多方框。

数据将被打包成方框,并利用全球已部署的电子邮件服务器进行传递。电子邮件服务器的质量参差不齐,传递顺序和时间无法保证。如何确保我们能够正确解包数据。

消息ID

自互联网出现以来,节点之间一直在通信,ID设计变得和字体设计一样重要。我们有很多解决方案,这里介绍一种称为“依赖ID”的解决方案,它的意思是,如果你想处理这条消息,你首先需要处理上一条消息,如果消息丢失了,就要求重新发送。

如上图所示,传统的点对点消息有一个有序的ID,很容易知道哪条丢失了。分布式消息也有有序ID,但没有ID服务器为所有节点创建ID,不容易弄清楚哪个ID丢失了,当我们添加了一个依赖ID后,事情就变得更容易了。

实现

要实现这个架构,我们需要注册四个电子邮件地址,分别来自全球四个城市的四个电子邮件服务提供商。为简化起见,我们在本地安装一个电子邮件服务器,并创建四个域来代表四个地点。

独立移动应用

在进行分布式部署之前,我们先构建一个独立的应用程序。启动一个Xamarin Forms解决方案,并在Android项目中添加两个组件。

 Install-Package iBoxDB  
 Install-Package MailKit 

Xamarin Forms 使用了很多绑定技术,我们遵循约定,添加一个Box属性和一个PropertyChanged事件。在iBoxDB中,数据被封装在Boxes中。

public class BoxSpace : INotifyPropertyChanged
{
   Box box = null;
   public Box Box
   {
      get { return box; }
      set
      {
         box?.Dispose();
         box = value;
         PropertyChanged?.Invoke(this, new PropertyChangedEventArgs("Box"));
      }
   }

  public event PropertyChangedEventHandler PropertyChanged;
}

Box属性改变时,它会触发Collection从数据库重新加载记录。

public class ObservableTableCollection<T> : ObservableCollection<T> where T : class, new()
{
    BoxSpace boxSpace;
    string tableName;

    public ObservableTableCollection(BoxSpace boxSpace, string tableName)
    {
        Init();
        boxSpace.PropertyChanged += BoxSpace_PropertyChanged;
    }

    private void BoxSpace_PropertyChanged
    (object sender, System.ComponentModel.PropertyChangedEventArgs e)
    {
       Init();
    }

    private IEnumerator<T> objects;
    private void Init()
    {
       Clear();
       objects = null;
       objects = boxSpace.Box?.Select<T>("from " + tableName).GetEnumerator();
    }
}

这个Collection与UI绑定

public ObservableTableCollection<Item> GetItemTable()
{
   return new ObservableTableCollection<Item>(BoxSpace, "Item");
}     
public partial class ItemsPage : ContentPage
{
   public ObservableTableCollection<Item> Items { get; set; }

   public ItemsPage()
   {
      InitializeComponent();

      Items = DataStore.Current.GetItemTable(); 
      BindingContext = this;
   }
}

当我们想更新UI时,我们只需要更新Box属性

public void Update()
{
   BoxSpace.Box = Auto.Cube();
}

我们不想一次性将所有数据加载到UI。为LoadNext()方法添加一个限制,每次加载10条记录。

public int LoadNext()
{ 
            ...
  int startIndex = Count;
  List<T> list = new List<T>();
  while (objects.MoveNext())
  {
      Items.Add(objects.Current);
      list.Add(objects.Current);
      if (list.Count >= 10)
      {
         break;
      }
  }
  if (list.Count > 0)
    OnCollectionChanged(new NotifyCollectionChangedEventArgs
    (NotifyCollectionChangedAction.Add, list, startIndex));
               
}

当UI出现或滚动到List末尾时,将调用此方法。

void OnAppearing(object sender, ItemVisibilityEventArgs e)
{
   if (Items.Count == 0)
     Items.LoadNext();       
}
void OnItemAppearing(object sender, ItemVisibilityEventArgs e)
{
  if (Items != null && e.Item == Items[Items.Count - 1])
  {
     Items.LoadNext();
  }
}

主数据库定义

在本例中,我们只存储Item的数据,为此创建了一个数据库表。以下是定义:

  DB db = new DB(...);
  db.GetConfig().EnsureTable<Item>("Item", "Id", "DatabaseId");
  db.GetConfig().EnsureTable<Confirmed>("Confirmed", "DatabaseId");

  Auto = db.Open();

对于独立的应用程序,使用ID作为数据库的Key就足够了,但我们已经准备好进行分布式部署,系统将有许多数据库,Key将由Id和DatabaseId组成,那么Confirmed表的作用是什么呢?

对于分布式应用程序,网络可能会变得不稳定,就像在没有GPS的森林里一样,简单地确保我们走过哪条路的方法是做一个标记。我们稍后将使用Confirmed表。先来看看类。

public class GlobalObject
{
   public long Id { get; set; }
   public Guid DatabaseId { get; set; }

   public DateTime Time { get; set; } = DateTime.UtcNow;
}

public class Item : GlobalObject
{
   public string Text { get; set; }
   public string Description { get; set; }
   public decimal Price { get; set; }
}   
  
public class Confirmed : GlobalObject
{
}

现在,我们可以将Item的数据保存到数据库了。

public bool AddItem(Item item)
{
  using (var box = Auto.Cube())
  {
      item.Id = Tick.GetNextTick();
      item.DatabaseId = DatabaseId();
      box["Item"].Insert(item);
      CommitResult cr = box.Commit();
      Update();
      return cr == CommitResult.OK;
  }
}

DatabaseId是一个Guid(全局唯一标识符),它在创建新数据库时生成。主数据库的Table Id是从Time生成的,而不是sequenceId。生成器的详细信息在SourceCode中。

启动应用程序。

电子邮件服务能做什么

现在每个人都有一个独立的应用程序,记录着世界各地的价格。他们如何互相了解价格,通过手工发送电子邮件。回想一下我们是如何存储数据的。

using (var box = Auto.Cube())
{
   box["Item"].Insert(item);
   CommitResult cr = box.Commit();
}

我们将Item放入一个盒子,能否通过软件将盒子寄往其他地方?答案是肯定的,这就是本文试图做到的。

电子邮件服务是第三方服务,好消息是它们已经标准化。您可以轻松地从一个提供商切换到另一个提供商,您可以选择服务质量最好的提供商,而无需更改任何代码。我们使用多个电子邮件地址来构建一个网络来传递Boxes。

DataStore.Current.AccountSpace.Account = new Account
{ 
   Network = new string[] { "andy@newyork.net", "kelly@london.net", 
                            "kevin@hongkong.net", "backup@backup.net" },
}
public static void SendEmail(MimeMessage message)
{
   var account = DataStore.Current.AccountSpace.Account;

   using (var client = new SmtpClient())
   {
      ...
      message.From.Add(new MailboxAddress(account.UserName, account.UserName));
      foreach (var toName in account.Network)
      {
         message.To.Add(new MailboxAddress(toName, toName));
      }
      client.Send(message);
      
   }
}

如上面的代码所示,我们向所有地址发送电子邮件。在本例中,我们只使用电子邮件传递消息,消息通过两个简单的规则在应用程序内部进行过滤:是我自己还是其他人

if (log.DatabaseId == DBVar.DatabaseId)
{ 
   ...
}
else
{ 
  ...
}

下一个问题是,在哪里收集Box

日志数据库设计

Box提交到数据库时,它会触发一个OnReceived()方法,但我们不能直接发送Box,因为我们使用的是第三方服务,不知道网络是否稳定。首先,我们将Boxes放入数据库日志中,等待连接电子邮件服务器。在本例中,我们将日志存储在另一个iBoxDB数据库中,为了与主数据库区分开,我们称之为日志数据库。

以下是日志数据库的定义:

public AutoBox Auto { get; private set; }
public Variable DBVar { get; private set; }
public DataStoreSync(long logAddr)
{
   DB db = new DB(logAddr);

   //Key={Id,DatabaseId}, Id before DatabaseId for faster Search.
   db.GetConfig().EnsureTable<Log>("Log", "Id", "DatabaseId");

   //Downloaded from Email Service, delete row after processed
   db.GetConfig().EnsureTable<Log>("RemoteLog", "Id", "DatabaseId");
   //Waiting for sending to others, clear all after synchronized
   db.GetConfig().EnsureTable<Log>("WaitingSendLog", "Id", "DatabaseId");

   //Confirmed Log-Id for all databases in the Network,  count(*)==databases.length
   db.GetConfig().EnsureTable<Confirmed>("Confirmed", "DatabaseId");

   //database's variables, only one record, Id==0L
   db.GetConfig().EnsureTable<Variable>("DBVar", "Id");

   Auto = db.Open();
   using (var box = Auto.Cube())
   {
       if (box["DBVar", 0L].Select<Object>() == null)
       {
           // this Table only have one record, Id==0L.
           box["DBVar"].Insert(new Variable
           {
              Id = 0L,
              DatabaseId = Guid.NewGuid(),//new databaseId
              SentId = 0, //which Log from Log-Table had sent
              ReceivedId = 0, //email sequenceId had downloaded
           });
           box.Commit().Assert();
       }
   }

   using (var box = Auto.Cube())
   {
       DBVar = box["DBVar", 0L].Select<Variable>();
   }
}

Log表存储提交时收集的Boxes。RemoteLog表存储通过电子邮件服务从远程应用程序下载的Boxes。WaitingSendLog表存储在途中丢失的Boxes,等待重新发送。

RemoteLog表和WaitingSendLog表是缓冲区,在处理完成后将被清空。我们将Variable.ReceivedId设置为零,应用程序从头开始下载电子邮件,如果电子邮件帐户不为空,您可以设置另一个值。

日志记录

所有盒子都以Log格式记录。

class GlobalObject { 
  long Id { get; set; }  
  Guid DatabaseId { get; set; } 
}
public class Log : GlobalObject
{
   public const byte IncGroup = 1;

   public long DependentId;
   public Guid DependentDatabaseId;

   public Guid GlobalId;
   public MemoryStream MBox; //Binary Box

}

Log中的Id是一个序列号。日志有两个依赖日志,显式的日志记录在两个字段DependentId和DependentDatabaseId中。隐式的日志是前一个日志,即Id减1

每个Log总是有一个前一个Log,但只有由远程Box触发的Log才具有显式依赖Log。以下是我们如何将Box写入Log

public void OnReceived(Socket socket, BoxData outBox, bool normal)
{
    if (socket.Actions == 0) { return; }
    using (var box = Auto.Cube())
    {
        if (!normal)
        {
            //limit 0,1, only check the last, descending order
            foreach (var lastLog in box.Select<Log>("from Log limit 0,1"))
            {
                //had logged
                if (socket.ID.Equals(lastLog.GlobalId))
                {
                    return;
                }
            }
        }

        Log log = new Log()
        {
            Id = box.NewId(Log.IncGroup, 1),
            DatabaseId = DBVar.DatabaseId,

            GlobalId = socket.ID
        };

        if (socket.DestAddress != FromRemoteAddress)
        {
            //Current user operates, no remote dependency, local dependency is Log.Id-1L
            log.DependentId = 0;
            log.DependentDatabaseId = Guid.Empty;
            log.MBox = new MemoryStream(outBox.ToBytes());
        }
        else
        {
            //Replicates from remote user, set dependency
            Confirmed confirmed = DB.To<Confirmed>(new MemoryStream(socket.Tag));
            log.DependentId = confirmed.Id;
            log.DependentDatabaseId = confirmed.DatabaseId;
            //Current database doesn't store the remote Box's data Again.
            log.MBox = null;
        }
        box["Log"].Insert(log);

        box.Commit().Assert();
    }
}

Socket.ID是每个盒子的唯一ID,首先我们通过socket.ID检查这个Box是否已被记录。什么是“FromRemoteAddress”?再次回想一下我们是如何在box中存储数据的。

using (var box = Auto.Cube()) {   
  box["Item"].Insert(item);   
  CommitResult cr = box.Commit(); 
}

Cube()方法的参数为空,表示在本例中来自本地用户。那么来自远程用户呢?

var confirmed = new Confirmed()
{
    DatabaseId = log.DatabaseId,
    Id = log.Id
};

if (log.MBox != null)
{
    using (var mainBox =
        DataStore.Current.Auto.GetDatabase()
        .Cube(FromRemoteAddress, DB.From(confirmed).ToArray()))
    {
        var lastConfirmedId = mainBox["Confirmed", log.DatabaseId].Select<Confirmed>()?.Id;
        if (lastConfirmedId == null || lastConfirmedId < log.Id)
        {
            BoxReplication.MasterReplicate(mainBox, new BoxData(log.MBox.ToArray()));

            //this Replace(confirmed) records which remote Log had replicated to current DB, 
            //also trigger OnReceived() event to log it by using DependentId
            mainBox["Confirmed"].Replace(confirmed);
            mainBox.Commit().Assert();
        }
    }
}
box["Confirmed"].Replace(confirmed);
box.Commit().Assert();

它使用FromRemoteAddress作为参数,并在通过BoxReplication.MasterReplicate将远程数据复制到本地后,更新Confirmed表以记录此来自远程数据库的Log已被处理。Confirmed表如下所示:

databaseId(主键) Id(已确认)
Guid(5F854E71-50B8-490A-BA24-0499A7D355D4) 8
Guid(70403B66-A463-45FE-A692-F4CD2839E501) 1
Guid(F7F2464F-8D07-4EF0-9A9E-07063672E691) 21

在进行数据复制之前,我们进行一些依赖性检查。

if (box["Confirmed", log.DatabaseId].Select<Confirmed>()?.Id >= log.Id)
{
    return true;
}

if (log.Id == 1L || box["Confirmed", 
    log.DatabaseId].Select<Confirmed>()?.Id >= (log.Id - 1L))
    if (log.DependentDatabaseId == Guid.Empty || 
    log.DependentDatabaseId == DBVar.DatabaseId ||
        log.DependentId == 0L || box["Confirmed", 
        log.DependentDatabaseId].Select<Confirmed>()?.Id >= log.DependentId)
    {
        //Do Relication
    } 
}

我们使用一个循环扫描RemoteLog缓冲区,直到没有更多的Log可以被处理,这使得消息的接收顺序变得不重要。

while (count != logs.Count)
{
    count = logs.Count;
    foreach (var remoteLog in logs)
    {
        bool success = ProcessRemoteLog(remoteLog);
        if (success)
        {
            result++;
            Auto.Delete("RemoteLog", remoteLog.Id, remoteLog.DatabaseId);
        }
    }
    logs = Auto.Select<Log>("from RemoteLog order by Id");
}

将日志打包在一个消息中

在完成上述操作后,我们有很多日志,而不是逐个发送,我们通过电子邮件的附件将日志打包在一起。

var message = new MimeMessage();
message.Subject = CreateLogEmailTitle();

var builder = new BodyBuilder();
foreach (var log in all)
{
   string fileName = $"{++num}_{log.Id}_{log.DatabaseId}.bin";
   fileName = (log.DatabaseId == DBVar.DatabaseId ? "LOG_" : "ASK_") + fileName;
   builder.Attachments.Add(fileName, DB.From(log), 
   ContentType.Parse("application/octet-stream"));
}

message.Body = builder.ToMessageBody();
EmailService.SendEmail(message); 

SourceCode的其余部分是关于如何处理丢失。如今电子邮件服务不会丢失消息,这段代码可能永远不会被执行。您可以从SourceCode中阅读。

现在您可以同步世界各地的应用程序了。

锁系统

如果应用程序是关于共享信息的,没有两个用户同时编辑/删除同一篇文章,我们就不需要锁系统。如果您想添加一个,一个简单的解决方案是向所有节点发送一个Lock消息,等待所有节点接受。Lock消息可以要求锁定整个系统或仅锁定几个ID。

改进

在这个演示中,我们没有使用backup@backup.net地址。那是为节点关闭或旧日志被清理时设计的,我们可以将备份数据复制到新节点。

摘要

在本例中,我们使用iBoxDB存储数据,并通过电子邮件服务将Box回收给另一个应用程序。这种架构简单而清晰,我们只需添加几行代码,一个独立的应用程序就变成了一个分布式应用程序。

我们使用全球电子邮件系统作为全球消息传递系统,因为它免费,并且有成千上万的服务器支持这个庞大的系统。实际上,您有很多选择,任何能够发送和接收消息的全球服务都可以选择,包括SMS等。

如果您正在准备一个大型酷炫的系统,在花费数百万购买服务器和网络之前,也许您应该尝试一下这个架构作为一项作业。您需要了解一些事情,如果每个组件的可用性是99.999%,当您把它们加起来时,结果不是1000%,硬件问题很难调试,没有100%可靠的设备。时间会让设备变得更弱,我们可以利用时间来使系统更强大。

参考文献

历史

  • 版本 1.0
  • 版本 2.0
  • 版本 2.0.1
  • 版本 3.0
    • 打包Logs
    • 使用Socket.Tag传递Object
© . All rights reserved.