连续 SQL 流发送和处理系统简介 (第一部分: SQLite)
SocketPro在各种数据库上的应用,实现连续内联请求/结果批处理和实时流处理,支持双向异步数据传输
引言
大多数客户端-服务器数据库系统只支持客户端和后端数据库之间通过阻塞套接字和一些“健谈”协议进行同步通信,这要求客户端或服务器在发送新的数据块之前等待确认。等待时间,也称为延迟,在局域网(LAN)上可能从十分之一毫秒开始,到广域网(WAN)上几百毫秒不等。大量的等待时间会严重降低应用程序的质量。
幸运的是,UDAParts开发了一个强大而安全的通信框架,名为SocketPro。它集成了连续内联请求/结果批处理和实时流处理功能,通过使用异步数据传输和并行计算,实现了最佳的网络效率、开发简洁性、性能、可伸缩性以及许多出色的甚至独特的功能(请访问https://github.com/udaparts/socketpro)。有关详细信息,请参阅其简单的开发指南文件 ../socketpro/doc/dev_guide.htm。特别推荐快速浏览SocketPro的设计理念,请参阅文件 ../socketpro/doc/sp_arch.htm。
此外,UDAParts已将强大的SocketPro框架应用于SQLite、MySQL/MariaDB和MS SQL等一系列流行数据库,以及通过ODBC驱动程序支持的其他数据库,以实现连续SQL流发送和处理。另外,这些数据库组件中的大多数对公众来说是永久完全免费的,并提供开源代码供您学习和扩展以满足您的复杂需求。
为降低学习复杂度,第一篇文章使用SQLite数据库作为第一个示例,第二篇文章使用MySQL作为第二个示例,请参阅第二篇文章。
源代码和示例
所有相关的源代码和示例都位于https://github.com/udaparts/socketpro。通过GIT将其克隆到您的计算机后,请注意socketpro/stream_sql/usqlite目录下的usqlite子目录。
您可以看到这些示例是从.NET、C/C++、Java、Python和nodejs开发环境中创建的。它们可以在Linux或Windows平台上编译和运行。UDAParts还分发了预编译的测试服务器应用程序all_servers
,如../socketpro/doc/get_started.htm文档页面所示。请遵循此文档指南来分发SocketPro组件。我们将使用示例服务器all_servers
进行后续测试。
接下来的测试代码来自文件../socketpro/stream_sql/usqlite/test_sharp/Program.cs。本文将重点介绍C#开发。
两个基本结构,ErrInfo和SQLExeInfo
下面的代码片段1列出了用于各种数据库相关请求返回结果的两个基本结构:ErrInfo
和SQLExeInfo
。
//../socketpro/src/SproAdapter/socketerror.cs
public class ErrInfo
{
public ErrInfo(int res, string errMsg)
{
ec = res;
em = (null == errMsg) ? "" : errMsg;
}
public int ec = 0;
public string em = "";
public override string ToString()
{
string s = "ec: " + ec.ToString() + ", em: " + ((null == em) ? "" : em);
return s;
}
};
//../socketpro/src/sproadapter/asyncdbhandler.cs
public class SQLExeInfo : ErrInfo
{
public long affected;
public uint oks;
public uint fails;
public object lastId;
public SQLExeInfo(int res, string errMsg, long aff, uint oks, uint fails, object id)
: base(res, errMsg)
{
affected = aff;
this.oks = oks;
this.fails = fails;
lastId = id;
}
public override string ToString()
{
String s = base.ToString();
s += (", affected: " + affected);
s += (", oks: " + oks);
s += (", fails: " + fails);
s += (", lastId: " + lastId);
return s;
}
}
第一个结构ErrInfo
用于返回数据库请求open
、close
、prepare
、beginTrans
和endTrans
的结果。它很容易理解,因为它们只返回两个数据:错误代码ec及其对应的错误消息em
。
第二个结构SQLExeInfo
设计用于返回各种SQL语句execute
和executeBatch
的结果。除了错误代码及其对应的消息外,执行SQL语句还可以返回受影响的记录数和最后插入的标识符,这分别对应于结构SQLExeInfo
的两个成员affected
和lastId
。SocketPro数据库插件支持执行由多个基本SQL语句组成的复杂SQL语句。执行复杂SQL语句可能会生成多个基本SQL语句的多个结果。SocketPro数据库插件将计算成功和失败的次数,并将它们返回给客户端。因此,结构中有两个成员oks和fails分别表示成功和失败。但是,无论批量基本SQL语句发生多少次失败,SocketPro数据库插件始终只返回第一个失败SQL语句的错误代码和消息。
主函数
SocketPro从底层设计开始就支持并行计算,使用一个或多个非阻塞套接字池。每个池可以由一个或多个线程组成,每个线程在客户端托管一个或多个非阻塞套接字。然而,为了清晰演示,我们这里只使用一个池,并且该池由一个线程和一个套接字组成,如下面的代码片段2所示。
using System;
using System.Collections.Generic;
using SocketProAdapter;
using SocketProAdapter.ClientSide;
using SocketProAdapter.UDB;
using System.Threading.Tasks;
using KeyValue = System.Collections.Generic.KeyValuePair
<SocketProAdapter.UDB.CDBColumnInfoArray,SocketProAdapter.UDB.CDBVariantArray>;
class Program
{
static readonly string m_wstr;
static readonly string m_str;
static Program()
{
// ......
}
static void Main(string[] args)
{
Console.WriteLine("Remote host: ");
string host = Console.ReadLine();
CConnectionContext cc = new CConnectionContext(host, 20901,
"usqlite_client", "password_for_usqlite");
using (CSocketPool<CSqlite> spSqlite = new CSocketPool<CSqlite>())
{
//spSqlite.QueueName = "qsqlite";
if (!spSqlite.StartSocketPool(cc, 1)) //line 29, one async socket/one worker thread
{
Console.WriteLine("Failed in connecting to remote async sqlite server");
Console.WriteLine("Press any key to close the application ......");
Console.Read();
return;
}
CSqlite sqlite = spSqlite.Seek(); //line 36
//a container for receiving all tables data
List<KeyValue> lstRowset = new List<KeyValue>(); //line 38
try
{
//stream all DB requests with in-line batching for the best network efficiency
//open a global database at server side because an empty string is given
var topen = sqlite.open(""); //line 43
//prepare two test tables, COMPANY and EMPLOYEE
Task<CAsyncDBHandler.SQLExeInfo>[] vT = TestCreateTables(sqlite);
var tbt = sqlite.beginTrans(); //line 46, start manual transaction
//test both prepare and query statements
var tp0 = TestPreparedStatements(sqlite, lstRowset);
//test both prepare and query with reading/updating BLOB and large text
var tp1 = TestBLOBByPreparedStatement(sqlite, lstRowset);
var tet = sqlite.endTrans(); //line 51, end manual transaction
var vB = TestBatch(sqlite, lstRowset); //line 52
Console.WriteLine("All DB/SQL requests streamed & waiting for their results");
Console.WriteLine(topen.Result); //line 55
foreach (var e in vT)
{
Console.WriteLine(e.Result);
}
Console.WriteLine(tbt.Result);
Console.WriteLine(tp0.Result);
Console.WriteLine(tp1.Result);
Console.WriteLine(tet.Result);
foreach (var e in vB)
{
Console.WriteLine(e.Result);
} //line 67
}
catch (AggregateException ex) //line 69
{
foreach (Exception e in ex.InnerExceptions)
{
//An exception from server (CServerError), Socket closed after
//sending a request (CSocketError) or request canceled (CSocketError),
Console.WriteLine(e);
}
}
catch (CSocketError ex)
{
//Socket is already closed before sending a request
Console.WriteLine(ex);
}
catch (Exception ex)
{
//bad operations such as invalid arguments, bad operations and
//de-serialization errors, and so on
Console.WriteLine(ex);
} //line 88
//display received rowsets
int index = 0;
Console.WriteLine();
Console.WriteLine("+++++ Start rowsets +++");
foreach (KeyValue it in lstRowset)
{
Console.Write("Statement index = {0}", index);
if (it.Key.Count > 0)
Console.WriteLine(", rowset with columns: {0}, records: {1}.",
it.Key.Count, it.Value.Count / it.Key.Count);
else
Console.WriteLine(", no rowset received.");
++index;
}
Console.WriteLine("+++++ End rowsets +++");
Console.WriteLine();
Console.WriteLine("Press any key to close the application ......");
Console.Read();
}
}
static Task<CAsyncDBHandler.SQLExeInfo>[] TestBatch(CSqlite sqlite, List<KeyValue> ra)
{
// ......
}
static Task<CAsyncDBHandler.SQLExeInfo> TestPreparedStatements(CSqlite sqlite, List<KeyValue> ra)
{
// ......
}
static Task<CAsyncDBHandler.SQLExeInfo> TestBLOBByPreparedStatement(CSqlite sqlite, List<KeyValue> ra)
{
// ......
}
static Task<CAsyncDBHandler.SQLExeInfo>[] TestCreateTables(CSqlite sqlite)
{
// ......
}
}
启动一个套接字池:上面的代码片段2在第29行启动了一个套接字池,该套接字池只有一个工作线程,仅托管一个非阻塞套接字以实现演示的清晰性,使用了一个连接上下文实例。然而,如果您有需要,可以在一个客户端应用程序中创建多个池。之后,我们在第36行获得一个异步sqlite句柄。
打开数据库:我们可以发送一个请求来打开一个sqlite服务器数据库,如第43行所示。如果第一个输入是空字符串或null
字符串,如本例所示,我们打开的是服务器全局数据库usqlite.db的一个实例。如果您想创建一个自己的数据库,只需提供一个非空有效字符串。其返回结果topen
是一个未来期望的ErrInfo
结构的任务。
流式传输SQL语句:请记住,SocketPro的设计支持在单个非阻塞套接字会话上毫不费力地流式传输任意数量的任何类型的请求。当然,我们也可以像第43至52行所示那样流式传输所有SQL语句以及其他内容。所有SocketPro SQL流服务都支持这一特定功能,以实现最佳网络效率,从而显著提高数据访问性能。据我们所知,您在其他技术中找不到如此出色的功能。如果您找到,请告知我们。像普通的数据库访问API一样,SocketPro SQL流技术也支持手动事务,如第46行和第51行所示。我们将在后续章节中详细阐述TestCreateTables
、TestPreparedStatements
、TestBLOBByPreparedStatement
和TestBatch
这四个函数。请注意,所有这四个方法都会立即返回一个或多个SQLExeInfo
结构的任务。
请注意,SocketPro仅支持客户端和服务器之间的异步数据传输,因此所有请求和结果都在客户端和服务器端进行内联算法的流式传输和批处理,以实现最佳网络效率。这与同步数据传输完全不同。
等待所有处理完成:由于SocketPro默认使用异步数据传输,因此SocketPro必须提供一种方法来等待所有请求和返回结果被发送、返回并处理。如第55至67行所示,我们可以使用任务属性Result
来等待完成。此外,您还可以使用关键字await
或任务方法Wait
来等待所有数据库/SQL请求完成。
错误处理:第69至88行的代码向您展示了如何处理各种错误,包括服务器错误CServerError
、套接字通信和请求取消错误CSocketError
以及其他类型的错误。
TestCreateTables
此函数内部发送两个SQL DDL语句来创建COMPANY
和EMPLOYEE
两个表,如下面的代码片段3所示。
static Task<CAsyncDBHandler.SQLExeInfo>[] TestCreateTables(CSqlite sqlite)
{
var v = new Task<CAsyncDBHandler.SQLExeInfo>[2];
v[0] = sqlite.execute("CREATE TABLE COMPANY(ID INT8 PRIMARY KEY NOT NULL,name CHAR(64)"+
"NOT NULL,ADDRESS varCHAR(256)not null,Income float not null)");
v[1] = sqlite.execute("CREATE TABLE EMPLOYEE(EMPLOYEEID INT8 PRIMARY KEY NOT NULL unique,"+
"CompanyId INT8 not null,name NCHAR(64)NOT NULL,JoinDate DATETIME not null " +
"default(datetime('now')),IMAGE BLOB,DESCRIPTION NTEXT,Salary real,"+
"FOREIGN KEY(CompanyId)REFERENCES COMPANY(id))");
return v;
}
您可以像代码片段3所示那样,在流中执行任意数量的SQL语句,而无需等待前一个请求返回。每个请求接受一个输入SQL语句,并立即返回一个未来的SQLExeInfo
结构的未来任务。再次强调,这与常见的数据库访问方法不同,因为SocketPro使用异步数据传输进行通信。
TestPreparedStatements
SocketPro SQL流技术支持准备SQL语句,就像常见的数据库访问API一样。特别是,SocketPro SQL流技术甚至支持一次性准备多个SQL语句,用于SQLite服务器数据库,如下面的代码片段4所示。
static Task<CAsyncDBHandler.SQLExeInfo> TestPreparedStatements(CSqlite sqlite, List<KeyValue> ra)
{
//a complex SQL statement combined with query and insert prepare statements
sqlite.Prepare("Select datetime('now');" + //line 4
"INSERT OR REPLACE INTO COMPANY(ID,NAME,ADDRESS,Income)VALUES(?,?,?,?)"); //line 5
CDBVariantArray vData = new CDBVariantArray();
vData.Add(1);
vData.Add("Google Inc.");
vData.Add("1600 Amphitheatre Parkway, Mountain View, CA 94043, USA");
vData.Add(66000000000.0);
vData.Add(2);
vData.Add("Microsoft Inc.");
vData.Add("700 Bellevue Way NE- 22nd Floor, Bellevue, WA 98804, USA");
vData.Add(93600000000.0);
vData.Add(3);
vData.Add("Apple Inc.");
vData.Add("1 Infinite Loop, Cupertino, CA 95014, USA");
vData.Add(234000000000.0);
//send three sets of parameterized data in one shot for processing
return sqlite.execute(vData, (handler, rowData) =>
{
//rowset data come here
int last = ra.Count - 1;
KeyValue item = ra[last];
item.Value.AddRange(rowData);
}, (handler) =>
{
//rowset header meta info comes here
KeyValue item = new KeyValue(handler.ColumnInfo, new CDBVariantArray());
ra.Add(item);
});
}
请注意,示例准备的SQL语句包含一个查询和一个insert
语句。当调用该函数时,客户端将期望返回三组记录,并将三条记录插入COMPANY
表中。该示例旨在演示SocketPro SQL流技术的强大功能。实际上,您可能不会准备一个包含多个基本SQL语句的组合SQL语句。如果您使用参数化语句,则需要先发送准备请求,如第4行和第5行所示。获取数据数组后(如上面的代码片段4所示),您可以在最后一次性将多组参数数据发送到服务器进行处理。如果您有大量数据,可以重复调用execute
方法,而无需再次准备语句。
接下来,我们需要更详细地了解如何处理返回的记录集。execute
方法有两个回调或Lambda表达式作为第二个和第三个输入参数(第一个输入是参数数据数组)。每当有记录集到来时,SQLite客户端句柄将自动调用第二个回调((handler)=>{ ......})来获取记录集列元信息。如果实际记录可用,将调用第一个回调((handler, rowData)=>{ ......}),您可以将数据填充到容器ra
中。如果我们将代码片段4作为样本,回调将被调用三次,但预计第一个回调的调用次数取决于记录的数量和每条记录的大小。
TestBLOBByPreparedStatement
现在,您可以看到SocketPro SQL流技术提供了访问后端数据库所需的所有功能。此外,我们将演示如何在SocketPro流技术中处理大型二进制和文本对象。通常,高效地访问数据库中的大型对象是困难的。然而,使用SocketPro SQL流技术,开发和效率都非常简单,如下面的代码片段5所示。
查看代码片段5后,您会发现这段代码与前面的代码片段4非常相似,尽管这段代码更长。因此,这种方法对于软件开发人员来说是一个很好的选择,可以用相同的编码风格复用SocketPro SQL流技术来处理所有类型的数据库表字段,从而简化开发。
SocketPro始终在客户端和服务器端将大型二进制或文本对象分成块。然后,SocketPro将这些较小的块发送到另一端。最后,SocketPro将从收集到的较小块中重构原始的大型二进制或文本对象。这在运行时是无声发生的,以减少内存占用。
static Task<CAsyncDBHandler.SQLExeInfo> TestBLOBByPreparedStatement(CSqlite sqlite, List<KeyValue> ra)
{
//a complex SQL statement combined with two insert and query prepare statements
sqlite.Prepare("insert or replace into employee(EMPLOYEEID,CompanyId,name,JoinDate,imag,"+
"DESCRIPTION,Salary)values(?,?,?,?,?,?,?);select * from employee where employeeid=?");
CDBVariantArray vData = new CDBVariantArray();
using (CScopeUQueue sbBlob = new CScopeUQueue())
{
//first set of data
vData.Add(1);
vData.Add(1); //google company id
vData.Add("Ted Cruz");
vData.Add(DateTime.Now);
sbBlob.Save(m_wstr);
vData.Add(sbBlob.UQueue.GetBuffer());
vData.Add(m_wstr);
vData.Add(254000.0);
vData.Add(1);
//second set of data
vData.Add(2);
vData.Add(1); //google company id
vData.Add("Donald Trump");
vData.Add(DateTime.Now);
sbBlob.UQueue.SetSize(0);
sbBlob.Save(m_str);
vData.Add(sbBlob.UQueue.GetBuffer());
vData.Add(m_str);
vData.Add(20254000.0);
vData.Add(2);
//third set of data
vData.Add(3);
vData.Add(2); //Microsoft company id
vData.Add("Hillary Clinton");
vData.Add(DateTime.Now);
sbBlob.Save(m_wstr);
vData.Add(sbBlob.UQueue.GetBuffer());
vData.Add(m_wstr);
vData.Add(6254000.0);
vData.Add(3);
}
//send three sets of parameterized data in one shot for processing
return sqlite.execute(vData, (handler, rowData) =>
{
//rowset data come here
int last = ra.Count - 1;
KeyValue item = ra[last];
item.Value.AddRange(rowData);
}, (handler) =>
{
//rowset header meta info comes here
KeyValue item = new KeyValue(handler.ColumnInfo, new CDBVariantArray());
ra.Add(item);
});
}
TestBatch
SocketPro还提供了一个特殊方法,用于将所有数据库和SQL请求(如prepare
、beginTrans
、endTrans
和execute
)分组到一个大型批处理请求中,如下面的代码片段6所示。
static Task<CAsyncDBHandler.SQLExeInfo>[] TestBatch(CSqlite sqlite, List<KeyValue> ra)
{
var v = new Task<CAsyncDBHandler.SQLExeInfo>[2];
CDBVariantArray vParam = new CDBVariantArray();
vParam.Add(1); //ID
vParam.Add(2); //EMPLOYEEID
//there is no manual transaction if isolation is tiUnspecified
v[0] = sqlite.executeBatch(tagTransactionIsolation.tiUnspecified, //line 8
"Select datetime('now');select * from COMPANY where ID=?;"+
"select * from EMPLOYEE where EMPLOYEEID=?",
vParam, (handler, rowData) =>
{
//rowset data come here
int last = ra.Count - 1;
KeyValue item = ra[last];
item.Value.AddRange(rowData);
}, (handler) =>
{
//rowset header meta info comes here
KeyValue item = new KeyValue(handler.ColumnInfo, new CDBVariantArray());
ra.Add(item);
});
vParam.Clear();
vParam.Add(1); //ID
vParam.Add(2); //EMPLOYEEID
vParam.Add(2); //ID
vParam.Add(3); //EMPLOYEEID
//Same as sqlite.beginTrans();
//Select datetime('now');
//select * from COMPANY where ID=1;
//select * from COMPANY where ID=2;
//Select datetime('now');
//select * from EMPLOYEE where EMPLOYEEID=2;
//select * from EMPLOYEE where EMPLOYEEID=3
//ok = sqlite.endTrans(tagRollbackPlan.rpDefault);
v[1] = sqlite.executeBatch(tagTransactionIsolation.tiReadCommited, //line 36
"Select datetime('now');select * from COMPANY where ID=?;"+
"Select datetime('now');select * from EMPLOYEE where EMPLOYEEID=?",
vParam, (handler, rowData) =>
{
//rowset data come here
int last = ra.Count - 1;
KeyValue item = ra[last];
item.Value.AddRange(rowData);
}, (handler) =>
{
//rowset header meta info comes here
KeyValue item = new KeyValue(handler.ColumnInfo, new CDBVariantArray());
ra.Add(item);
}); //line 50
return v;
}
在TestBatch
方法中,我们在第8行调用executeBatch
方法,第一个输入是tiUnspecified
。由于有一个参数数据数组,批处理将通过查询三组记录并执行服务器端的一个准备语句来执行。如果第三个输入参数为空,executeBatch
方法将与execute
方法相同。
我们还可以将一组数据库和SQL请求分组,第一个输入不是tiUnspecified
,如第36至50行所示,尽管示例准备语句在此不需要手动事务。如此处代码片段中的注释所示,它涉及beginTrans
/endTrans
、prepare
、execute
等方法。
executeBatch
方法具有许多优点,例如更好的性能和更简洁的代码,以及与SocketPro客户端消息队列的更好集成,以实现更好的故障自动恢复,这将在最后一节讨论。
性能研究
SocketPro SQL流技术在数据库数据访问方面表现出色,无论是在查询还是更新方面。您可以在../socketpro/stream_sql/usqlite/DBPerf目录下找到两个性能测试项目(cppperf
和netperf
)。第一个示例是用C++编写的,另一个是用C#编写的。此外,MySQL的sakila
示例数据库(位于../socketpro/bin目录)可供您在运行示例服务器all_servers
以创建全局SQLite数据库usqlite.db后使用。
请参阅下面的图1的性能研究数据,该数据是从三个廉价的、带有固态驱动器的Google云虚拟机上获取的,用于免费评估。所有数据都是执行10,000个查询和50,000个插入操作所需的时间(毫秒)。性能研究还侧重于网络延迟对SQL访问速度的影响。
图1:SocketPro SQL流技术在三个廉价Google云虚拟机上的SQLite流处理性能研究数据
我们的性能研究表明,查询执行速度可以轻松达到每秒7,400次(10,000/1.36)的执行速度和套接字连接。对于插入记录,在本地网络(LAN,跨机器)上,SQLite每秒可以轻松达到120,000次(50,000/0.42)插入的写入速度。SocketPro流处理比传统的非流处理方法(SocketPro + Sync)性能提升140%。
在广域网(WAN,跨区域)上,查询速度可达每秒4,000次(10,000/2.24)的执行速度和套接字连接。对于插入记录,速度可以轻松达到每秒20,000条记录(50,000/2.51)。相反,在WAN上,如果客户端使用传统的通信方式(非流式)进行数据库访问,由于高延迟,查询速度会降低到每秒30次查询。在WAN(跨区域)高延迟情况下,如果数据库后端处理时间相对于IO通信时间可以忽略不计,SocketPro SQL流处理可以比非流式技术快150倍以上(346000/2240)。分析完上面的图1的性能数据后,您会发现SocketPro流处理技术不仅能显著加速本地数据库访问,也能加速远程数据库访问。
上述性能研究是在带宽约为40 Mbps的WAN上完成的,用于跨区域通信。可以想象,如果测试WAN具有更好的网络带宽,其性能数据将大大提高。此外,SocketPro还支持内联压缩,但本次测试未启用此功能。如果使用SocketPro内联压缩功能,其在WAN上的流式测试数据将进一步改善。最后,本次性能研究是在只有一两个CPU的廉价虚拟机上完成的。如果使用专用机器进行测试,性能数据将显著提高。
并行执行SQL语句并进行故障自动恢复
并行计算:研究完前面两个简单的例子后,现在是时候研究目录socketpro/samples/auto_recovery/(test_cplusplus|test_java|test_python|test_sharp)中的第三个示例了。SocketPro从底层设计开始就支持并行计算。您可以将多个SQL语句分发到不同的后端数据库进行并发处理。此功能旨在提高应用程序的可伸缩性,如下面的代码片段7所示。
using System;
using SocketProAdapter;
using SocketProAdapter.ClientSide;
class Program {
static void Main(string[] args) {
const int sessions_per_host = 2;
const int cycles = 10000;
string[] vHost = { "localhost", "192.168.2.172" };
using (CSocketPool<CSqlite> sp = new CSocketPool<CSqlite>()) {
sp.QueueName = "ar_sharp"; //set a local message queue to backup requests for auto fault recovery
CConnectionContext[,] ppCc = new CConnectionContext[1, vHost.Length * sessions_per_host]; //one thread enough
for (int n = 0; n < vHost.Length; ++n) {
for (int j = 0; j < sessions_per_host; ++j) {
ppCc[0, n * sessions_per_host + j] = new CConnectionContext(vHost[n], 20901, "AClientUserId", "Mypassword");
}
}
bool ok = sp.StartSocketPool(ppCc);
if (!ok) {
Console.WriteLine("There is no connection and press any key to close the application ......");
Console.Read(); return;
}
string sql = "SELECT max(amount), min(amount), avg(amount) FROM payment";
Console.WriteLine("Input a filter for payment_id");
string filter = Console.ReadLine();
if (filter.Length > 0) sql += (" WHERE " + filter);
var v = sp.AsyncHandlers;
foreach (var h in v) {
ok = h.Open("sakila.db", (hsqlite, res, errMsg) => {
if (res != 0)
Console.WriteLine("Error code: {0}, error message: {1}", res, errMsg);
});
}
int returned = 0;
double dmax = 0.0, dmin = 0.0, davg = 0.0;
SocketProAdapter.UDB.CDBVariantArray row = new SocketProAdapter.UDB.CDBVariantArray();
CAsyncDBHandler.DExecuteResult er = (h, res, errMsg, affected, fail_ok, lastId) => {
if (res != 0)
Console.WriteLine("Error code: {0}, error message: {1}", res, errMsg);
else {
dmax += double.Parse(row[0].ToString());
dmin += double.Parse(row[1].ToString());
davg += double.Parse(row[2].ToString());
}
++returned;
};
CAsyncDBHandler.DRows r = (h, vData) => {
row.Clear();
row.AddRange(vData);
};
CSqlite sqlite = sp.SeekByQueue(); //get one handler for querying one record
ok = sqlite.Execute(sql, er, r);
ok = sqlite.WaitAll();
Console.WriteLine("Result: max = {0}, min = {1}, avg = {2}", dmax, dmin, davg);
returned = 0;
dmax = 0.0; dmin = 0.0; davg = 0.0;
Console.WriteLine("Going to get {0} queries for max, min and avg", cycles);
for (int n = 0; n < cycles; ++n) {
sqlite = sp.SeekByQueue();
ok = sqlite.Execute(sql, er, r);
}
foreach (var h in v) {
ok = h.WaitAll();
}
Console.WriteLine("Returned = {0}, max = {1}, min = {2}, avg = {3}", returned, dmax, dmin, davg);
Console.WriteLine("Press any key to close the application ......"); Console.Read();
}
}
}
如上面的代码片段所示,我们可以启动多个非阻塞套接字连接到不同的机器(localhost, 192.168.2.172),并且每台数据库机器连接了两个套接字(const int sessions_per_host = 2;)。代码为每个连接打开一个默认数据库sakila.db(ok = h.Open("sakila.db" ......)。首先,代码在开始时执行一个查询‘SELECT max(amount), min(amount), avg(amount) FROM payment
…’,以获取一条记录。最后,代码将查询发送10,000次到两台机器上进行并行处理。每条记录将在Lambda表达式中作为Execute
方法的用于累加的回调进行处理。请注意,您可以为托管在不同机器上的不同服务创建多个池。如您所见,SocketPro套接字池可用于显著提高应用程序的可伸缩性。
自动故障恢复:SocketPro能够将所有请求数据本地打开并保存到一个文件中,然后再通过网络发送这些请求到服务器。该文件称为本地消息队列或客户端消息队列。其思想很简单,即备份所有请求以实现自动故障恢复。要使用此功能,您必须设置一个本地消息队列名称(sp.QueueName = "ar_sharp";
)。在开发实际应用程序时,编写大量代码来妥善处理各种通信错误是很常见的。实际上,这通常是软件开发人员面临的挑战。SocketPro客户端消息队列使通信错误处理变得非常简单。假设由于任何原因(如机器断电、未处理的异常、软件/硬件维护、网络断开连接等)机器192.168.2.172无法访问,套接字关闭事件将在立即或稍后被通知。一旦套接字池发现一个套接字已关闭,SocketPro将自动将与该套接字连接相关的所有请求合并到另一个尚未关闭的套接字上进行处理。
要验证此功能,您可以在执行上述查询期间强制关闭其中一个SQLite服务器(all_servers
),然后查看最终结果是否正确。
请注意,UDAParts已将此功能应用于所有SocketPro SQL流服务、异步持久消息队列服务和远程文件交换服务,以简化您的开发。
关注点
SocketPro SQLite SQL流服务提供了所有必需的基本客户端/服务器数据库功能,但它确实提供了以下独特的功能:
- 连续内联请求/结果批处理和实时SQL流处理,实现最佳网络效率,尤其是在WAN上
- 客户端和服务器之间的双向异步数据传输,但所有异步请求都可以转换为同步请求
- 卓越的性能和可伸缩性,得益于强大的SocketPro通信架构
- 实时缓存,用于表更新、插入和删除。您可以在客户端设置一个回调来跟踪表记录的添加、删除和更新事件,如示例项目test_cache在目录../socketpro/stream_sql/usqlite/test_cache中所示。
- 所有请求都可以通过在客户端执行
CClientSocket
类的Cancel
方法来取消 - 支持Windows和Linux
- 所有支持的开发语言均可轻松开发
- 客户端和服务器组件都是线程安全的。它们可以轻松地在多线程应用程序中重用,且线程相关的问题大大减少
- 所有请求都可以在客户端进行备份,并在服务器因任何原因宕机时重发到另一台服务器进行处理——故障自动恢复
历史
- 2017/09/06 ==> 初始发布
- 2017/09/30 ==> 根据codeproject.com官员的建议,移除图片,改用代码片段
- 2018/03/28 ==> 添加两个新章节,“性能研究”和“并行执行SQL语句并进行故障自动恢复”
- 2020/12/30 ==> 添加新章节“两个基本结构,ErrInfo和SQLExeInfo”
- 2020/12/30 ==> 添加新章节“TestBatch”
- 2020/12/30 ==> 主示例代码更新,使用Task减少回调次数,并修复代码片段5中的文本错误