65.9K
CodeProject 正在变化。 阅读更多。
Home

集成 SAP-HANA EML 库和 TensorFlow 模型服务器 (TMS) 来预测 S&P 500 指数:第三部分:使用 EML 库预测 S&P 500 指数。

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.83/5 (5投票s)

2019年9月29日

CPOL

7分钟阅读

viewsIcon

6072

downloadIcon

91

如何使用 EML 库在 SAP-HANA SQL-engine 后端预测 S&P 500 指数。

引言

这是本系列的最后一篇文章,在文中我们将讨论如何使用 SAP-HANA AI 数据分析基础来预测标普 500 指数股票市场的指标值。在本文中,我们将介绍并讨论代码,实现 SAP-HANA 客户端应用程序,该应用程序使用 EML 库在 SQL 引擎后端触发标普 500 指数预测。具体来说,我们将讨论如何预先配置 SAP-HANA EML 库,以编程方式建立与 SAP-HANA SQL 引擎的连接,创建、配置和执行一个用户定义的存储过程,该过程与 TensorFlow 模型服务器实例交互以执行特定的预测。此外,我们还将讨论如何获取和可视化使用用户定义的存储过程获得的预测结果。

设置 SAP-HANA EML 库配置

为了能够在 SAP-HANA SQL 引擎后端使用先前构建和导出的模型进行标普 500 指数预测,我们必须首先更新 SAP-HANA EML 库配置。为此,我们必须使用 Eclipse SAP-HANA Studio 插件连接 SP500 租户数据库并执行以下 SQL 脚本。

SET SCHEMA SP500;

INSERT INTO _SYS_AFL.EML_MODEL_CONFIGURATION VALUES _
                ('sp500_model' , 'RemoteSource', 'TensorFlow');

DROP REMOTE SOURCE "TensorFlow";

CREATE REMOTE SOURCE "TensorFlow" ADAPTER "grpc" CONFIGURATION 'server=localhost;port=8500';

DROP TABLE CHECK_PARAMS;
DROP TABLE UPDATE_CONFIGURATION_PARAMS;
DROP TABLE UPDATE_CONFIGURATION_RESULT;
DROP PROCEDURE UPDATE_CONFIGURATION;

CREATE TABLE UPDATE_CONFIGURATION_PARAMS ("Parameter" VARCHAR(100), "Value" VARCHAR(100));
CREATE TABLE UPDATE_CONFIGURATION_RESULT _
    ("Key" VARCHAR(100), "Value" INTEGER, "Text" VARCHAR(100));

CREATE PROCEDURE UPDATE_CONFIGURATION() AS
BEGIN
  DECLARE CURSOR CUR FOR
      SELECT VOLUME_ID FROM SYS.M_VOLUMES WHERE SERVICE_NAME = 'indexserver';
  FOR CUR_ROW AS CUR DO
      EXEC 'CALL _SYS_AFL.EML_CTL_PROC_
      (''UpdateModelConfiguration'', UPDATE_CONFIGURATION_PARAMS, UPDATE_CONFIGURATION_RESULT)'
         || ' WITH OVERVIEW WITH HINT(ROUTE_TO(' || :CUR_ROW.VOLUME_ID || '))';
  END FOR;
END;

TRUNCATE TABLE UPDATE_CONFIGURATION_RESULT;
CALL UPDATE_CONFIGURATION();

CREATE TABLE CHECK_PARAMS ("Parameter" VARCHAR(100), "Value" VARCHAR(100));
INSERT INTO CHECK_PARAMS VALUES ('Model', '*');
CALL _SYS_AFL.EML_CHECKDESTINATION_PROC(CHECK_PARAMS, ?);    

上面列出的代码首先向内部系统表 _SYS_AFL.EML_MODEL_CONFIGURATION 插入一行,提供模型和远程源名称。之后,它创建一个将使用 grpc 协议与 TensorFlow 模型服务器 (TMS) 通信的远程源。在远程源创建子句中,我们必须指定适当的配置,例如 TMS 服务器的域名或 IP 地址,以及用于建立 TMS 服务器连接的端口。在这种情况下,由于 TMS 服务器与 SAP-HANA SQL 引擎运行在同一主机上,我们使用默认配置参数。

