在 Apache Beam 中避免 Kotlin 雷区





0/5 (0投票)
如何避免在 Apache Beam 中的 Kotlin 雷区
毫无疑问,Java SDK 是 Apache Beam 支持的语言中最受欢迎且功能最齐全的。如果您将 Java 的现代、开源的表亲 Kotlin 的强大功能引入其中,您将获得出色的开发体验。就像大多数美好的关系一样,并非一切都完美无缺,Beam-Kotlin 组合也不完全例外。
本文将介绍这两种技术之间的一些独特交互,并帮助您避免在入门时可能遇到的潜在“雷区”陷阱,以便您可以专注于 Kotlin 和 Beam 之间的出色体验。
声明匿名 ParDos / DoFns
在网上搜索示例时,经常会看到以下创建匿名 DoFn 以在 ParDo 中使用的代码。
lines.apply("Extract Words", ParDo.of(new DoFn<String, String>() { ... }));
简单地将其转换为 Kotlin 会得到以下结果:
lines.apply("Extract Words", ParDo.of(DoFn<String, String>() { ... }))
但是,您会发现这会导致类型擦除,Beam 会因此抱怨。相反,为了实现匿名函数,您必须明确指示一个对象正在继承自 DoFn:
lines.apply("Extract Words", ParDo.of(object : DoFn<String, String>() { ... }))
定义 TupleTags
如果您处理涉及多种类型的转换或操作,TupleTags 可能非常有价值且必不可少。然而,您可能会发现与这些声明相关的问题会冒出来,导致您必须通过 `setCoder()` 函数在检索特定标签后显式要求定义 Coder。
以下错误是一个明显的迹象:
Exception in thread "
main
"java.lang.IllegalStateException
: Unable to return a default Coder forTransform.out1 [PCollection]
. Correct one of the following root causes: No Coder has been manually specified; you may do so using.setCoder()
. Inferring a Coder from theCoderRegistry
failed: Unable to provide a Coder for V. Building a Coder using a registeredCoderProvider
failed. See suppressed exceptions for detailed failures. Using the default output Coder from the producingPTransform
failed: Unable to provide a Coder for V. Building a Coder using a registeredCoderProvider
failed.
如果您遇到此问题,很可能是您像下面这样定义了您的 TupleTag
:
val userTag = TupleTag<KV<String, User>>()
不幸的是,与 Kotlin 中的大多数问题一样,类型擦除可能是一个问题。为避免此问题,您在定义 TupleTag 时需要尽可能明确,并使用对象实现模式,如下所示:
val usersTag = object: TupleTag<KV<String, User>>() {}
使用对象和尾随的大括号允许在尝试从标签读取时不会丢失特定类型。
IntelliJ 生成的覆盖方法
IntelliJ 最吸引人的功能之一是允许 IDE 为您生成实现或继承自另一个类/接口时缺失的覆盖方法。由于 Kotlin 的类型检查系统,这可能是一个挑战,因为 Kotlin 明确使用 ? 字符来表示可空性,但 Beam 希望您确保类型完全匹配。
考虑以下函数:
class ExampleTransform: PTransform<PCollection<KV<String, Test>>, PCollectionTuple>() {
// Omitted for brevity
}
您知道您需要在此处执行某种操作,因此您利用您的 IDE 并允许它生成适当的覆盖方法。
执行此操作后,您会发现所有类型的可空实例都会被添加,特别是参数:
// Notice the trailing ? after the type definition the input
override fun expand(input: PCollection<KV<String, Test>>?): PCollectionTuple {
TODO("Not yet implemented")
}
Beam 对类型和可空性要求非常严格,因此您需要确保在这种情况下 PCollection
不会带有可空运算符:
override fun expand(input: PCollection<KV<String, Test>>): PCollectionTuple {
TODO("Not yet implemented")
}
Iterables,但具体是哪一个?
Java 和 Kotlin 都提供了 `Iterable` 接口的概念来处理项目集合。但是,当通过分组/批处理操作(如 `GroupIntoBatchs` 转换)利用它们时,Kotlin-Java JVM 之间可能会发生类型断开连接。
pipeline
.apply("Batch Items", GroupIntoBatches.ofSize<Key, Value>(100))
.apply("Apply Batching Transform", ParDo.of(SomeTransform.transform()))
您可能会遇到类似于以下内容的错误:
ProcessContext
argument must have type DoFn<Iterable<? extends Value>, Result<? extends Value>>.ProcessContext.
**您可以通过将 `@JvmWildcard` 注解添加到可迭代对象的类型(而不是 DoFn 上的可迭代对象本身)来解决此问题,通过更改此项:
class SomeTransform: DoFn<KV<Key, Iterable<Value>>, KV<Key, Value>>(){
// Omitted for brevity
}
改为这样。
class SomeTransform: DoFn<KV<Key, Iterable<@JvmWildcard Value>>, KV<Key, Value>>(){
// Omitted for brevity
}
此提示将允许 JVM 确定要使用的接口的正确版本,并由 Beam 编程模型进行序列化/反序列化。
编写 Pipeline 测试
测试,尤其是单元测试,在编写 Beam 应用程序时(当然,也一直如此)非常重要。但是,当您使用 Kotlin 进行测试时,在测试部门有两个主要的“雷区”需要注意,即:
- 定义您的 Pipeline
- 应用 PAsserts
- 运行 Pipeline 测试
定义您的 Pipeline
由于在为 Beam Pipeline 编写单元测试时应用的本机 PAsserts 依赖于本机 Java 代码,因此它们在使用 Beam 时需要一些注解。您可以参考以下示例来构建一个:
@get:Rule
@Transitive
val testPipeline: TestPipeline = TestPipeline.create()
您所有的单个单元测试都可以共享此 Pipeline,但您应该将其写得与上面完全相同,因为 `@get:Rule` 和 `@Transitive` 注解都是必需的,明确的类型声明(例如 `:TestPipeline`)也是必需的。
应用 PAsserts
`PAssert` 库随 Beam 一起提供,它允许您编写专门针对 PCollection 对象的测试(例如,您可以对它们的 PCollection 内容编写断言,验证其内容等)。这些通常会按预期正常工作,但当使用 `.satisfies()` 函数时,会出现一个特别的注意事项。
PAssert.that(numbers).satisfies { elements ->
assertTrue(elements.contains(42))
}
您会发现这不起作用,因为 satisfies() 函数明确期望返回一个 Java Void。由于 Kotlin 中不存在这一点,您需要显式地在函数体末尾放置一个 null。
PAssert.that(numbers).satisfies { elements ->
assertTrue(elements.contains(42))
null // Required
}
运行 Pipeline 测试
在尝试实际执行或运行测试本身时,可能会出现另一个潜在的“雷区”。您必须确保在定义 PAssert 之后,Pipeline 本身被明确地运行到完成:
PAssert.that(numbers).containsInAnyOrder(42)
testPipeline.run().waitUntilFinish()
由于 PAssert 是作为执行 Pipeline 的动态无环图的一部分构建的,因此它必须在运行测试之前声明。如果缺少 `run()` 声明,您还会发现无法调试任何 ParDo 级别的操作。
遗漏了什么“雷区”?
如果您一直在使用 Apache Beam 和 Kotlin,我很想听听您遇到的任何特定“雷区”或用例以及您是如何克服它们的!