65.9K
CodeProject 正在变化。 阅读更多。
Home

连续 SQL 流发送和处理系统简介 (第二部分: MySQL)

starIconstarIconstarIconstarIconstarIcon

5.00/5 (11投票s)

2017年9月21日

CPOL

17分钟阅读

viewsIcon

25953

连续 SQL 流发送和处理系统。

引言

大多数客户端服务器数据库系统仅支持客户端和后端数据库之间的同步通信,通过使用阻塞套接字和一些占用大量通信资源的协议,这些协议要求客户端或服务器在发送新数据块之前等待确认。等待时间,也称为延迟,从局域网(LAN)的十分之一秒到广域网(WAN)的几百毫秒不等。长时间的等待时间会严重降低应用程序的质量。

UDAParts 开发了一个强大且安全的通信框架,名为 SocketPro,该框架支持连续的内联请求/结果批处理和实时流处理,采用了异步数据传输和并行计算,以实现最佳的网络效率、开发简单性、性能、可扩展性以及众多卓越甚至独特的功能(https://github.com/udaparts/socketpro)。

此外,UDAParts 将强大的 SocketPro 框架应用于 SQLite、MySQL 和 MS SQL Server 等流行数据库,并通过 ODBC 驱动程序支持其他数据库,以实现连续的 SQL 流发送和处理。另外,这些数据库组件中的大部分对个人用户而言是完全免费的。为了降低学习复杂性,我建议您先学习 SQLite 的 SQL 流示例(第1部分:SQLite),然后再使用这些 MySQL 示例项目,因为 SQLite 和 MySQL 示例共享一套相同的客户端数据库 API 函数。

MySQL 是目前最受欢迎的开源客户端服务器分布式数据库管理系统。在研究了 MySQL/MariaDB 数据库服务器插件功能后,UDAParts 已将 SocketPro SQL 流技术应用于 MySQL/MariaDB,并开发了一个插件,以支持客户端的连续 SQL 语句发送和服务器端的处理,从而获得最佳性能和可扩展性。此外,UDAParts 将 SQL 流技术与 MySQL Connector/Net 进行了性能比较。我们的性能研究表明,在局域网(LAN)上,SQL 流技术可以比 MySQL Connector/Net 提供者快五倍,而在广域网(WAN)上则可快数百倍。

源代码和示例

所有相关的源代码和示例都位于 https://github.com/udaparts/socketpro。使用 GIT 克隆到您的计算机后,请注意 socketpro/stream_sql 目录下的 mysql 子目录。SocketPro MySQL 服务器插件源代码位于 socketpro/stream_sql/smysql 目录中。此外,您还可以看到从 .NET、C/C++、Java 和 Python 开发环境中创建的这些示例。然而,在本篇文章中,我们使用 C# 代码(socketpro/stream_sql/mysql/test_csahrp)进行客户端开发以供解释。

除了上述示例外,您还可以在 socketpro/stream_sql/mysql/DBPerf 目录中找到使用 MySQL 示例数据库 sakila 进行的性能研究示例。该子目录包含三个性能研究项目:cppperfnetperfmysqlperf,它们分别使用 C++/SocketPro SQL 流、.NET/SocketPro SQL 流和 ADO.NET 提供程序技术编写。

此外,SocketPro MySQL 服务器插件通过数据库触发器支持数据表更新事件(DELETEINSERTUPDATE)。您可以使用此功能将选定表上的更新事件推送到客户端或中间层。示例 C# 项目位于 socketpro/stream_sql/mysql/test_cache 目录中。

在运行这些示例应用程序之前,您需要按照 socketpro/doc/get_started.htm 文件中的说明,将 socketpro/binsocketpro/bin/free_services 目录中的系统库分发到您的 system 目录中。

关于 SocketPro 通信框架,您还可以参考其开发指南文件 socketpro/doc/dev_guide.htm

注册 MySQL/MariaDB SQL 流插件并设置其配置

我们将安装 MySQL 8 或更高版本、MySQL 5.7 或更早版本以及 MariaDB 的数据库插件。您必须已按照 socketpro/doc/get_started.htm 文件中的说明分发了服务器核心库(Linux 为 libuservercore.so,Windows 为 uservercore.dll)。

  • 将 SocketPro MySQL/MariaDB 数据库插件复制到数据库插件目录,并安装插件
    1. 通过执行语句 show variables where variable_name='plugin_dir' 来查找 MySQL/MariaDB 数据库插件目录
    2. MySQL/MariaDB 插件复制到数据库插件目录并安装插件
      • Windows:将 ../socketpro/bin/free_services/(mysql8_0_11|mysql5_7_22|mariadb)/win64 (或 win86) 中的 smysql.dll 复制到上面找到的数据库插件目录。之后,执行语句 INSTALL PLUGIN UDAParts_SQL_Streaming SONAME 'smysql.dll'
      • Linux:将 ../socketpro/bin/free_services/(mysql8_0_11|mysql5_7_22|mariadb)/ 中的 libsmysql.so 复制到上面找到的数据库插件目录。之后,执行语句 INSTALL PLUGIN UDAParts_SQL_Streaming SONAME 'libsmysql.so'
      确保在继续下一步之前没有错误输出。目录 mysql8_0_11mysql5_7_22mariadb 分别包含 MySQL 8 或更高版本、MySQL 5.7.33 或更早版本以及 MariaDB 的一个数据库插件。请在此处不要出错。
    3. 转到 MySQL/MariaDB 数据库数据目录,该目录可以通过执行查询 show variables where variable_name='datadir' 来找到。之后,您会找到两个生成的文件:sp_streaming_db_config.json 用于其他高级设置,streaming_db.log 用于错误输出。如果出现错误输出,streaming_db.log 文件很可能为您提供提示。此时,您可以成功编译并运行测试示例应用程序。
  • 安装示例数据库 sakila

    尽管此步骤是可选的,但强烈建议执行,因为我们广泛使用 sakila 作为示例数据库。如果您的 MySQL/MariaDB 没有安装 sakila 数据库,您可以在 GitHub 和 sakila 上搜索找到安装它的站点。您会找到许多提供安装著名的 sakila 示例数据库的 SQL 脚本的站点。例如,这个站点可能会对您有所帮助。

  • 通过修改文件 sp_streaming_db_config.json 来配置 SocketPro MySQL/MariaDB 数据库插件以实现高级功能

    接下来的配置也是可选的。此处提供它们是为了实现高级功能和其他服务。

    1. 首先,找到条目 services,并将其字符串值更改为 ssqlite;uasyncqueueMySQL/MariaDB 数据库服务器插件将加载两个服务:(sqlite 和服务器持久化队列)。您可以对其他服务这样做。每个服务应以分号分隔。
    2. 接下来,找到条目 monitored_tables,并将其字符串值更改为 sakila.actor;sakila.country;sakila.category;sakila.language。这样做将强制 MySQL/MariaDB 数据库服务器插件监视 actorcountrycategorylanguage 这四个表的 insertupdatedelete 触发器事件。SocketPro 使用这些触发器事件来实现客户端或中间层上的实时缓存。
    3. 停止 MySQL/MariaDB 数据库服务器,然后重新启动。此时,配置文件将被更新。如果出现错误,日志文件 streaming_db.log 将帮助您。
    4. 以下步骤对于 MySQL 8 或更高版本完全不是必需的。但是,如果您使用 MySQL 5.7 或更早版本或 MariaDB,请遵循以下两个步骤来完成前面的设置 2

      通过在 Linux 和 Windows 平台分别执行语句 CREATE FUNCTION SetSQLStreamingPlugin RETURNS INTEGER SONAME 'libsmysql.so'CREATE FUNCTION SetSQLStreamingPlugin RETURNS INTEGER SONAME 'smysql.dll' 来注册一个用户定义的函数 SetSQLStreamingPlugin

      最后,通过执行类似 select SetSQLStreamingPlugin('uid=root;pwd=Smash123') 的语句来调用用户定义的函数 SetSQLStreamingPlugin。在此,参数 uidpwd 分别代表用户 ID 和密码。

    5. 如果您了解 C#,可以通过在目录 ../socketpro/stream_sql/mysql/test_cache 中编译和运行测试项目来编译实时缓存功能。

    如果上述配置正确完成且没有任何错误,这些 SocketPro 数据库服务器插件将支持 SocketPro 服务器持久化消息、SQLite、SQL 请求流处理以及实时可更新缓存服务。此外,这些 SocketPro 数据库服务器插件将使用数据库账户来验证所有这些服务的客户端。

    当然,您可以为安全通信启用 SSL/TSLv1.x,更改监听端口,并修改 SocketPro 服务器持久化消息和 SQLite 服务的设置。最后,需要注意的是,配置文件中的一些条目是只读的,仅供您参考。

主函数

SocketPro 从底层开始构建,支持通过一个或多个非阻塞套接字池进行并行计算。每个池可以由一个或多个线程组成,每个线程在客户端承载一个或多个非阻塞套接字。通常,一个非阻塞套接字池仅由一个工作线程承载。在这里,我们仅使用一个池进行清晰演示,如下面的代码片段 1 所示。

using System;
using System.Collections.Generic;
using SocketProAdapter;
using SocketProAdapter.ClientSide;
using SocketProAdapter.UDB;
using System.Threading.Tasks;

using KeyValue = System.Collections.Generic.KeyValuePair
      <SocketProAdapter.UDB.CDBColumnInfoArray, SocketProAdapter.UDB.CDBVariantArray>;

class Program
{
    static readonly string m_wstr;
    static readonly string m_str;

    static void Main(string[] args)
    {
        Console.WriteLine("Remote host: ");
        string host = Console.ReadLine();
        CConnectionContext cc = new CConnectionContext(host, 20902, "root", "Smash123");
        using (CSocketPool<CMysql> spMysql = new CSocketPool<CMysql>())
        {
            //spMysql.QueueName = "qmysql";
            if (!spMysql.StartSocketPool(cc, 1)) //line 23
            {
                Console.WriteLine("Failed in connecting to remote async mysql server");
                Console.WriteLine("Press any key to close the application ......");
                Console.Read();
                return;
            }
            CMysql mysql = spMysql.Seek(); //line 30
            CDBVariantArray vPData = null, vData = null;
            List<KeyValue> ra = new List<KeyValue>(); //line 32
            CMysql.DRows r = (handler, rowData) => {
                //rowset data come here
                int last = ra.Count - 1;
                KeyValue item = ra[last];
                item.Value.AddRange(rowData);
            };

            CMysql.DRowsetHeader rh = (handler) => {
                //rowset header comes here
                KeyValue item = new KeyValue(handler.ColumnInfo, new CDBVariantArray());
                ra.Add(item);
            };
            try
            {
                //stream all requests with in-line batching for the best network efficiency
                var tOpen = mysql.open(""); //line 48
                var vT = TestCreateTables(mysql);
                var tDs = mysql.execute("delete from employee;delete from company");
                var tP0 = TestPreparedStatements(mysql);
                var tP1 = TestBLOBByPreparedStatement(mysql);
                var tSs = mysql.execute
                ("SELECT * from company;select * from employee;select curtime()",r,rh);
                var tStore = TestStoredProcedure(mysql, ra, out vPData);
                var tB = TestBatch(mysql, ra, out vData); //line 55
                Console.WriteLine();

                Console.WriteLine("All SQLs streamed and waiting results in order ......");
                Console.WriteLine(tOpen.Result); //line 59
                foreach (var t in vT) {
                    Console.WriteLine(t.Result);
                }
                Console.WriteLine(tDs.Result);
                Console.WriteLine(tP0.Result);
                Console.WriteLine(tP1.Result);
                Console.WriteLine(tSs.Result);
                Console.WriteLine(tStore.Result);
                Console.WriteLine("There are {0} output data returned", 2 * 2);
                Console.WriteLine(tB.Result); //line 69
                Console.WriteLine("There are {0} output data returned", 2 * 3);
            }
            catch (AggregateException ex) {
                foreach (Exception e in ex.InnerExceptions) {
                    //An exception from server (CServerError), Socket closed after sending
                    //request (CSocketError) or request canceled (CSocketError),
                    Console.WriteLine(e);
                }
            } catch (CSocketError ex) {
                //Socket is already closed before sending a request
                Console.WriteLine(ex);
            }
            catch (Exception ex) {
                //bad operations such as invalid arguments, bad operations and
                //de-serialization errors, and so on
                Console.WriteLine(ex);
            }
            int index = 0;
            Console.WriteLine();
            Console.WriteLine("+++++ Start rowsets +++");
            foreach (KeyValue it in ra) {
                Console.Write("Statement index = {0}", index);
                if (it.Key.Count > 0)
                    Console.WriteLine(", rowset with columns = {0}, 
                    records = {1}.", it.Key.Count, it.Value.Count / it.Key.Count);
                else
                    Console.WriteLine(", no rowset received.");
                ++index;
            }
            Console.WriteLine("+++++ End rowsets +++");
            Console.WriteLine();
            Console.WriteLine("Press any key to close the application ......");
            Console.Read();
        }
    }

    // ......
}
代码片段 1:SocketPro MySQL SQL 流系统客户端演示主函数

启动一个套接字池:上面的代码片段 1 在第 23 行启动了一个套接字池,该池只有一个工作线程,该线程仅承载一个非阻塞套接字以供演示清晰,并通过一个连接上下文实例使用。然而,需要注意的是,您可以在一个客户端应用程序中创建多个池(如果需要)。之后,我们在第 30 行获得一个异步 MySQL 处理程序。

打开数据库:我们可以在第 48 行发送请求以打开 MySQL 服务器数据库。如果第一个输入是空字符串或 null 字符串,如本例所示,我们将为已连接的用户打开一个默认数据库。如果您想打开一个指定的数据库,您可以简单地提供一个非空且有效的数据库名称字符串。此外,我们在第 32 行创建了一个容器实例 ra,它用作一个容器来接收查询中coming的所有记录集。

流式 SQL 语句:请记住,SocketPro 的设计宗旨是能够毫不费力地在单个非阻塞套接字会话上流式传输所有类型的任意数量的请求。当然,我们可以在单元测试函数(TestCreateTablesTestPreparedStatementsTestBLOBByPreparedStatementTestStoredProcedureTestBatch)中轻松地流式传输所有 SQL 语句和子请求(如第 48 行至 55 行所示)。所有 SocketPro SQL 流服务都支持此独特功能,以实现最佳网络效率,这将显著提高数据访问性能。据我们所知,您找不到如此出色的功能。如果您找到了,请告知我们。像普通数据库访问 API 一样,SocketPro SQL 流技术也支持手动事务,如上一篇文章所示。此外,预计所有返回的结果也将通过内联批处理从服务器流式传输到客户端。

等待所有处理完成:由于 SocketPro 只支持异步数据传输,因此 SocketPro 必须有一种方法来等待所有请求和返回结果发送、处理和返回,如第 59 行至 69 行所示。您可以看到,我们可以使用任务属性 Result 将所有异步请求转换为同步请求。当然,使用关键字 asyncawait 达到相同目的也是完美的。

TestCreateTables、TestPreparedStatements 和 TestBLOBByPreparedStatement

上面的代码片段 1 包含三个函数调用:TestCreateTablesTestPreparedStatementsTestBLOBByPreparedStatement,但我们不想再次解释它们,因为它们与上一篇文章中的完全相同。让我们专注于执行带有输入-输出和输出参数的 MySQL 存储过程。

TestStoredProcedure

MySQL 完全支持存储过程。SocketPro SQL 流技术也支持。此外,SocketPro SQL 流技术支持在一次调用中执行多个 MySQL 存储过程集,这些存储过程具有输入、输入-输出和输出参数(如上面的代码片段 2 所示),该片段还返回多个记录集,其中包含多个大型二进制对象和文本。

static Task<CAsyncDBHandler.SQLExeInfo> TestStoredProcedure
           (CMysql mysql, List<KeyValue> ra, out CDBVariantArray vPData) {
    vPData = new CDBVariantArray();
    //1st set
    vPData.Add(1);       //input
    vPData.Add(1.4);     //input-output
    //output not important and it's used for receiving a proper data from MySQL
    vPData.Add(0);       //output

    //2nd set
    vPData.Add(2);       //input
    vPData.Add(2.5);     //input-output
    //output not important and it's used for receiving a proper data from MySQL
    vPData.Add(0);       //output

    mysql.Prepare("call sp_TestProc(?,?,?)");
    CMysql.DRows r = (handler, rowData) => {
        //rowset data come here
        int last = ra.Count - 1;
        KeyValue item = ra[last];
        item.Value.AddRange(rowData);
    };
    CMysql.DRowsetHeader rh = (handler) => {
        //rowset header comes here
        KeyValue item = new KeyValue(handler.ColumnInfo, new CDBVariantArray());
        ra.Add(item);
    };
    return mysql.execute(vPData, r, rh);
}
代码片段 2:调用返回多个记录集和输出参数的 MySQL 存储过程

如上面的代码片段 2 所示,通过 SocketPro SQL 流技术调用存储过程非常简单。需要注意的是,所有输出参数数据将直接复制到传递的参数数据数组 vPData 中。如果记录集元数据可用,则会在元数据到来时调用回调函数 rh。每当数组记录数据到来时,都会调用回调函数 r。您可以从这两个回调中将所有查询的元数据和记录数据填充到任意容器中,例如 ra

TestBatch

下面的代码片段 3 与上一篇文章中的代码片段 6 基本相同,尽管这段代码包含更多行,用于在客户端和服务器之间传输 BLOB 和长文本。需要注意的是,SocketPro 服务器插件支持用户定义的定界符,可以是单个字符或字符串。在这里,它是一个字符,竖线,如第 3 行注释所示。我们故意使用了一个复杂的测试用例来展示 SocketPro SQL 请求流技术的强大功能。实际上,这里的单元测试代码并不复杂,但如果使用其他数据库访问 API,其测试用例将非常具有挑战性,因为该测试用例涉及大型 BLOB 和长文本的双向传输。

static Task<CAsyncDBHandler.SQLExeInfo> 
    TestBatch(CMysql mysql, List<KeyValue> ra, out CDBVariantArray vData) {
    //sql with delimiter '|' //line 3
    string sql = @"delete from employee;delete from company|
    INSERT INTO company(ID,NAME,ADDRESS,Income)VALUES(?,?,?,?)|
    insert into employee(CompanyId,name,JoinDate,image,DESCRIPTION,Salary)value(?,?,?,?,?,?)|
    SELECT * from company;select * from employee;select curtime()|
    call sp_TestProc(?,?,?)";
    vData = new CDBVariantArray();
    using (CScopeUQueue sbBlob = new CScopeUQueue()) {
        //1st set
        vData.Add(1);
        vData.Add("Google Inc.");
        vData.Add("1600 Amphitheatre Parkway, Mountain View, CA 94043, USA");
        vData.Add(66000000000.15);
        vData.Add(1);                         //google company id
        vData.Add("Ted Cruz");
        vData.Add(DateTime.Now);
        sbBlob.Save(m_wstr);
        vData.Add(sbBlob.UQueue.GetBuffer()); //BLOB
        vData.Add(m_wstr);                    //long unicode text
        vData.Add(254000.26);
        vData.Add(1);                         //input
        vData.Add(1.4);                       //input-output
        vData.Add(0);                         //output

        //2nd set
        vData.Add(2);
        vData.Add("Microsoft Inc.");
        vData.Add("700 Bellevue Way NE- 22nd Floor, Bellevue, WA 98804, USA");
        vData.Add(93600000000.37);
        vData.Add(1);                         //google company id
        vData.Add("Donald Trump");
        vData.Add(DateTime.Now);
        sbBlob.UQueue.SetSize(0);
        sbBlob.Save(m_str);
        vData.Add(sbBlob.UQueue.GetBuffer()); //BLOB
        vData.Add(m_str);                     //long ASCII text
        vData.Add(20254000.85);
        vData.Add(2);                         //input
        vData.Add(2.5);                       //input-output
        vData.Add(0);                         //output

        //3rd set
        vData.Add(3);
        vData.Add("Apple Inc.");
        vData.Add("1 Infinite Loop, Cupertino, CA 95014, USA");
        vData.Add(234000000000.09);
        vData.Add(2); //Microsoft company id
        vData.Add("Hillary Clinton");
        vData.Add(DateTime.Now);
        sbBlob.Save(m_wstr);
        vData.Add(sbBlob.UQueue.GetBuffer()); //BLOB
        vData.Add(m_wstr);                    //long unicode text
        vData.Add(6254000.55);
        vData.Add(0);                         //input
        vData.Add(4.5);                       //input-output
        vData.Add(0);                         //output
    }
    CMysql.DRows r = (handler, rowData) => {
        //rowset data come here
        int last = ra.Count - 1;
        KeyValuePair<CDBColumnInfoArray, CDBVariantArray> item = ra[last];
        item.Value.AddRange(rowData);
    };
    CMysql.DRowsetHeader rh = (handler) => {
        //rowset header comes here
        KeyValue item = new KeyValue(handler.ColumnInfo, new CDBVariantArray());
        ra.Add(item);
    };
    //first, start a transaction with ReadCommited isolation 
    //second, execute delete from employee;delete from company
    //third, prepare and execute three sets of
    //       INSERT INTO company(ID,NAME,ADDRESS,Income)VALUES(?,?,?,?)
    //fourth, prepare and execute three sets of 
    //insert into employee
    //       (CompanyId,name,JoinDate,image,DESCRIPTION,Salary)values(?,?,?,?,?,?)
    //fifth, SELECT * from company;select * from employee;select curtime()
    //sixth, prepare and three sets of call sp_TestProc(?,?,?)
    //last, commit transaction if there is no error; and otherwise, rollback
    return mysql.executeBatch(tagTransactionIsolation.tiReadCommited,sql, vData, r, rh, "|");
}
代码片段 3:在批处理中对具有用户定义定界符和存储过程以及输入、输入/输出和输出参数的 MySQL/MariaDB 数据库调用 executeBatch

executeBatch 方法具有许多优点,例如更好的性能、更清晰的代码以及与 SocketPro 客户端队列更好的集成,以实现自动故障恢复。

性能研究

SocketPro SQL 流技术在数据库数据访问方面具有出色的查询和更新性能。您可以在 socketpro/stream_sql/mysql/DBPerf/ 目录中找到两个 MySQL 性能测试项目(cppperfnetperf)。第一个示例是用 C++ 编写的,另一个是用 C# 编写的。还提供了一个用 C# 编写的示例项目 mysqlperf,供您将 SocketPro SQL 流技术与 MySQL .NET 提供程序进行性能比较。

请参阅下图 1 的性能研究数据,该数据来自三台廉价的 Google 云虚拟机,配备固态硬盘,用于免费评估。所有数据是以毫秒为单位执行 10,000 次查询和 50,000 次插入所需的时间。性能研究还侧重于网络延迟对 MySQL 访问速度的影响。

图 1:SocketPro SQL 流技术在三台廉价 Google 云虚拟机上的 MySQL 流式性能研究数据

我们的性能研究表明,对于 MySQL 在局域网(LAN,跨机器,0.2 ms/2.0 Gbps)上,查询可以轻松达到每秒 6,500 (10,000/1.54) 次的速度,连接套接字。对于插入记录,您可以轻松达到每秒 43,000 (50,000/1.17) 次插入的速度。在局域网(LAN)上,与传统的非流式方法(SocketPro + Sync)相比,SocketPro 流式处理在查询方面可将性能提高 150%。对于 SQL 插入,性能提升将超过七倍(10,400/1,170 = 8.9)。SocketPro 流式处理和内联批处理功能使网络效率极高,与现有的 MySQL 套接字通信方法相比,性能得到了显著提升。

让我们考虑广域网(WAN,跨区域,34 ms/40 Mbps)。SocketPro SQL 流式查询速度可以达到每秒 5,000 (10,000/2.00) 次,连接套接字。对于插入记录,速度可以轻松达到每秒 17,600 条记录 (50,000/2.84)。相反,如果客户端使用传统的通信方式(SocketPro+Sync/MySQL.NET Provider)进行数据库访问,由于高延迟,在 WAN 上的查询速度将低至每秒 30 次查询。SocketPro SQL 流式处理在查询方面可以比非流式技术快 170 倍以上(349000/2000 = 174.5),假设在 WAN(跨区域)上的数据库后端处理时间可忽略不计。如果我们考虑 SQL 插入,性能提升可能超过 600 倍(1,726,000/2840 = 607)。

分析图 1 中的性能数据后,您会发现 SocketPro 流式技术对于加速本地和远程数据库访问都非常出色。其次,如果测试的 WAN 具有更好的网络带宽,WAN 的性能数据将会更好。此外,SocketPro 支持内联压缩,但本次测试研究未使用它。如果使用了 SocketPro 内联压缩功能,其在 WAN 上的流式测试数据将进一步提高。最后,性能研究是在只有一两个 CPU 的廉价虚拟机上完成的。如果使用专用机器进行测试,性能数据会更好。

并行执行 SQL 并实现故障自动恢复

并行计算:学习完前面两个简单的例子后,是时候学习目录 socketpro/samples/auto_recovery/(test_cplusplus|test_java|test_python|test_sharp) 中的第三个示例了。SocketPro 从底层创建,以支持并行计算。您可以将多个 SQL 语句分发到不同的后端数据库以进行并发处理。此功能旨在提高应用程序的可扩展性,如下面的代码片段 4 所示。

using System;
using SocketProAdapter;
using SocketProAdapter.ClientSide;
class Program {
    static void Main(string[] args) {
        const int sessions_per_host = 2;
        string[] vHost = { "localhost", "192.168.2.172" };
        const int cycles = 10000;
        using (CSocketPool<CMysql> sp = new CSocketPool<CMysql>()) {
            //set a local message queue to backup requests for auto fault recovery
            sp.QueueName = "ar_sharp";
            
            //one thread enough
            CConnectionContext[,] ppCc = 
                       new CConnectionContext[1, vHost.Length * sessions_per_host];
            for (int n = 0; n < vHost.Length; ++n) {
                for (int j = 0; j < sessions_per_host; ++j) {
                    ppCc[0, n * sessions_per_host + j] = 
                         new CConnectionContext(vHost[n], 20902, "root", "Smash123");
                }
            }
            bool ok = sp.StartSocketPool(ppCc);
            if (!ok) {
                Console.WriteLine
                        ("No connection and press any key to close the application ......");
                Console.Read(); return;
            }
            string sql = "SELECT max(amount), min(amount), avg(amount) FROM payment";
            Console.WriteLine("Input a filter for payment_id"); 
                               string filter = Console.ReadLine();
            if (filter.Length > 0) sql += (" WHERE " + filter); var v = sp.AsyncHandlers;
            foreach (var h in v) {
                ok = h.Open("sakila", (hsqlite, res, errMsg) => {
                    if (res != 0) Console.WriteLine
                       ("Error code: {0}, error message: {1}", res, errMsg);
                });
            }
            int returned = 0;
            double dmax = 0.0, dmin = 0.0, davg = 0.0;
            SocketProAdapter.UDB.CDBVariantArray row = 
                                 new SocketProAdapter.UDB.CDBVariantArray();
            CAsyncDBHandler.DExecuteResult er = 
                            (h, res, errMsg, affected, fail_ok, lastId) => {
                if (res != 0)
                    Console.WriteLine("Error code: {0}, error message: {1}", res, errMsg);
                else {
                    dmax += double.Parse(row[0].ToString());
                    dmin += double.Parse(row[1].ToString());
                    davg += double.Parse(row[2].ToString());
                }
                ++returned;
            };
            CAsyncDBHandler.DRows r = (h, vData) => {
                row.Clear(); row.AddRange(vData);
            };
            CMysql mysql = sp.SeekByQueue(); //get one handler for querying one record
            ok = mysql.Execute(sql, er, r);
            ok = mysql.WaitAll();
            Console.WriteLine("Result: max = {0}, min = {1}, avg = {2}", dmax, dmin, davg);
            returned = 0; dmax = 0.0; dmin = 0.0; davg = 0.0;
            Console.WriteLine("Going to get {0} queries for max, min and avg", cycles);
            for (int n = 0; n < cycles; ++n) {
                mysql = sp.SeekByQueue();
                ok = mysql.Execute(sql, er, r);
            }
            foreach (var h in v) {
                ok = h.WaitAll();
            }
            Console.WriteLine("Returned = {0}, max = {1}, min = {2}, avg = {3}", 
                               returned, dmax, dmin, davg);
            Console.WriteLine("Press any key to close the application ......"); Console.Read();
        }
    }
}
代码片段 4:SocketPro 并行计算和故障自动恢复功能演示

如上面的代码片段 4 所示,我们可以启动多个非阻塞套接字连接到不同的机器(localhost, 192.168.2.172),每个数据库机器有两个套接字连接。代码为每个连接(foreach (var h in v) {......})打开一个默认数据库 sakila。首先,代码执行一个查询‘SELECT max(amount), min(amount), avg(amount) FROM payment …’以获取一条记录。最后,代码将查询发送 10,000 次到两台机器进行并行处理(for (int n = 0; n < cycles; ++n) {......})。每条记录将在 Lambda 表达式(CAsyncDBHandler.DExecuteResult er = (h, res, errMsg, affected, fail_ok, lastId) => {......};)中作为方法 Execute 的回调进行汇总。需要注意的是,您可以为托管在不同机器上的不同服务创建多个池。如您所见,SocketPro 套接字池可用于显著提高应用程序的可扩展性。

自动故障恢复:SocketPro 可以本地打开一个文件,并将所有请求数据保存到其中,然后通过网络将这些请求发送到服务器。该文件称为本地消息队列或客户端消息队列。这个想法很简单,就是备份所有请求以实现自动故障恢复。要使用此功能,您必须设置一个本地消息队列名称(sp.QueueName = "ar_sharp";),如上面的代码片段 4 所示。当我们开发实际应用程序时,编写大量代码来妥善处理各种通信错误是很常见的。事实上,这通常是软件开发人员面临的挑战。SocketPro 客户端消息队列使通信错误处理非常简单。假设机器 192.168.2.172 由于任何原因(如机器断电、未处理的异常、软件/硬件维护和网络断开等)而无法访问,套接字关闭事件将立即或稍后通知。一旦套接字池发现一个套接字已关闭,SocketPro 将自动将与该套接字连接相关的所有请求合并到另一个尚未关闭的套接字上进行处理。

要验证此功能,您可以在执行上述查询时强制关闭其中一个 MySQL 服务器,并查看最终结果是否正确。

需要注意的是,UDAParts 已将此功能应用于所有 SocketPro SQL 流服务、异步持久化消息队列服务和远程文件交换服务,以简化您的开发。

关注点

最后,SocketPro MySQL SQL 流插件完全不支持游标,但它确实提供了所有必需的基本客户端/服务器数据库功能。此外,SQL 流插件确实具有以下独特功能。

  1. 连续的内联请求/结果批处理和实时 SQL 流处理,可在局域网广域网上实现最佳网络效率
  2. 默认情况下,客户端和服务器之间进行双向异步数据传输,但所有异步请求在需要时都可以转换为同步请求。
  3. 卓越的性能和可扩展性,归功于强大的 SocketPro 通信架构。SocketPro SQL 流技术在所有开发语言和平台上,比所有类型的已知 MySQL/MariaDB 客户端 API 都明显更快、更具可扩展性。
  4. 表更新、插入和删除的实时缓存,如目录 socketpro/stream_sql/mysql/test_cache 中的示例项目 test_cache 所示。
  5. 所有请求均可在客户端通过执行 CClientSocket 类的 Cancel 方法来取消
  6. 支持 Windows 和 Linux。
  7. 所有支持的开发语言的开发简单
  8. 客户端和服务器组件都是线程安全的。它们可以轻松地在多线程应用程序中重复使用,且线程相关问题大大减少。
  9. 所有请求都可以在客户端进行备份,并在服务器出现任何原因(例如服务器宕机)时自动重新发送到另一台服务器进行处理——故障自动恢复

历史

  • 2017年9月20日:初始发布
  • 2018年2月28日:添加两个新章节:性能研究并行执行 SQL 并实现故障自动恢复
  • 2018年5月27日:更新 MySQL 服务器 SQL 流插件以支持MySQL 8.0.11 或更高版本
  • 2021年4月4日:使用 SQL 流方法的任务版本替代原始版本
  • 2021年4月4日:扩展插件以支持 MariaDB 和 MySQL 5.7.x 或更早版本
© . All rights reserved.