接下来,以下脚本创建两个表 UPDATE_CONFIGURATION_PARAMSUPDATE_CONFIGURATION_RESULT,以存储远程源配置参数。第一个表将包含参数和值列,第二个表包含键和值列。之后,我们创建一个存储过程 UPDATE_CONFIGURATION(),该过程将触发另一个 EML 库内置的 UpdateModelConfiguration 过程,以更新 EML 库后端上的模型配置。UpdateModelConfiguration 过程将 EML 库配置参数插入到先前创建的这些表中,使用 SP500 租户数据库的索引服务器 volume-id

最后,我们需要做的是创建 CHECK_PARAMS 表,并将包含“Model”和“*”这两个值的行插入到以下表中。此外,我们必须调用内置的 _SYS_AFL.EML_CHECKDESTINATION_PROC 过程,该过程实际接受一个由先前创建的 CHECK_PARAMS 表组成的单个参数。

以编程方式连接到 SAP-HANA SQL 引擎

为了在 SAP-HANA SQL 引擎后端预测标普 500 指数,我们必须实现一个 SAP-HANA Python 客户端应用程序,该应用程序以编程方式连接到 SQL 引擎,并将评估数据集从 sp500.csv 文件导入到先前创建的 SP500 数据库。

为了准备评估数据集,我们将使用我们在上一篇文章中已经实现和讨论过的函数。具体来说,我们将从 csv 文件加载整个数据集到 Pandas 数据框,然后准备好将其导入到 SP500 数据库。首先,我们需要检索加载到 Pandas 数据框中的数据集的列名集合。这些数据将进一步用于在 SP500 数据库中创建一个特定表,该表将包含每家公司在每个时间点(即 timestamp)的股票价格值,正如在上一篇文章中已经讨论过的。之后,我们将需要从同一个 Pandas 数据框中检索实际的股票价格值,并将其转换为 Python 的通用列表。最后,我们将使用已经实现的 get_datasets(…) 函数来准备评估数据集。

train_ratio = 0.8
filename = "sp500.csv"
# Load SP500.CSV file
dataset = pd.read_csv(filename);

# Retrieve a set of columns
columns = dataset.columns;
# Retrieve the dataset and convert it to list
dataset = dataset.values.tolist()

# Prepare the evaluation dataset
(train_X, train_Y, test_X, test_Y, 
   train_min_vals_X, train_max_vals_X, train_min_vals_Y, train_max_vals_Y,
     test_min_vals_X, test_max_vals_X, test_min_vals_Y, test_max_vals_Y) = \
                                       get_datasets(dataset, train_ratio)

在我们准备好特定的评估数据集之后,现在是时候建立与 SP500 租户数据库的连接,以创建一个特定的表,该表将用于存储用于我们先前构建和导出的模型进行预测的标普 500 指数评估数据。在打开特定连接之前,我们必须首先定义几个连接参数,如下所示。

tf.app.flags.DEFINE_string ('f', '', 'kernel')
tf.app.flags.DEFINE_string ('hxeinst_host'   , '192.168.0.177' , 'HXE host')
tf.app.flags.DEFINE_integer('hxeinst_port'   ,  39041          , 'HXE port')
tf.app.flags.DEFINE_string ('hxeinst_db'     , 'SP500'         , 'HXE database name')
tf.app.flags.DEFINE_string ('hxeinst_user'   , 'SP500'         , 'HXE user name')
tf.app.flags.DEFINE_string ('hxeinst_passwd' , 'Ezantj7912'    , 'HXE password')

args = tf.app.flags.FLAGS

最后,我们必须调用下面列出的函数来建立连接。

connection = dbapi.connect(address=args.hxeinst_host, 
    port=args.hxeinst_port, user=args.hxeinst_user, 
    password=args.hxeinst_passwd, database=args.hxeinst_db)
cursor = connection.cursor()

此外,我们必须执行 SET SCHEMA SP500; SQL 子句来选择合适的租户数据库。

cursor.execute('SET SCHEMA SP500;')    

由于我们已经建立了与 SP500 租户数据库的连接,现在我们可以通过动态创建特定的 SP500_DATA 表并将已经准备好的数据集中的特定数据插入其中来导入评估数据集。

将评估数据集导入 SAP-HANA 数据库

