Google Cloud Pub/Sub - 设置您的应用程序





0/5 (0投票)
在您可以向 Google Cloud Pub/Sub API 发出请求之前,您的应用程序必须使用 OAuth 2.0 协议设置授权。如果您使用 Google Cloud Pub/Sub 客户端库,您还必须创建 Pubsub 类的实例。
在您可以向 Google Cloud Pub/Sub API 发出请求之前,您的应用程序必须使用 OAuth 2.0 协议设置授权。如果您使用 Google Cloud Pub/Sub 客户端库,您还必须创建 Pubsub
类的实例。
您还应该实现一个系统来处理 RPC 请求失败时的重试尝试。
Authorization
Google APIs 客户端库会为您处理大部分授权过程。OAuth 2.0 的授权过程或“流程”的详细信息,根据您与 Google Cloud Pub/Sub 配合使用的应用程序类型而有所不同。(有关各种类型应用程序的流程的详细信息,请参阅 Google 的 OAuth 2.0 文档。)
由于大多数 Pub/Sub 消息传递操作都离线进行,无需人工干预,因此 Google Cloud Pub/Sub 应用程序最典型的流程是服务器到服务器,使用 服务帐号。您的应用程序使用私钥签署授权请求,以获取随每次 API 调用一起发送的 Web 访问令牌。Google App Engine 和 Google Compute Engine 在后台使用服务帐号,简化了授权过程。
授权范围
使用 OAuth 2.0 请求访问时,您的应用程序将指定 Google Cloud Pub/Sub 的授权范围信息。您可以使用以下任一选项
范围 | 含义 |
---|---|
https://www.googleapis.com/auth/pubsub |
完全访问权限。 |
https://www.googleapis.com/auth/cloud-platform |
完全访问权限。 |
提示: 如果您使用 Google Cloud Pub/Sub 客户端库,您可以通过 API 以编程方式获取范围。
从 Google App Engine 应用程序授权请求
以下示例演示了如何设置您的客户端并使用 App Engine App Identity API 授权对 Google Cloud Pub/Sub API 的调用。
Java
此示例使用 Google APIs 客户端库 for Java。
自定义的 RetryHttpInitializerWrapper
类在 重试处理部分中进行了描述。
import com.google.api.client.extensions.appengine.http.UrlFetchTransport; import com.google.api.client.googleapis.auth.oauth2.GoogleCredential; import com.google.api.client.googleapis.extensions.appengine.auth.oauth2 .AppIdentityCredential; import com.google.api.client.http.HttpRequestInitializer; import com.google.api.client.http.HttpTransport; import com.google.api.client.json.JsonFactory; import com.google.api.client.json.jackson2.JacksonFactory; import com.google.api.services.pubsub.Pubsub; import com.google.api.services.pubsub.PubsubScopes; import java.io.IOException; /** * Create a Pubsub client on App Engine. */ public class AppEngineConfiguration { private static final HttpTransport TRANSPORT = UrlFetchTransport.getDefaultInstance(); private static final JsonFactory JSON_FACTORY = JacksonFactory.getDefaultInstance(); public static Pubsub createPubsubClient() throws IOException { GoogleCredential credential = new AppIdentityCredential.AppEngineCredentialWrapper( TRANSPORT, JSON_FACTORY).createScoped(PubsubScopes.all()); // Please use custom HttpRequestInitializer for automatic // retry upon failures. We provide a simple reference // implementation in the "Retry Handling" section. HttpRequestInitializer initializer = new RetryHttpInitializerWrapper(credential); return new Pubsub.Builder(TRANSPORT, JSON_FACTORY, initializer) .build(); } }
Python
此示例使用 Google APIs 客户端库 for Python.
import httplib2 import oauth2client.appengine as gae_oauth2client from apiclient import discovery from google.appengine.api import memcache PUBSUB_SCOPES = ['https://www.googleapis.com/auth/pubsub'] def create_pubsub_client(): credentials = gae_oauth2client.AppAssertionCredentials( scope=PUBSUB_SCOPES) http = httplib2.Http(memcache) credentials.authorize(http) return discovery.build('pubsub', 'v1beta1', http=http)
从 Google Compute Engine 应用程序授权请求
如果您的应用程序在 Google Compute Engine 实例上运行,它将使用从元数据服务器获取的访问令牌进行身份验证。
注意: 确保您已将实例配置为使用服务帐号,并添加了上面列出的 Pub/Sub 范围。有关详细步骤,请参阅 Google Compute Engine 文档中的 准备您的实例以使用服务帐号。
以下示例演示了如何设置您的客户端并使用 Compute Credential API 授权对 Google Cloud Pub/Sub API 的调用。
Java
此示例使用 Google APIs 客户端库 for Java。
自定义的 RetryHttpInitializerWrapper
类在 重试处理部分中进行了描述。
import com.google.api.client.googleapis.compute.ComputeCredential; import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport; import com.google.api.client.http.HttpRequestInitializer; import com.google.api.client.http.HttpTransport; import com.google.api.client.json.JsonFactory; import com.google.api.client.json.jackson2.JacksonFactory; import com.google.api.services.pubsub.Pubsub; import java.io.IOException; import java.security.GeneralSecurityException; /** * Create a Pubsub client on Compute Engine. */ public class ComputeEngineConfiguration { private static final JsonFactory JSON_FACTORY = JacksonFactory.getDefaultInstance(); public static Pubsub createPubsubClient() throws GeneralSecurityException, IOException { HttpTransport transport = GoogleNetHttpTransport.newTrustedTransport(); ComputeCredential credential = new ComputeCredential.Builder(transport, JSON_FACTORY) .build(); // Please use custom HttpRequestInitializer for automatic // retry upon failures. We provide a simple reference // implementation in the "Retry Handling" section. HttpRequestInitializer initializer = new RetryHttpInitializerWrapper(credential); return new Pubsub.Builder(transport, JSON_FACTORY, initializer) .build(); } }
Python
此示例使用 Google APIs 客户端库 for Python.
import httplib2 import oauth2client.gce as gce_oauth2client from apiclient import discovery PUBSUB_SCOPES = ['https://www.googleapis.com/auth/pubsub'] def create_pubsub_client(): credentials = gce_oauth2client.AppAssertionCredentials( scope=PUBSUB_SCOPES) http = httplib2.Http() credentials.authorize(http) return discovery.build('pubsub', 'v1beta1', http=http)
从本地或第三方主机授权请求(使用服务帐号)
如果您正在运行本地客户端或在非 Google Cloud 环境中运行,您需要提供您在 设置服务帐号 时获得的凭据。以下示例演示了如何设置您的客户端,并使用您的服务帐号电子邮件地址和私钥来授权对 Google Cloud Pub/Sub API 的调用。
Java
此示例使用 Google APIs 客户端库 for Java。
自定义的 RetryHttpInitializerWrapper
类在 重试处理部分中进行了描述。
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential; import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport; import com.google.api.client.http.HttpRequestInitializer; import com.google.api.client.http.HttpTransport; import com.google.api.client.json.JsonFactory; import com.google.api.client.json.jackson2.JacksonFactory; import com.google.api.services.pubsub.Pubsub; import com.google.api.services.pubsub.PubsubScopes; import java.io.File; import java.io.IOException; import java.security.GeneralSecurityException; /** * Create a Pubsub client with the service account. */ public class ServiceAccountConfiguration { private static final JsonFactory JSON_FACTORY = JacksonFactory.getDefaultInstance(); public static Pubsub createPubsubClient() throws IOException, GeneralSecurityException { HttpTransport transport = GoogleNetHttpTransport.newTrustedTransport(); GoogleCredential credential = new GoogleCredential.Builder() .setTransport(transport) .setJsonFactory(JSON_FACTORY) .setServiceAccountScopes(PubsubScopes.all()) // Obtain this from the "APIs & auth" -> "Credentials" // section in the Google Developers Console: // https://console.developers.google.com/ // (and put the e-mail address into your system property obviously) .setServiceAccountId(System.getProperty("SERVICE_ACCOUNT_EMAIL")) // Download this file from "APIs & auth" -> "Credentials" // section in the Google Developers Console: // https://console.developers.google.com/ .setServiceAccountPrivateKeyFromP12File( new File(System.getProperty("PRIVATE_KEY_FILE_PATH"))) .build(); // Please use custom HttpRequestInitializer for automatic // retry upon failures. We provide a simple reference // implementation in the "Retry Handling" section. HttpRequestInitializer initializer = new RetryHttpInitializerWrapper(credential); return new Pubsub.Builder(transport, JSON_FACTORY, initializer).build(); } }
Python
此示例使用 Google APIs 客户端库 for Python.
import httplib2 from apiclient import discovery from oauth2client import client as oauth2client PUBSUB_SCOPES = ['https://www.googleapis.com/auth/pubsub'] def create_pubsub_client(): private_key = None # Obtain this file from the "APIs & auth" -> "Credentials" # section in the Google Developers Console: # https://console.developers.google.com/ with open('MY_PRIVATE_KEY_FILE.p12', 'r') as f: private_key = f.read() credentials = oauth2client.SignedJwtAssertionCredentials( # Obtain this from the "APIs & auth" -> "Credentials" # section in the Google Developers Console: # https://console.developers.google.com/ 'MY_SERVICE_ACCOUNT_EMAIL', private_key, PUBSUB_SCOPES) http = httplib2.Http() credentials.authorize(http) return discovery.build('pubsub', 'v1beta1', http=http)
重试处理
您应该实现代码来处理在 RPC 失败的情况下,具有递增退避的重试尝试。
Java
这是一个用于为您处理重试尝试的 Java 类示例。
import com.google.api.client.auth.oauth2.Credential; import com.google.api.client.http.HttpBackOffIOExceptionHandler; import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler; import com.google.api.client.http.HttpRequest; import com.google.api.client.http.HttpRequestInitializer; import com.google.api.client.http.HttpResponse; import com.google.api.client.http.HttpUnsuccessfulResponseHandler; import com.google.api.client.util.ExponentialBackOff; import com.google.api.client.util.Sleeper; import com.google.common.base.Preconditions; import java.io.IOException; import java.util.logging.Logger; /** * RetryHttpInitializerWrapper will automatically retry upon RPC * failures, preserving the auto-refresh behavior of the Google * Credentials. */ public class RetryHttpInitializerWrapper implements HttpRequestInitializer { private static final Logger LOG = Logger.getLogger(RetryHttpInitializerWrapper.class.getName()); // Intercepts the request for filling in the "Authorization" // header field, as well as recovering from certain unsuccessful // error codes wherein the Credential must refresh its token for a // retry. private final Credential wrappedCredential; // A sleeper; you can replace it with a mock in your test. private final Sleeper sleeper; public RetryHttpInitializerWrapper(Credential wrappedCredential) { this(wrappedCredential, Sleeper.DEFAULT); } // Use only for testing. RetryHttpInitializerWrapper( Credential wrappedCredential, Sleeper sleeper) { this.wrappedCredential = Preconditions.checkNotNull(wrappedCredential); this.sleeper = sleeper; } @Override public void initialize(HttpRequest request) { final HttpUnsuccessfulResponseHandler backoffHandler = new HttpBackOffUnsuccessfulResponseHandler( new ExponentialBackOff()) .setSleeper(sleeper); request.setInterceptor(wrappedCredential); request.setUnsuccessfulResponseHandler( new HttpUnsuccessfulResponseHandler() { @Override public boolean handleResponse( HttpRequest request, HttpResponse response, boolean supportsRetry) throws IOException { if (wrappedCredential.handleResponse( request, response, supportsRetry)) { // If credential decides it can handle it, // the return code or message indicated // something specific to authentication, // and no backoff is desired. return true; } else if (backoffHandler.handleResponse( request, response, supportsRetry)) { // Otherwise, we defer to the judgement of // our internal backoff handler. LOG.info("Retrying " + request.getUrl().toString()); return true; } else { return false; } } }); request.setIOExceptionHandler( new HttpBackOffIOExceptionHandler(new ExponentialBackOff()) .setSleeper(sleeper)); } }
Python
您可以将 num_retries=n
参数传递给 API 的 execute()
调用,以便在间歇性失败时使用指数退避进行重试。
resp = client.subscriptions().pullBatch(body=body).execute(num_retries=3)
除非另有说明,本页的代码示例根据 Apache 2.0 许可证 授权。