65.9K
CodeProject 正在变化。 阅读更多。
Home

GCloud - Cloud Functions 和 Bulk Load 到 Datastore(PubSub 触发)

emptyStarIconemptyStarIconemptyStarIconemptyStarIconemptyStarIcon

0/5 (0投票)

2019 年 1 月 21 日

CPOL

2分钟阅读

viewsIcon

5501

GCloud - Cloud Functions 和 Bulk Load 到 Datastore(PubSub 触发)

这篇博文承接了之前的博文,我们在之前的博文中从 BSE 网站下载文件。本文在此基础上进一步添加了一个函数,用于读取这些文件并将其加载到 Datastore 中。

解决方案

在上一篇文章中,除了“加载数据”功能之外,所有组件都已添加。本篇博文添加并执行了该功能。

我们首先执行代码,然后解释代码。

安装

之前的博文中已经完成了所有必要的设置,我们现在需要做的就是下载函数代码并将其上传到 Google Cloud,然后将其链接到 PubSub 主题。

git clone https://github.com/skamalj/gcloud-functions.git
cd gcloud-functions/load-bhavcopy-data
gcloud functions deploy load_bhavcopy_data --trigger-topic bhavcopy-messages --source .  
--runtime python37

清理和执行

如果您遵循了之前的博文,您可能已经在 Datastore 中创建了名为“RecievedFiles”的数据实体。需要从 Datastore 管理控制台删除它。

代码将下载并覆盖云存储中的文件,因此无需从存储桶中删除文件。

现在,像之前的博文一样,通过向云存储存储桶添加一个虚拟文件来触发执行。

您可以在 Datastore 中看到您的数据,如下所示

代码解释

主函数读取事件并将接收到的文件名传递给 lock_and_load 函数。

lock_and_load

  • 该函数使用事务 - 第 5 行 - 以确保多个进程不会上传文件,并将一个状态字段添加到实体中,其值为“Datastore”。
  • 另一个重要一点是使用变量 - load_data。加载数据的过程是 - 从“RecievedFiles”实体读取并锁定文件名,然后将数据加载到“DailyBhavcopy”实体中。这涉及多个实体,而 Datastore 不支持在一个事务中处理多个实体。因此,使用外部变量,可以在事务范围内设置 - 第 11 行 - 用于锁定,然后在退出事务后使用其值 - 第 12 行 -。
def lock_and_load(fname):
    client = datastore.Client(project='bhavcopy')
    load_data = False
    try:
        with client.transaction():
            key = client.key('RecievedFiles', fname)
            row = client.get(key)
            if (row is not None) and ('status' not in row):
                row['status'] = 'Datastore'
                client.put(row)
                load_data = True        
        if load_data:
            store_data(fname)
            logging.info('Data loaded in datastore for ' + fname)
        else:
            logging.info('No database entry for file or it is already loaded: ' + fname) 
    except Exception as e:
        logging.info("Cannot start transaction for loading file "+ fname + ".Recieved error:" + str(e))

Store_data - Datastore 批量插入

  • 此函数解析文件名,然后从存储中读取文件并解压缩它。这段代码没有在代码块中显示。
  • 标题列从文件中的第一行推断出来,并添加了“File_Date”列
  • 在此函数中,我们首先将所有数据加载到内存中
    • 第 1 行以下创建并清空数组
    • 从第 11-17 行,我们创建单行
    • 然后将此行附加到数组 - 第 20 行
    • 因此,当循环退出时,文件中的所有数据都转换为数组中的实体行。
  • 我们这样做是为了避免由于大量单行写入而导致性能和争用问题。
  • 相反,我们使用 put_multi - 第 25-27 行 - 一次插入 400 行,这很有效。最大支持值为 500。
bhavcopy_rows = []

    # First line of each file is header, we collect column names from this line and then for each
    # subsequent row entity is created using file header fields as column names.
    for line in data:
        if not header_read:
            headers = line.split(',')
            header_read = True
        else:
            row_data = line.split(',')
            bhavcopy_row = datastore.Entity(client.key('DailyBhavcopy',row_data[1]+'-'+extracted_date))
            bhavcopy_row['FILE_DATE'] = file_date
            for i in range(12):
                if i in (1,2,3):
                    bhavcopy_row[headers[i]] = str(row_data[i]).strip()
                else:
                    bhavcopy_row[headers[i]] = float(row_data[i].strip())
                    
            # Do not use extend() function, it changes the entity object to string.        
            bhavcopy_rows.append(bhavcopy_row)
        
    logging.info('Collected '+str(len(bhavcopy_rows))+' rows from '+fname)
    
    # Bulk insert has limied to maximum 500 rows, hence this block
    for i in range(0,len(bhavcopy_rows),400):
        client.put_multi(bhavcopy_rows[i:i+400]) 
        logging.info(fname+": "+ "Storing rows from " + str(i) +" to "+ str(i+400))

历史

  • 2019 年 1 月 21 日:初始版本
© . All rights reserved.