要将评估数据集导入 SP500 租户数据库,我们必须动态创建 SP500_DATA 表。为此,我们必须以编程方式构建一个创建特定表的 SQL 查询。为了做到这一点,我们必须使用从 Pandas 数据框检索到的列列表。下面代码的片段首先创建了一个包含每列的 FLOAT SQL 数据类型的列列表,然后将此列表连接成一个包含每个列名及其数据类型的单个字符串。之后,我们构造一个格式化的字符串,其中包含用于创建 SP500_DATA 表的特定 SQL 子句。

try:
    # Drop SP500_DATA table if it already exists
    cursor.execute('DROP TABLE SP500.SP500_DATA;')
except (RuntimeError, TypeError, NameError, dbapi.Error):
    pass

# Construct a list of columns with their datatype
columns = [col + ' FLOAT' for col in columns.tolist()]
# Construct a string containing column names and their datatype
columns = ','.join(np.array(columns)[2:]).replace('.','_')

# Construct an SQL-query to create SP500_DATA table
query = 'CREATE TABLE SP500.SP500_DATA ({0});'.format(columns)

# Execute the query
cursor.execute(query)

在我们创建了特定的 SP500_DATA 表之后,现在可以通过运行以下代码将评估数据插入其中。

for row in test_X:
    vals = ','.join([str(val) for val in row]);
    query = 'INSERT INTO SP500.SP500_DATA VALUES ({0})'.format(vals)
    cursor.execute(query)

在这段代码中,我们遍历评估数据集中的每一行并动态创建 SQL 查询字符串。对于每一行,我们从每列检索股票价格值,并将它们附加到一个由逗号分隔的字符串中。之后,我们构造一个 SQL 查询字符串,该字符串在执行时会将每一行的所有列值插入到 SP500_DATA 表中。实际上,SP500_DATA 表的结构与我们的原始数据集相同。

使用 SAP-HANA EML 库预测标普 500 指数

由于我们已经创建了 SP500_DATA 表并将数据集导入该表,现在是时候讨论如何在 SAP-HANA SQL 引擎后端预测标普 500 指数了。这通常通过创建和配置特定的 EML 用户定义过程来完成,该过程与正在运行的 TensorFlow 模型服务器 (TMS) 实例通信,标普 500 预测模型当前在该实例中提供服务。

在创建这样的过程之前,我们必须先执行一些配置步骤,例如创建三个配置表。

  • SP500_PARAMS - 该表包含模型及其签名、远程源名称和截止时间等参数。
  • SP500_PROC_PARAM_TABLE - 该表用于存储 EML 用户定义过程的参数,包括参数表名称(例如 SP500_PARAMS)、评估数据表名称(例如 SP500_DATA)以及预测结果表 SP500_RESULTS
  • SP500_RESULTS - 该表用于存储 EML 用户定义过程返回的预测结果。

这是实现 EML 用户定义过程预配置的代码片段。

try:
    # Drop the SP500_PARAMS Table If It already exists
    cursor.execute('DROP TABLE SP500.SP500_PARAMS;')
except (RuntimeError, TypeError, NameError, dbapi.Error):
    pass    

