65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 Java 的 Apache HBase 示例

starIconstarIconstarIconstarIconstarIcon

5.00/5 (3投票s)

2015年9月3日

CPOL

9分钟阅读

viewsIcon

50110

downloadIcon

865

简单的 Java 程序演示了 HBase 表的创建、数据导入和查询

引言

本项目演示了如何使用 Apache HBase ( http://hbase.apache.org/) 和 Java API。它旨在为探索 HBase 的功能提供一个起点,并为 HBase 新手提供入门概述。该项目的更新代码可在 GitHub (https://github.com/ggraham-412/HBaseJavaExample,提交 b1fdec) 以及随附的 zip 文件中找到。

背景

Apache HBase 是一种面向列的键值 NoSQL 数据库,以 Google 的 BigTable (http://research.google.com/archive/bigtable.html) 为模型。HBase 设计用于与 Hadoop 分布式文件存储 (HDFS) 配合使用,并且从一开始就设计用于在商用硬件集群上实现可伸缩性。与其他 NoSQL 数据库项目一样,HBase 通过放弃传统 RDBMS 的一些功能(如事务完整性、引用完整性和 ACID (https://en.wikipedia.org/wiki/ACID) 保证)来兑现其可伸缩性承诺。HBase 保留了其中一些保证,但仅在某些条件下。

HBase 实现了一个水平分区的键值映射。HBase 中的每个项都可以通过行键、列族和族内的列名进行寻址。此外,每个项都通过时间戳进行版本控制。HBase 将存储最多 N 个版本的数据,其中 N 可在列族上设置。查询 HBase 时,如果未指定版本,则返回最新数据。

{row key, column family:column, version} -> {data item}

行键、列族和列均表示为字节数组。虽然这三者通常都使用 string,但只有列族有使用可打印字符的限制。版本必须是长整型。

记录按行键进行词典顺序聚类。它是唯一可排序的键,因此通常将其制成混合复合键。在行键的设计中需要小心,并且选择的混合方式必须反映预期的查询性质。在完全分布式的 HBase 系统中,数据将根据行键空间区域驻留在“区域服务器”上。

Using the Code

安装和部署

本项目包含从 Java 访问 HBase 的示例代码。示例代码会将 Google Finance 的每日股票价格数据导入 HBase,并对其运行简单查询。该示例是使用 HBase 1.0.1.1 或兼容版本、Java 8 JDK 更新 60 和 Fedora 22 Linux (4.1.6-200.fc22.x86_64) 开发的。它也应该可以在 Windows 上使用 Cygwin (http://hbase.apache.org/cygwin.html) 运行,但我未测试过。

解压 HBase 归档文件,并在需要时编辑配置文件。HBase 默认应在 /tmp 文件夹上运行,而不使用 HDFS。要更改 HBase 用于其存储的文件夹,请按如下方式编辑配置文件 conf/hbase-site.xml

<configuration>
  <property>
    <name>hbase.rootdir</name>
    <value>file:///data/hbase</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/data/zookeeper</value>
  </property>
</configuration>

上述配置将导致 HBase 在本地主机上使用 /data 文件夹。请注意,无需创建 /data/hbase/data/zookeeper 文件夹;HBase 会为您完成。但是,/data 文件夹应可供运行 HBase 守护进程的任何用户写入。

要启动 HBase,请发出命令

bin/start-hbase.sh

此存档中包含的示例代码是在 Java 8 更新 60 版本(来自 http://www.java.com)下编译的。

编译

Java 代码必须针对 HBase 附带的相当多的 jar 文件进行编译。我不知道是否所有 jar 文件都真正需要,但包含所有 jar 文件即可正常工作。有一个名为 makeCPATH 的小型 shell 脚本可以帮助完成此操作。脚本必须将 lib 文件夹的位置作为第一个参数进行源化。

. ./makeCPATH.sh /path/to/hbase/lib
echo $CPATH

之后,变量 CPATH 应包含 HBase lib 文件夹中所有 jar 文件的列表。在 Windows 上,您需要编写一个等效的 .bat 文件来完成相同的操作。或者,您可以将此代码导入 Eclipse 等 IDE,并通过对话框界面设置项目构建路径以包含所有 jar 文件。

要编译 Java 代码,请切换到包含此示例 Java 源代码的文件夹,例如 TestHBase.java。执行命令

javac -cp $CPATH *.java
运行示例

HBase 的配置文件夹位置应在环境变量 HBASE_CONF_DIR 中设置。这使得 Java 代码能够找到并读取 HBase 配置。(hbase-site.xml 文件应在此文件夹中。)

此外,Java 环境变量 JAVA_HOME 应设置为包含您的 Java 安装的“bin/java”部分路径的文件夹。(**注意**:请确保这是安装文件夹,而不是包含符号链接的文件夹。例如,它应该看起来像“/usr/java/jdk1.8.0_60/jre”。)

示例代码附带了四个从 Google Finance 获取的股票价格数据集,通过 http://www.quandl.com 获取,代码为 ABT、BMY、MRK 和 PFE。这些数据集包含在 FinData 文件夹中。TestHBase 类定义在包外,因此您只需通过

java -cp $CPATH:. TestHBase

代码将连接到 conf/hbase-site.xml 配置文件中定义的 HBase 实例。然后,它将删除表(如果它已从先前的运行中存在),(重新)创建表,将四个示例股票数据集加载到表中,并运行一些示例查询。

表名为 BarData。它将包含每日股票价格变动的“蜡烛图”数据:开盘价、最高价、最低价、收盘价以及每日成交量。此表可以离线使用 hbase shell 进行检查。(有关 HBase shell 的更多信息,请参阅 https://learnhbase.wordpress.com/2013/03/02/hbase-shell-commands/。)

关注点

设计的核心是一个受 DAO 启发的类,名为 BarDatabase。模式在类中使用字节数组常量指定,以避免重复将表、行和列 string 名称转换为字节数组以供 Java HBase API 使用的不必要开销。该类避免使用中间数据对象,而是将责任委托给专门的接口,用于从数据源读取数据以及处理从查询返回的数据。这里可以做更多工作来使用原始流并避免基于行的文本行。

创建/删除表
        try (Connection connection = ConnectionFactory.createConnection(config);
             Admin admin = connection.getAdmin()) {

            HTableDescriptor table =
                new HTableDescriptor(TableName.valueOf(TABLE_NAME));
            table.addFamily(new HColumnDescriptor(COLUMN_FAMILY));

            if (!admin.tableExists(table.getTableName())) {
                System.out.print("Creating table. ");
                admin.createTable(table);
                System.out.println(" Done.");
            }
        }

HBase 中的每个操作都在连接的上下文中进行。我们使用 AutoClose 功能来保证连接在 try 块结束时关闭。然后使用 HTableDescriptor 和 HBase Admin 接口创建(或删除)表。创建新表或删除现有表之间的唯一重要区别是,必须先禁用现有表才能删除它。

           if (admin.tableExists(table.getTableName())) {
                System.out.print("Dropping table. ");
                // a table must be disabled before it can be dropped
                admin.disableTable(table.getTableName());
                admin.deleteTable(table.getTableName());
                System.out.println(" Done.");
            }       

列族是它们逻辑上包含的列的管理范围。要为列保留的版本数限制是定义在列族级别的管理参数的一个示例。

导入数据

可以使用 Put 对象导入数据。这可以逐行进行,也可以一次导入 Put 对象列表。(我不确定这是否是真正的批量操作,但在本地系统上以这种方式加载数据比在读取每个行记录后导入每个 Put 快大约两倍。)

在示例代码中,由于数据驻留在 CSV 文件中,因此通过一个名为 LineImporter 的类来实现,该类是 BarDatabase 的内部类,实现了回调接口。在由文本文件 LineReader 实例读取每一行后,LineImporter 会创建一个 Put 对象并将其保存在一个名为 currentImport 的列表中。当文件流关闭时,LineImporter 实例将数据批量加载到 HBase 中。

       /**
         *    Imports bulk data into HBase table
         */
        @Override
        public void close() throws Exception {
            if ( currentImport.isEmpty() ) return;
            try (Connection conn = ConnectionFactory.createConnection(config)) {
                Table table = conn.getTable(TableName.valueOf(TABLE_NAME));
                table.put(currentImport);
                table.close();
            }
            finally {
                currentImport.clear();
            }
        } 
查询数据

通常有两种查询方法:GetScanGet 用于查找单行(或单行的单个单元格),而 Scan 用于返回行集。Get 由行键、可选的列族、族内的可选列和可选的版本号参数化。以下是仅检索给定股票收盘价的查询示例。

    /**
     *    Gets a single cell given the date and stock symbol and column ID
     */
    public String GetCell(String date, String symbol, byte[] column)
            throws IOException {
        try (Connection conn = ConnectionFactory.createConnection(config)){
            // Get the table
            Table table = conn.getTable(TableName.valueOf(TABLE_NAME));
            // Construct a "getter" with the rowkey.
            Get get = new Get(makeKey(date, symbol));
            // Further refine the "get" with a column specification
            get.addColumn(COLUMN_FAMILY, column);
            // Get the result by passing the getter to the table
            Result r = table.get(get);
            // return the results
            if ( r.isEmpty() ) return null;
            // Gets the value of the first (and only) column
            return new String(r.value());
        }
    } 

在这种情况下,同时指定了列族和族内的列。如果我们只使用 addFamily 调用指定了族,那么该族中的所有列都将被返回。

对于扫描示例,请考虑以下代码。您不指定特定的行键,而是指定一个起始行键和一个限制。在查询能力方面,这似乎有些有限(无意双关),但您也可以通过 setFilter() 方法指定在服务器端执行的行过滤器。

但是,Scan 仍然名副其实:它会命中扫描中的每一行键。行限制纯粹在客户端代码中实现。(在官方 HBase shell 的交互式扫描版本中也实现了客户端限制。)但是,您可以设置服务器端的缓存限制,并使用 PageFilter 来防止服务器在您只需要几行时处理每一行。

    /**
     *    Specifies a range of rows to retrieve based on a starting row key
     *    and retrieves up to limit rows.  Each row is passed to the supplied
     *    DataScanner.
     */
    public void ScanRows(String startDate, String symbol,
             int limit, DataScanner scanner) throws IOException {
        ResultScanner results = null;
        try (Connection conn = ConnectionFactory.createConnection(config)){
            // Get the table
            Table table = conn.getTable(TableName.valueOf(TABLE_NAME));
            // Create the scan
            Scan scan = new Scan();
            // start at a specific rowkey.
            scan.setStartRow(makeKey(startDate, symbol));
            // Tell the server not to cache more than limit rows
            // since we won;t need them
            scan.setCaching(limit);
            // Can also set a server side filter
            scan.setFilter(new PageFilter(limit));
            // Get the scan results
            results = table.getScanner(scan);
            // Iterate over the scan results and break at the limit
            int count = 0;
            for ( Result r : results ) {
                scanner.ProcessRow(r);
                if ( count++ >= limit ) break;
            }
        }
        finally {
            // ResultScanner must be closed.
            if ( results != null ) results.close();         
        }
    }

在上面的代码中,我们还看到查询结果不是就地处理,而是直接发送到回调接口。同样,这是为了避免创建中间对象。在示例中,提供的 DataScanner 只是将输出转储到标准输出。

行键

我们上面提到了设计良好行键的重要性。在此示例中,行键选择为股票名称后跟日期的混合。在完全分布式的系统中,这意味着行将首先根据其股票代码分配给服务器,然后根据其日期进行分配。这有重要的好处。针对历史数据的查询很可能主要集中在昨天的数据上,然后是前天等。分析师很少会查看,例如,1987 年 IBM 的股票价格。因此,如果行键的设计方式是日期在前,那么日期就会倾向于聚集在一起,而实际的查询流量会倾向于轰炸您集群中一小部分 HBase 区域服务器!

结论

我没有谈论 Zookeeper。Zookeeper 是一种分布式任务管理器,它同步配置并协调分布式服务。由于这是一个打算在单节点上运行的简单示例,因此我没有讨论它。

NoSQL 数据库为托管大型数据存储(高达数十亿行和数百万列)提供了改进的可伸缩性。这些数据库系统通过放宽关系模型施加的各种约束来兑现这一承诺,而对约束的放宽选择则由特定的用例驱动。我认为 NoSQL 数据库的繁多归因于许多不同的用例以及放宽关系模型的许多不同方式和程度。(http://nosql-database.org/)。

© . All rights reserved.