Google Cloud Pub/Sub - Setting Up Your Application

December 3, 2014

CC (Attr 3U)

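Before your application can make requests to the Google Cloud Pub/Sub API, it must set up authorization using the OAuth 2.0 protocol. If you use the Google Cloud Pub/Sub client library, you must also create an instance of the Pubsub class.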

You should also implement a mechanism that retries RPC requests when they fail.

Authorization

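The Google APIs client libraries handle most of the authorization process for you. The details of the OAuth 2.0 authorization process, or "flow", vary depending on the type of application you use with Google Cloud Pub/Sub. (For details on the flows for the various application types, see Google's OAuth 2.0 documentation.)

Because most Pub/Sub messaging operations take place offline, without human intervention, the most typical flow for a Google Cloud Pub/Sub application is server-to-server, using a service account. Your application signs an authorization request with a private key to obtain an access token, which is then sent with each API call. Google App Engine and Google Compute Engine use service accounts behind the scenes, which simplifies the authorization process.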

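  1. Authorization Scopes
  2. Authorizing Requests from a Google App Engine Application
  3. Authorizing Requests from a Google Compute Engine Application
  4. Authorizing Requests from a Local or Third-Party Host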
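
To make the server-to-server flow above more concrete, here is a minimal sketch of the unsigned part of the JWT that a service account presents when it asks for an access token. It is an illustration only, not what the client libraries literally run: the helper names (`_b64url`, `build_signing_input`), the `TOKEN_URI` value used as the audience, and the one-hour lifetime are assumptions made for this example. The signing step (RS256 with the service account's private key) and the HTTP exchange are omitted because they need a cryptography library; the client libraries used in the samples on this page do all of this for you.

```python
import base64
import json
import time

PUBSUB_SCOPE = "https://www.googleapis.com/auth/pubsub"
# Assumption: Google's OAuth 2.0 token endpoint, used as the JWT audience.
TOKEN_URI = "https://oauth2.googleapis.com/token"


def _b64url(data):
    """Base64url-encode bytes without padding, as JWT segments require."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def build_signing_input(service_account_email, scopes, now=None, lifetime=3600):
    """Return the 'header.claims' string that would be signed with RS256."""
    issued_at = int(time.time() if now is None else now)
    header = {"alg": "RS256", "typ": "JWT"}
    claims = {
        "iss": service_account_email,   # who is asking
        "scope": " ".join(scopes),      # OAuth 2.0 scopes are space-delimited
        "aud": TOKEN_URI,               # who the assertion is intended for
        "iat": issued_at,
        "exp": issued_at + lifetime,
    }
    return ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
```

The returned string is what gets signed; the signature is appended as a third dot-separated segment, and the resulting assertion is exchanged for the access token that accompanies each API call.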

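Authorization Scopes

When requesting access using OAuth 2.0, your application specifies the authorization scope for Google Cloud Pub/Sub. You can use either of the following scopes:

Scope                                            Meaning
https://www.googleapis.com/auth/pubsub           Full access.
https://www.googleapis.com/auth/cloud-platform   Full access.

Tip: If you use the Google Cloud Pub/Sub client library, you can obtain the scopes programmatically through the API, as the Java samples on this page do with PubsubScopes.all().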

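Authorizing Requests from a Google App Engine Application

The following samples show how to set up your client and authorize calls to the Google Cloud Pub/Sub API using the App Engine App Identity API.

Java

This sample uses the Google APIs Client Library for Java.

The custom RetryHttpInitializerWrapper class is described in the Retry Handling section.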

import com.google.api.client.extensions.appengine.http.UrlFetchTransport;
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.extensions.appengine.auth.oauth2
    .AppIdentityCredential;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.PubsubScopes;

import java.io.IOException;

/**
 * Create a Pubsub client on App Engine.
 */
public class AppEngineConfiguration {

    private static final HttpTransport TRANSPORT =
        UrlFetchTransport.getDefaultInstance();
    private static final JsonFactory JSON_FACTORY =
        JacksonFactory.getDefaultInstance();

    public static Pubsub createPubsubClient() throws IOException {
        GoogleCredential credential = 
            new AppIdentityCredential.AppEngineCredentialWrapper(
                TRANSPORT, JSON_FACTORY).createScoped(PubsubScopes.all());
        // Please use custom HttpRequestInitializer for automatic
        // retry upon failures.  We provide a simple reference
        // implementation in the "Retry Handling" section.
        HttpRequestInitializer initializer =
            new RetryHttpInitializerWrapper(credential);
        return new Pubsub.Builder(TRANSPORT, JSON_FACTORY, initializer)
            .build();
    }
}

Python

This sample uses the Google APIs Client Library for Python.

import httplib2

import oauth2client.appengine as gae_oauth2client
from apiclient import discovery
from google.appengine.api import memcache

PUBSUB_SCOPES = ['https://www.googleapis.com/auth/pubsub']

def create_pubsub_client():
    credentials = gae_oauth2client.AppAssertionCredentials(
        scope=PUBSUB_SCOPES)
    http = httplib2.Http(memcache)
    credentials.authorize(http)

    return discovery.build('pubsub', 'v1beta1', http=http)

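Authorizing Requests from a Google Compute Engine Application

If your application runs on a Google Compute Engine instance, it authenticates using an access token obtained from the metadata server.

Note: Make sure your instance is configured to use a service account and that the Pub/Sub scopes listed above have been added to it. For detailed steps, see "Preparing an instance to use service accounts" in the Google Compute Engine documentation.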
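
For readers curious about what "obtained from the metadata server" involves, the following is a rough sketch using only the Python standard library. It assumes the v1 metadata endpoint for the instance's default service account and the `Metadata-Flavor: Google` request header; the function names are hypothetical and chosen for this example. In practice the credential classes in the samples on this page make this call for you.

```python
import json
import urllib.request

# Assumption: v1 metadata endpoint for the instance's default service account.
METADATA_TOKEN_URL = (
    "http://metadata.google.internal/computeMetadata/v1/"
    "instance/service-accounts/default/token"
)


def build_token_request(url=METADATA_TOKEN_URL):
    """Build the HTTP request; the metadata server requires this header."""
    return urllib.request.Request(url, headers={"Metadata-Flavor": "Google"})


def parse_token_response(body):
    """Extract (access_token, expires_in_seconds) from the JSON response body."""
    payload = json.loads(body)
    return payload["access_token"], int(payload["expires_in"])


def fetch_access_token(timeout=5):
    """Fetch a token; this only succeeds when run on a Compute Engine instance."""
    with urllib.request.urlopen(build_token_request(), timeout=timeout) as resp:
        return parse_token_response(resp.read().decode("utf-8"))
```

The token is short-lived, so a long-running process should request a new one before `expires_in` elapses rather than caching it indefinitely.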

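The following samples show how to set up your client and authorize calls to the Google Cloud Pub/Sub API using the Compute Credential API.

Java

This sample uses the Google APIs Client Library for Java.

The custom RetryHttpInitializerWrapper class is described in the Retry Handling section.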

import com.google.api.client.googleapis.compute.ComputeCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;

import com.google.api.services.pubsub.Pubsub;

import java.io.IOException;
import java.security.GeneralSecurityException;

/**
 * Create a Pubsub client on Compute Engine.
 */
public class ComputeEngineConfiguration {

    private static final JsonFactory JSON_FACTORY =
        JacksonFactory.getDefaultInstance();

    public static Pubsub createPubsubClient()
            throws GeneralSecurityException, IOException {
        HttpTransport transport = GoogleNetHttpTransport.newTrustedTransport();
        ComputeCredential credential =
            new ComputeCredential.Builder(transport, JSON_FACTORY)
                .build();
        // Please use custom HttpRequestInitializer for automatic
        // retry upon failures.  We provide a simple reference
        // implementation in the "Retry Handling" section.
        HttpRequestInitializer initializer =
            new RetryHttpInitializerWrapper(credential);
        return new Pubsub.Builder(transport, JSON_FACTORY, initializer)
            .build();
    }
}

Python

This sample uses the Google APIs Client Library for Python.

import httplib2

import oauth2client.gce as gce_oauth2client
from apiclient import discovery

PUBSUB_SCOPES = ['https://www.googleapis.com/auth/pubsub']

def create_pubsub_client():
    credentials = gce_oauth2client.AppAssertionCredentials(
        scope=PUBSUB_SCOPES)
    http = httplib2.Http()
    credentials.authorize(http)
    return discovery.build('pubsub', 'v1beta1', http=http)

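Authorizing Requests from a Local or Third-Party Host (Using a Service Account)

If you run a local client, or run in an environment outside Google Cloud, you need to supply the credentials you obtained when you set up your service account. The following samples show how to set up your client and authorize calls to the Google Cloud Pub/Sub API using your service account email address and private key.

Java

This sample uses the Google APIs Client Library for Java.

The custom RetryHttpInitializerWrapper class is described in the Retry Handling section.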

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.PubsubScopes;

import java.io.File;
import java.io.IOException;
import java.security.GeneralSecurityException;

/**
 * Create a Pubsub client with the service account.
 */
public class ServiceAccountConfiguration {

    private static final JsonFactory JSON_FACTORY =
        JacksonFactory.getDefaultInstance();

    public static Pubsub createPubsubClient()
            throws IOException, GeneralSecurityException {
        HttpTransport transport = GoogleNetHttpTransport.newTrustedTransport();
        GoogleCredential credential = new GoogleCredential.Builder()
            .setTransport(transport)
            .setJsonFactory(JSON_FACTORY)
            .setServiceAccountScopes(PubsubScopes.all())

            // Obtain this from the "APIs & auth" -> "Credentials"
            // section in the Google Developers Console:
            // https://console.developers.google.com/
            // (and put the e-mail address into your system property obviously)
            .setServiceAccountId(System.getProperty("SERVICE_ACCOUNT_EMAIL"))

            // Download this file from "APIs & auth" -> "Credentials"
            // section in the Google Developers Console:
            // https://console.developers.google.com/
            .setServiceAccountPrivateKeyFromP12File(
                new File(System.getProperty("PRIVATE_KEY_FILE_PATH")))
            .build();
        // Please use custom HttpRequestInitializer for automatic
        // retry upon failures.  We provide a simple reference
        // implementation in the "Retry Handling" section.
        HttpRequestInitializer initializer =
            new RetryHttpInitializerWrapper(credential);
        return new Pubsub.Builder(transport, JSON_FACTORY, initializer).build();
    }
}

Python

This sample uses the Google APIs Client Library for Python.

import httplib2

from apiclient import discovery
from oauth2client import client as oauth2client

PUBSUB_SCOPES = ['https://www.googleapis.com/auth/pubsub']

def create_pubsub_client():
    private_key = None
    # Obtain this file from the "APIs & auth" -> "Credentials"
    # section in the Google Developers Console:
    # https://console.developers.google.com/
    # A .p12 file is binary, so open it in binary mode.
    with open('MY_PRIVATE_KEY_FILE.p12', 'rb') as f:
        private_key = f.read()
    credentials = oauth2client.SignedJwtAssertionCredentials(
        # Obtain this from the "APIs & auth" -> "Credentials"
        # section in the Google Developers Console:
        # https://console.developers.google.com/
        'MY_SERVICE_ACCOUNT_EMAIL',
        private_key,
        PUBSUB_SCOPES)
    http = httplib2.Http()
    credentials.authorize(http)

    return discovery.build('pubsub', 'v1beta1', http=http)

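Retry Handling

You should implement code that retries failed RPCs with increasing backoff between attempts.

Java

The following Java class is an example that handles retry attempts for you.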

import com.google.api.client.auth.oauth2.Credential;
import com.google.api.client.http.HttpBackOffIOExceptionHandler;
import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpUnsuccessfulResponseHandler;
import com.google.api.client.util.ExponentialBackOff;
import com.google.api.client.util.Sleeper;
import com.google.common.base.Preconditions;

import java.io.IOException;
import java.util.logging.Logger;

/**
 * RetryHttpInitializerWrapper will automatically retry upon RPC
 * failures, preserving the auto-refresh behavior of the Google
 * Credentials.
 */
public class RetryHttpInitializerWrapper implements HttpRequestInitializer {

    private static final Logger LOG =
        Logger.getLogger(RetryHttpInitializerWrapper.class.getName());

    // Intercepts the request for filling in the "Authorization"
    // header field, as well as recovering from certain unsuccessful
    // error codes wherein the Credential must refresh its token for a
    // retry.
    private final Credential wrappedCredential;

    // A sleeper; you can replace it with a mock in your test.
    private final Sleeper sleeper;

    public RetryHttpInitializerWrapper(Credential wrappedCredential) {
        this(wrappedCredential, Sleeper.DEFAULT);
    }

    // Use only for testing.
    RetryHttpInitializerWrapper(
            Credential wrappedCredential, Sleeper sleeper) {
        this.wrappedCredential = Preconditions.checkNotNull(wrappedCredential);
        this.sleeper = sleeper;
    }

    @Override
    public void initialize(HttpRequest request) {
        final HttpUnsuccessfulResponseHandler backoffHandler =
            new HttpBackOffUnsuccessfulResponseHandler(
                new ExponentialBackOff())
                    .setSleeper(sleeper);
        request.setInterceptor(wrappedCredential);
        request.setUnsuccessfulResponseHandler(
                new HttpUnsuccessfulResponseHandler() {
                    @Override
                    public boolean handleResponse(
                            HttpRequest request,
                            HttpResponse response,
                            boolean supportsRetry) throws IOException {
                        if (wrappedCredential.handleResponse(
                                request, response, supportsRetry)) {
                            // If credential decides it can handle it,
                            // the return code or message indicated
                            // something specific to authentication,
                            // and no backoff is desired.
                            return true;
                        } else if (backoffHandler.handleResponse(
                                request, response, supportsRetry)) {
                            // Otherwise, we defer to the judgement of
                            // our internal backoff handler.
                            LOG.info("Retrying "
                                    + request.getUrl().toString());
                            return true;
                        } else {
                            return false;
                        }
                    }
                });
        request.setIOExceptionHandler(
            new HttpBackOffIOExceptionHandler(new ExponentialBackOff())
                .setSleeper(sleeper));
    }
}

Python

You can pass the num_retries=n argument to the API's execute() call to retry intermittent failures with exponential backoff.

  resp = client.subscriptions().pullBatch(body=body).execute(num_retries=3)
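
For code paths that do not go through execute(), the same idea can be sketched by hand. The helper below is a minimal illustration of retrying with exponential backoff and jitter; it is not the client library's implementation, and the function name, default delays, and the choice of IOError as the retriable error are assumptions made for this example.

```python
import random
import time


def call_with_backoff(func, max_retries=3, base_delay=1.0, max_delay=32.0,
                      retriable=(IOError,), sleep=time.sleep):
    """Call func(), retrying retriable errors with exponential backoff.

    The sleep function is injectable so tests can avoid real waiting.
    """
    attempt = 0
    while True:
        try:
            return func()
        except retriable:
            if attempt >= max_retries:
                raise  # Out of retries: surface the last error to the caller.
            # The delay doubles on each attempt, is capped at max_delay,
            # and gets up to 50% random jitter so clients do not retry in step.
            delay = min(max_delay, base_delay * (2 ** attempt))
            sleep(delay + random.uniform(0, delay / 2))
            attempt += 1
```

With the client from the samples above, a call might look like `call_with_backoff(lambda: client.subscriptions().pullBatch(body=body).execute())`. Which exceptions count as transient depends on the transport in use, so the `retriable` tuple would need adjusting for real code.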

Except as otherwise noted, the code samples on this page are licensed under the Apache 2.0 License.