# Create SP500_PARAMS table
cursor.execute("CREATE TABLE SP500_PARAMS _
    (\"Parameter\" VARCHAR(100), \"Value\" VARCHAR(100));")

# Insert the SP500 EML user-defined procedure parameters
cursor.execute("INSERT INTO SP500_PARAMS VALUES ('Model', 'sp500_model%sp500_prediction');")
cursor.execute("INSERT INTO SP500_PARAMS VALUES ('RemoteSource', 'TensorFlow');")
cursor.execute("INSERT INTO SP500_PARAMS VALUES ('Deadline', '10000');")

try:
    # Drop the SP500_RESULTS Table If It already exists 
    cursor.execute('DROP TABLE SP500.SP500_RESULTS;')
except (RuntimeError, TypeError, NameError, dbapi.Error):
    pass    

# Create SP500_RESULTS table
cursor.execute("CREATE TABLE SP500_RESULTS (SP500INDEX FLOAT);")

try:
    # Drop the SP500_PROC_PARAM_TABLE Table If It already exists 
    cursor.execute('DROP TABLE SP500.SP500_PROC_PARAM_TABLE;')
except (RuntimeError, TypeError, NameError, dbapi.Error):
    pass    

# Create SP500_PROC_PARAM_TABLE table
cursor.execute("CREATE COLUMN TABLE SP500_PROC_PARAM_TABLE _
 (POSITION INTEGER, SCHEMA_NAME NVARCHAR(256), TYPE_NAME NVARCHAR(256), \
 PARAMETER_TYPE VARCHAR(7));")

# Insert the SP500 user-defined procedure parameters
cursor.execute("INSERT INTO SP500_PROC_PARAM_TABLE VALUES \
              (1, CURRENT_SCHEMA, 'SP500_PARAMS'  , 'in');")
cursor.execute("INSERT INTO SP500_PROC_PARAM_TABLE VALUES \
              (2, CURRENT_SCHEMA, 'SP500_DATA'    , 'in');")
cursor.execute("INSERT INTO SP500_PROC_PARAM_TABLE VALUES \
              (6, CURRENT_SCHEMA, 'SP500_RESULTS' , 'out');")

在我们创建了特定的参数表并添加了所需的用户定义过程参数后,我们现在可以创建一个 EML 用户定义存储过程来执行实际的预测。这通常通过执行以下代码来完成。

# Drop the user-defined stored procedure if it already exists
cursor.execute("CALL SYS.AFLLANG_WRAPPER_PROCEDURE_DROP(CURRENT_SCHEMA, 'SP500');")
# Create user-defined stored procedure SP500 using the parameters previously specified
cursor.execute("CALL \"SYS\".\"AFLLANG_WRAPPER_PROCEDURE_CREATE\" _
   ('EML', 'PREDICT', CURRENT_SCHEMA, 'SP500', \"SP500_PROC_PARAM_TABLE\");")    

最后,我们的标普 500 指数用户定义 SP500 存储过程已准备好执行。要执行以下过程,我们必须实现以下代码。

cursor.execute("CALL \"SP500\" (\"SP500_PARAMS\", \"SP500_DATA\", \
                \"SP500_RESULTS\") WITH OVERVIEW;")    

以下过程接受三个主要参数,包括包含过程基本执行参数的 SP500_PARAMS 表,包含我们评估数据集的 SP500_DATA 表,以及包含从 TensorFlow 模型服务器返回的预测结果的 SP500_RESULTS 表。成功执行以下过程后,我们必须查询 SP500_RESULTS 表以检索所有预测结果。要做到这一点,我们必须实现以下代码。

cursor.execute("SELECT * FROM \"SP500\".\"SP500_RESULTS\";")

predicted = cursor.fetchall()

results = []
for row in predicted:
    results.append(row["SP500INDEX"]);

在此代码中,我们执行 SELECT * FROM SP500.SP500_RESULTS 查询以获取所有预测结果,并将每个结果行附加到 Python 的通用列表中。之后,我们可以评估和可视化结果。要可视化结果,我们必须执行以下代码。

# Denormalize the results
test_Y = denormalize(test_Y, test_min_vals_Y, test_max_vals_Y)
results = denormalize(results, test_min_vals_Y, test_max_vals_Y)

# Plot the results graph
plt.plot(np.array(test_Y), color='blue')
plt.plot(np.array(results), color='red')

cursor.close()
connection.close()    

在可视化结果之前,我们必须先使用先前实现的 denormalize(...) 函数对结果集进行反规范化。为了绘制可视化结果集预测标普 500 指数值的图形,我们必须使用 Python 的 matplotlib.pyplot 库。可视化预测结果如图所示。

结论

在一系列文章中,我们详细介绍了如何使用 SAP-HANA Express 2.0 通过 SAP-HANA AI 数据分析基础来预测标普 500 指数股票市场的指标值。具体来说,我们讨论了从“零”开始部署 SAP-HANA 虚拟服务器的许多方面,以及如何使用 TensorFlow 构建和导出标普 500 指数预测模型,以及如何使用 TensorFlow 模型服务器 (TMS) 为其提供服务。在最后一篇文章中,我们介绍了使用 SAP-HANA AI 数据分析功能(如 EML 库)在 SQL 引擎后端进行预测的概念。

历史

  • 2019年9月29日 - 发布初始版本
© . All rights reserved.