为了使用Beam,首先必须使用Beam SDKs其中一个SDK里面的类创建一个驱动程序。驱动程序定义了管道,包括所有的输入,转换以及输出。它还为您的管道设置了执行选项(通常使用命令行选项传递)。这些包括管道运行器,又决定了管道运行的后端。
Beam SDK提供了许多简化大规模分布式数据处理的机制的抽象。相同的Beam抽象在批处理和流数据源中都可以使用。当创建Beam管道时,您可以根据这些抽象思考数据处理任务。他们包括:
l 管道Pipeline: Pipeline
从头到尾封装整个数据处理任务。包括读取输入数据,转换数据和写入输出数据。所有Beam驱动程序必须创建一个Pipeline
。创建Pipeline
时,还必须指定执行选项,告诉Pipeline
在哪里以及如何运行。
l PCollection: PCollection
表示Beam管道运行操作的分布式数据集。数据集可以是有界的,这意味着它来自像文件这样的固定的源,或者是×××的,这意味着它来自订阅或其他机制持续不断更新的源。您的管道通常通过从外部数据源读取数据创建PCollection
初始值,但也可以从驱动程序中的内存中的数据来创建PCollection
。因此,PCollections
是管道中每个步骤的输入和输出。
l 转换Transform: Transform
代表管道中的数据处理操作或步骤。每个Transform
都需要一个或多个PCollection
对象作为输入,在PCollection
对象的元素上执行您提供的处理函数,并生成一个或多个输出PCollection
对象。
l I/O Source 和Sink: Beam提供Source
和Sink
APIs分别表示读取和写入数据。Source
封装从一些外部来源(如云端文件存储或订阅流式数据源)将数据读取到Beam管道所需的代码。Sink
同样封装将PCollection
的元素写入外部数据源所需的代码。
一个典型的Beam驱动程序的工作原理如下:
l 创建一个Pipeline对象并设置管道执行选项,包括管道运行器。
l 为管道数据创建PCollection的初始值,使用Source API从外部源读取数据,或使用Create转换从内存中的数据构建PCollection。
l 对每个PCollection应用转换。转换可以改变,过滤,分组,分析或以其他方式处理PCollection中的元素。转换会创建一个新输出的PCollection,而不消费输入的集合。典型的管道依次将后续转换应用于每个新输出的PCollection,直到处理完成。
l 输出最终的转换的PCollection,通常使用Sink API将数据写入外部源。
l 使用指定的管道运行器运行管道。
当运行Beam驱动程序时,指定的管道运行器将根据创建的管道的工作流图,基于您创建的PCollection对象已经应用的转换。然后使用适当的分布式处理后端执行该图形,成为后端异步的“作业”(或等效的)。
Pipeline
抽象封装了数据处理任务的所有数据和步骤。Beam驱动程序通常从构建一个Pipeline对象开始,然后使用该对象作为创建管道数据集PCollection
和以及Transform
s操作基础。
要使用Beam,驱动程序必须首先创建Beam SDK Pipeline
类的实例(通常在main()
函数中)。创建Pipeline
时,还需要设置一些配置选项。可以以编程方式设置管道的配置选项,但提前设置选项(或从命令行读取)通常更容易,并在创建对象时将其传递给Pipeline
对象。
// Start by defining the options for the pipeline.
PipelineOptions options = PipelineOptionsFactory.create();
// Then create the pipeline.
Pipeline p = Pipeline.create(options);
使用管道选项来配置管道的不同方面,例如将要执行管道的管道运行器以及所选运行器所需的任何特定配置。管道选项可能包含诸如项目ID或存储文件位置等信息。
当您在所选择的运行程序上运行管道时,PipelineOptions的副本将可用于您的代码。例如,可以从DoFn的上下文中读取PipelineOptions。
PipelineOptions options = context.getPipelineOptions();
命令行参数设置PipelineOptions
可以通过创建PipelineOptions
对象并直接设置字段来配置管道,Beam SDK包含一个命令行解析器,可以使用此解析器解析命令行参数后设置PipelineOptions
字段。
要从命令行读取选项,构建PipelineOptions
对象,如以下示例代码所示:
MyOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation().create();
将解释遵循以下格式的命令行参数:
--<option>=<value>
注意:附加方法.withValidation
将检查所需的命令行参数并验证参数值。
以这种方式构建PipelineOptions
,可以将任何选项指定为命令行参数。
注意:该WordCount示例管道指明了在运行时如何使用命令行选项来设置管道选项。
除了标准PipelineOptions
之外,还可以添加自己的自定义选项。要添加自定义选项,请为每个选项定义一个带有getter和setter方法的接口,如以下示例所示:
public interface MyOptions extends PipelineOptions {
String getMyCustomOption();
void setMyCustomOption(String myCustomOption);
}
还可以指定描述和默认值,当用户使用--help
作为命令行参数传递时会显示它们。
使用注释设置描述和默认值,如下所示:
public interface MyOptions extends PipelineOptions {
@Description("My custom command line argument.")
@Default.String("DEFAULT")
String getMyCustomOption();
void setMyCustomOption(String myCustomOption);
}
建议在创建PipelineOptions对象时将自定义的接口注册到PipelineOptionsFactory。当把自定义的接口注册到PipelineOptionsFactory之后,--help可以找到自定义的选项接口,并将其添加到--help命令的输出。PipelineOptionsFactory还将验证自定义选项与所有其他注册的选项是否兼容。
以下示例代码显示如何注册自定义选项接口到PipelineOptionsFactory:
PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(MyOptions.class);
现在管道可以接受--myCustomOption=value
作为一个命令行参数。
所述PCollection抽象表示潜在分布式,多元素的数据集。可以把PCollection
认为是“管道”的数据; Beam转换使用PCollection
对象作为输入和输出。因此,如果要处理管道中的数据,则必须采用PCollection
的形式。
创建Pipeline
完毕后,需要先创建至少一个某种形式的PCollection
。创建的PCollection
作为管道中第一个操作的输入。
创建PCollection
对象实例,可以通过使用Beam的Source API从外部数据源读取数据,也可以在驱动程序中存储在内存集合类中的数据。前者通常是生产环境下管道获取数据的方式; Beam的Source API包含多种适配器,可以从外部来源(如大型的基于云的文件,数据库或订阅服务)中读取。后者主要用于测试和调试的目的。
要从外部源读取,需要使用Beam提供的I/O适配器之一。适配器的具体用途不同,但它们都读取某些外部数据源,并返回PCollection
,它的元素表示该源中的数据记录。
每个数据源适配器都有一个Read
转换; 如果要读取,必须将该转换应用于Pipeline
对象本身。例如,读取外部文本文件并返回其元素类型为String的PCollection
,每个String表示文本文件中的一行。如下是如何将TextIO.Read
应用到Pipeline
以便创建一个PCollection
:
public static void main(String[] args) {
// Create the pipeline.
PipelineOptions options =
PipelineOptionsFactory.fromArgs(args).create();
Pipeline p = Pipeline.create(options);
// Create the PCollection 'lines' by applying a 'Read' transform.
PCollection<String> lines = p.apply(
"ReadMyFile", TextIO.read().from("protocol://path/to/some/inputData.txt"));
}
请参阅I/O部分,了解有关如何阅读Beam SDK支持各种数据源的更多信息。
要从内存中的Java Collection
创建PCollection
,可以使用Beam提供的Create
转换。很像数据适配器Read
,可以将Create
直接应用于Pipeline
对象本身。
Create
接受Java Collection
和一个Coder
对象作为参数,在Coder
指定的Collection
中的元素如何编码。
以下示例代码显示了如何从内存中的List
创建PCollection
:
public static void main(String[] args) {
// Create a Java Collection, in this case a List of Strings.
static final List<String> LINES = Arrays.asList(
"To be, or not to be: that is the question: ",
"Whether 'tis nobler in the mind to suffer ",
"The slings and arrows of outrageous fortune, ",
"Or to take arms against a sea of troubles, ");
// Create the pipeline.
PipelineOptions options =
PipelineOptionsFactory.fromArgs(args).create();
Pipeline p = Pipeline.create(options);
// Apply Create, passing the list and the coder, to create the PCollection.
p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of())
}
PCollection
由创建它的特定Pipeline
对象拥有; 多个管道不能共享同一个PCollection
。在某些方面,PCollection
的功能就像一个集合类。然而,PCollection
在几个关键方面有所不同:
PCollection
的元素可以是任何类型的,但是必须都是相同的类型。然而,为了支持分布式处理,Beam需要能够将每个单独的元素编码为字节串(byte string)(因此元素可以传递给分布式Worker)。Beam SDK提供了一种数据编码机制,包括常用类型的内置编码,以及根据需要指定的自定义编码支持。
PCollection
是不可变的,一旦创建后,无法添加,删除或更改单个元素。Beam转换可以处理PCollection
中每个元素并生成新的管道数据(作为新的PCollection
),但不消费或修改原始的输入集合。
PCollection
不支持随机访问单个元素。相反,Beam 转换可以单独考虑PCollection
的每个元素。
PCollection
是一个大的,不可变的元素的“包”。一个PCollection
可以包含的元素数量没有上限; 任何给定的PCollection
也许适合单个机器上的内存,或者它可以表示非常大的持久存储的分布式数据集。
PCollection
可以是有界的也可以是×××的。有界的 PCollection
代表已知的固定大小的数据集,而×××的PCollection
代表无限大小的数据集。PCollection
是有界还是×××取决于它代表的源数据集。从批量数据源(如文件或数据库)读取可创建有界的PCollection
。从流或持续更新的数据源(如Pub/Sub或Kafka)读取会创建一个×××的PCollection
(除非明确告诉它不要)。
PCollection
有界(或×××)的性质影响Beam如何处理它的数据。可以使用批处理作业处理有界的PCollection
,可以一次读取整个数据集,并在有限长度的作业中执行处理。必须使用连续运行的流式作业来处理×××的PCollection
,因为整个集合不会一次都可用于处理。
当对×××PCollection
的元素进行分组操作时,Beam需要一个称为窗口的概念,将不断更新的数据集划分为有限大小的逻辑窗口。Beam将每个窗口处理作为一个bundle,随着数据集的生成,处理将持续进行。这些逻辑窗口由与数据元素相关联的一些特性(诸如时间戳)来确定。
PCollection
中的每个元素都具有与其相关联的内在时间戳。每个元素的时间戳记最初由创建的PCollection
源分配。创建×××PCollection
的源通常会为每个新元素分配元素被读取或添加时相对应的时间戳。
注意:PCollection
为固定数据集创建有界限的源也会自动分配时间戳,但最常见的行为是为每个元素分配相同的时间戳(Long.MIN_VALUE
)。
时间戳对于包含固有时间概念元素的PCollection
很有用。如果管道正在读取一系列事件,如推文或其他社交媒体消息,则每个元素可能会将事件发布的时间用作元素时间戳。
可以手动将时间戳分配给PCollection
的某个元素,如果源不为元素分配时间戳。如果元素具有固有的时间戳,但是时间戳在元素本身的结构中(例如服务器日志条目中的“时间”字段),则需要执行此操作。Beam有转换操作,其把PCollection
作为输入且输出与附加的时间戳完全相同的PCollection
; 有关如何执行此操作的详细信息,请参阅分配时间戳。
在Beam的SDK中,转换是管道中的操作。转换采用PCollection
(或多个PCollection
)作为输入,在该集合中的每个元素上执行指定的操作,并生成新的输出PCollection
。要调用转换,必须将其应用于输入PCollection
。
Beam SDK包含许多不同的转换,可以将其应用于管道的PCollection
。这些转换包括通用的核心转换,如ParDo或Combine。还包括SDK中包含的预编写的复合变换,它们将一个或多个核心转换组合在一个有用的处理模式中,例如计数或组合集合中的元素。还可以定义自己的更复杂的复合转换,以适应管道确切的用例。
Beam SDK中的每个转换都有通用的apply
方法。调用多个Beam转换类似于方法链,但有一点不同之处:将转换应用于输入PCollection
,将转换本身作为参数传递,操作返回输出PCollection
。一般形式如下:
[Output PCollection] = [Input PCollection].apply([Transform])
由于Beam对PCollection
使用通用的apply
方法,因此可以依次链接转换,也可以应用包含嵌套的其他转换的转换(在Beam SDK中称为复合转换)。
如何应用管道的转换决定了管道的结构。最好方法就是把管道作为一个有向无环图,其中节点是PCollection
s,边是转换。例如,可以以链式转换的方式来创建顺序管道,如下所示:
[Final Output PCollection] = [Initial Input PCollection].apply([First Transform])
.apply([Second Transform])
.apply([Third Transform])
上述管道的生成工作流程图如下所示:[顺序图图形] [Sequential Graph Graphic]
但是,请注意,转换不消费或以其他方式更改输入集合 - 记住,PCollection
根据定义是不可变的。这意味着您可以将多个转换应用于同一个输入PCollection
以创建分支管道,如下所示:
[Output PCollection 1] = [Input PCollection].apply([Transform 1])
[Output PCollection 2] = [Input PCollection].apply([Transform 2])
上述管道的生成工作流程图如下所示:[分支图图形] [Branching Graph Graphic]
还可以构建自己的复合变换,即将多个子步骤嵌套在单个更大的变换中。复合变换对于构建可在许多不同地方使用的可重用的简单步骤序列特别有用。
Beam SDK中的转换提供了通用的处理框架,可以以函数对象(俗称“用户代码”)的形式提供处理逻辑。用户代码应用于输入的PCollection
的元素。用户代码的实例可能会由集中群的许多不同的Worker并行执行,具体取决于选择执行Beam管道的管道运行器和后端。在每个Worker上运行的用户代码生成的输出元素最终添加到转换产生的最终输出PCollection
中。
Beam提供以下转换,每个转换代表不同的处理范例:
l ParDo
l GroupByKey
l Combine
l Flatten 和 Partition
ParDo
是用于通用并行处理的Beam转换。ParDo
的处理范例类似于map/shuffle/reduce形式的算法中的“Map”操作:一个ParDo
转换考虑到了输入PCollection
中的每个元素,在该元素上执行一些处理函数(用户代码),并发送0个,1个或多个元素到输出PCollection
。
ParDo
可用于各种常见的数据处理操作,包括:
l 过滤数据集。可以使用ParDo
来过虑PCollection
中的每个元素,并将该元素输出到新集合,或者将其丢弃。
l 格式化或转换数据集中的每个元素。如果输入PCollection
包含与想要的不同类型或格式的元素,则可以使用ParDo
对每个元素执行转换并将结果输出到新的PCollection
。
l 提取数据集中每个元素的部分数据。如果PCollection
中的记录带有多个字段,例如,可以使用 ParDo
解析出将想要的字段输出到新的PCollection
。
l 对数据集中的每个元素执行计算。可以使用ParDo
对PCollection
中的每个元素或某些确定元素进行简单或复杂的计算,并将结果输出为新的PCollection
。
在这样的角色中,ParDo
是管道中一个通用的中间步骤。可以使用它从一组原始输入记录中提取某些字段,或将原始输入转换为不同的格式; 也可以使用ParDo
将处理后的数据转换为适合输出的格式,如数据库表行或可打印字符串。
应用ParDo
变换时,需要以DoFn
对象的形式提供用户代码。DoFn
是一个定义分布式处理功能的Beam SDK类。
当创建一个DoFn的子类时,请注意该子类应遵守4.3节编写Beam转换用户代码的要求。
像所有Beam转换一样,应用ParDo
转换可以通过在输入PCollection
上调用apply
方法和把ParDo
作为参数来传递,如以下示例代码所示:
// The input PCollection of Strings.
PCollection<String> words = ...;
// The DoFn to perform on each element in the input PCollection.
static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }
// Apply a ParDo to the PCollection "words" to compute lengths for each word.
PCollection<Integer> wordLengths = words.apply(
ParDo.of(
new ComputeWordLengthFn())); // The DoFn to perform on each element, which
// we define above.
在示例中,输入PCollection
包含String
值。应用一个ParDo
转换,即指定一个函数(ComputeWordLengthFn
)来计算每个字符串的长度,并将结果输出到一个新的PCollection
中,且它的元素为Integer
类型存储每个字的长度的值。
传递给ParDo
的DoFn
对象包含应用于输入集合中的元素的处理逻辑。当使用Beam时,通常写出的最重要的代码片就是这些DoFn
,是它们定义了管道的确切数据处理任务是什么。
注意:创建DoFn
时,请注意4.3节编写Beam转换用户代码的要求,并确保您的代码遵循它们。
DoFn
一次处理输入PCollection
中的一个元素。创建DoFn
的子类时,需要提供与输入和输出元素类型匹配的类型参数。如果DoFn
处理传入String
类型的元素并生成Integer
类型的输出集合的元素(如之前的示例ComputeWordLengthFn
),则类声明将如下所示:
static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }
在DoFn
的子类中,编写一个带有@ProcessElement
注释的方法,其中提供实际的处理逻辑。不需要从输入集合中手动提取元素; Beam SDK可以处理。注释为@ProcessElement
方法应该接受的对象类型ProcessContext
。该ProcessContext
对象允许访问输入元素和发出输出元素的方法:
static class ComputeWordLengthFn extends DoFn<String, Integer> {
@ProcessElement
public void processElement(ProcessContext c) {
// Get the input element from ProcessContext.
String word = c.element();
// Use ProcessContext.output to emit the output element.
c.output(word.length());
}
}
注意:如果输入PCollection
中的元素是键/值对,则可以分别使用ProcessContext.element().getKey()
或键访问键或值ProcessContext.element().getValue()
。
给定的DoFn
实例通常被调用一次或多次来处理一些任意的元素组。但是,Beam不能保证确切的调用次数; 可以在给定的工作节点上多次调用它,以解决故障和重试。因此,您可以跨多个调用缓存信息到处理方法,但如果这样做,请确保实现不依赖于调用数量。
在处理方法中,您还需要满足一些不变性要求,以确保Beam和处理后端可以安全地序列化并缓存管道中的值。您的方法应符合以下要求:
您不应以任何方式修改ProcessContext.element()
或ProcessContext.sideInput()
(或从输入集合传入的元素)返回的元素。
使用ProcessContext.output()
或输出值后ProcessContext.sideOutput()
,您不应以任何方式修改该值。
如果您的功能相对简单,您可以ParDo
通过提供一个轻量级DoFn
的在线内容来简化您的使用,作为匿名内部类实例。
下面是前面的例子,ParDo
用ComputeLengthWordsFn
,用DoFn
指定为匿名内部类的实例:
// The input PCollection.
PCollection<String> words = ...;
// Apply a ParDo with an anonymous DoFn to the PCollection words.
// Save the result as the PCollection wordLengths.
PCollection<Integer> wordLengths = words.apply(
"ComputeWordLengths", // the transform name
ParDo.of(new DoFn<String, Integer>() { // a DoFn as an anonymous inner class instance
@ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element().length());
}
}));
如果您ParDo
对输出元素执行输入元素的一对一映射,也就是说,对于每个输入元素,它将应用一个产生一个输出元素的函数,您可以使用更高级的变换。可以接受一个匿名的Java 8 lambda函数来进一步简化。MapElementsMapElements
以下是使用以下示例:MapElements
// The input PCollection.
PCollection<String> words = ...;
// Apply a MapElements with an anonymous lambda function to the PCollection words.
// Save the result as the PCollection wordLengths.
PCollection<Integer> wordLengths = words.apply(
MapElements.into(TypeDescriptors.integers())
.via((String word) -> word.length()));
注意:您可以使用Java 8 lambda函数与其他几个波束转换,其中包括Filter
,FlatMapElements
,和Partition
。
GroupByKey
是一个用于处理键/值对集合的波束变换。这是一个并行还原操作,类似于Map / Shuffle / Reduce-style算法的Shuffle阶段。输入GroupByKey
是表示多重映射的键/值对的集合,其中集合包含具有相同键但具有不同值的多个对。给定这样的集合,您可以使用GroupByKey
收集与每个唯一键相关联的所有值。
GroupByKey
是汇总具有共同点的数据的好方法。例如,如果您有一个存储客户订单记录的集合,则可能需要将来自相同邮政编码的所有订单分组在一起(其中键/值对的“键”是邮政编码字段,而“值“是记录的剩余部分)。
我们来看一下GroupByKey
简单例子的机制,其中我们的数据集由文本文件中的单词和出现的行号组成。我们想将所有共享相同单词(键)的行号(值)组合在一起,让我们看到文本中出现特定单词的所有位置。
我们的输入是一个PCollection
键/值对,其中每个单词都是一个键,该值是该单词出现在文件中的行号。以下是输入集合中键/值对的列表:
cat, 1
dog, 5
and, 1
jump, 3
tree, 2
cat, 5
dog, 2
and, 2
cat, 9
and, 6
...
GroupByKey
使用相同的密钥收集所有值,并输出一个新的对,其中包含唯一键和与输入集合中的该关键字相关联的所有值的集合。如果我们应用GroupByKey
到上面的输入集合,输出集合将如下所示:
cat, [1,5,9]
dog, [5,2]
and, [1,2,6]
jump, [3]
tree, [2]
...
因此,GroupByKey
表示从多重映射(多个键到单个值)到单一映射(唯一键到值集合)的转换。
CoGroupByKey
连接两个或多个PCollection
具有相同键类型的键/值,然后发出一组KV<K, CoGbkResult>
对。设计您的流水线显示使用连接的示例管道。
给出以下输入集合:
// collection 1
user1, address1
user2, address2
user3, address3
// collection 2
user1, order1
user1, order2
user2, order3
guest, order4
...
CoGroupByKey
从所有PCollection
s中收集具有相同键的值,并输出由唯一键和CoGbkResult
包含与该键相关联的所有值的对象组成的新对。如果您应用于CoGroupByKey
上面的输入集合,则输出集合将如下所示:
user1, [[address1], [order1, order2]]
user2, [[address2], [order3]]
user3, [[address3], []]
guest, [[], [order4]]
...
关键/价值对的注意事项:根据您使用的语言和SDK,Beam代表键/值对略有不同。在Beam SDK for Java中,您可以使用类型对象来表示键/值对KV<K, V>
。在Python中,您使用2元组表示键/值对。
Combine
是一种用于组合数据中元素或值集合的波束变换。Combine
具有在整个PCollection
s 上工作的变体,并且一些组合PCollection
键/值对中的每个键的值。
应用Combine
变换时,必须提供包含用于组合元素或值的逻辑的函数。组合函数应该是可交换和关联的,因为函数不一定在给定键的所有值上正确调用一次。由于输入数据(包括值集合)可以分布在多个工作者之间,因此可以多次调用组合函数以在值集合的子集上执行部分组合。Beam SDK还提供了一些预构建的组合功能,用于常数数字组合操作,如sum,min和max。
简单的组合操作(如和)通常可以实现为一个简单的功能。更复杂的组合操作可能需要您创建一个CombineFn
具有与输入/输出类型不同的累加类型的子类。
以下示例代码显示了一个简单的组合函数。
// Sum a collection of Integer values. The function SumInts implements the interface SerializableFunction.
public static class SumInts implements SerializableFunction<Iterable<Integer>, Integer> {
@Override
public Integer apply(Iterable<Integer> input) {
int sum = 0;
for (int item : input) {
sum += item;
}
return sum;
}
}
对于更复杂的组合函数,可以定义一个子类CombineFn
。您应该使用CombineFn
组合功能需要更复杂的累加器,必须执行额外的预处理或后处理,可能会更改输出类型或将密钥考虑在内。
一般组合操作由四个操作组成。创建子类时CombineFn
,必须通过覆盖相应的方法来提供四个操作:
1. 创建累加器创建一个新的“本地”累加器。在示例情况下,取平均值,本地累加器跟踪运行的值(我们的最终平均除法的分子值)和到目前为止的总和值(分母值)。它可以以分布式的方式被称为任何次数。
2. Add Input将一个输入元素添加到累加器,返回累加器值。在我们的例子中,它会更新总和并增加计数。也可以并行调用它。
3. 合并累加器将多个累加器合并到单个累加器中; 这是在最终计算之前如何组合多个累加器中的数据。在平均平均计算的情况下,表示划分的每个部分的累加器被合并在一起。它的输出可能再次被呼叫多次。
4. 提取输出执行最终计算。在计算平均值的情况下,这意味着将所有值的组合和除以求和的数量。在最终合并的累加器上调用一次。
以下示例代码显示如何定义一个CombineFn
计算平均值的平均值:
public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> {
public static class Accum {
int sum = 0;
int count = 0;
}
@Override
public Accum createAccumulator() { return new Accum(); }
@Override
public Accum addInput(Accum accum, Integer input) {
accum.sum += input;
accum.count++;
return accum;
}
@Override
public Accum mergeAccumulators(Iterable<Accum> accums) {
Accum merged = createAccumulator();
for (Accum accum : accums) {
merged.sum += accum.sum;
merged.count += accum.count;
}
return merged;
}
@Override
public Double extractOutput(Accum accum) {
return ((double) accum.sum) / accum.count;
}
}
如果您正在组合PCollection
键值对,则每键合并通常就足够了。如果您需要根据密钥更改组合策略(例如,某些用户的MIN和其他用户的MIN),则可以KeyedCombineFn
在组合策略中定义一个访问密钥。
使用全局组合将给定中的所有元素PCollection
转换成单个值,在您的流水线中表示为新的PCollection
包含一个元素。以下示例代码显示了如何应用Beam提供的sum combine函数为一个PCollection
整数产生一个总和值。
// Sum.SumIntegerFn() combines the elements in the input PCollection.
// The resulting PCollection, called sum, contains one value: the sum of all the elements in the input PCollection.
PCollection<Integer> pc = ...;
PCollection<Integer> sum = pc.apply(
Combine.globally(new Sum.SumIntegerFn()));
如果您的输入PCollection
使用默认的全局窗口,则默认行为是返回PCollection
包含一个项目。该项的值来自在应用时指定的合并函数中的累加器Combine
。例如,提供的sum组合函数返回零值(空输入的和),而min组合函数返回最大或无限值。
有Combine
,而不是返回一个空PCollection
当输入为空,指定.withoutDefaults
当你申请你的Combine
变换,如下面的代码示例:
PCollection<Integer> pc = ...;
PCollection<Integer> sum = pc.apply(
Combine.globally(new Sum.SumIntegerFn()).withoutDefaults());
如果您PCollection
使用任何非全局窗口函数,则Beam不提供默认行为。应用时,您必须指定以下选项之一Combine
:
指定.withoutDefaults
输入PCollection
集合中输入中为空的窗口同样为空。
指定.asSingletonView
,其中输出立即转换为a PCollectionView
,当为边输入时,将为每个空窗口提供默认值。通常情况下,如果将管道的结果Combine
用作后面的边输入,通常只需要使用此选项。
在创建密钥分组的集合(例如,通过使用GroupByKey
转换)之后,公共模式是将与每个密钥相关联的值的集合组合成单个合并的值。根据以前的例子GroupByKey
,一个键分组的PCollection
调用groupedWords
看起来像这样:
cat, [1,5,9]
dog, [5,2]
and, [1,2,6]
jump, [3]
tree, [2]
...
在上文中PCollection
,每个元素都有一个字符串键(例如“cat”)和一个可迭代的整数(在第一个元素中包含[1,5,9])。如果我们的管道的下一个处理步骤组合了这些值(而不是单独考虑它们),则可以组合整数的迭代,以创建与每个键配对的单个合并值。这种模式,GroupByKey
然后合并值的集合相当于Beam的Combine PerKey转换。Combine PerKey提供的组合函数必须是关联缩减函数或子类CombineFn
。
// PCollection is grouped by key and the Double values associated with each key are combined into a Double.
PCollection<KV<String, Double>> salesRecords = ...;
PCollection<KV<String, Double>> totalSalesPerPerson =
salesRecords.apply(Combine.<String, Double, Double>perKey(
new Sum.SumDoubleFn()));
// The combined value is of a different type than the original collection of values per key.
// PCollection has keys of type String and values of type Integer, and the combined value is a Double.
PCollection<KV<String, Integer>> playerAccuracy = ...;
PCollection<KV<String, Double>> avgAccuracyPerPlayer =
playerAccuracy.apply(Combine.<String, Integer, Double>perKey(
new MeanInts())));
Flatten
并且是存储相同数据类型的对象的Beam变换。合并多个对象到一个单一的逻辑,和分割一个单一成固定数量的更小的集合。Partition
PCollectionFlattenPCollectionPCollectionPartitionPCollection
以下示例显示如何应用Flatten
转换来合并多个PCollection
对象。
// Flatten takes a PCollectionList of PCollection objects of a given type.
// Returns a single PCollection that contains all of the elements in the PCollection objects in that list.
PCollection<String> pc1 = ...;
PCollection<String> pc2 = ...;
PCollection<String> pc3 = ...;
PCollectionList<String> collections = PCollectionList.of(pc1).and(pc2).and(pc3);
PCollection<String> merged = collections.apply(Flatten.<String>pCollections());
缺省情况下,输出编码器PCollection
是一样的编码器,用于在第一PCollection
输入PCollectionList
。但是,输入PCollection
对象可以使用不同的编码器,只要它们都以您所选择的语言包含相同的数据类型即可。
当使用Flatten
合并PCollection
具有应用窗口策略的PCollection
对象时,要合并的所有对象必须使用兼容的窗口策略和窗口大小。例如,您合并的所有集合必须全部使用(假设)相同的5分钟固定窗口或每30秒钟启动4分钟滑动窗口。
如果您的管道尝试使用不兼容的窗口Flatten
合并PCollection
对象,则IllegalStateException
当您的管道构建时,Beam会生成错误。
PartitionPCollection
根据您提供的分区功能划分一个元素。分区功能包含确定如何将输入元素分割PCollection
成每个生成的分区的逻辑PCollection
。分区数必须在图形构建时确定。例如,您可以在运行时将分区数作为命令行选项传递(然后用于构建流水线图),但是您无法确定中间管道中的分区数(基于以后计算的数据)例如,您的流水线图是构建的)。
以下示例将a PCollection
分为百分位组。
// Provide an int value with the desired number of result partitions, and a PartitionFn that represents the partitioning function.
// In this example, we define the PartitionFn in-line.
// Returns a PCollectionList containing each of the resulting partitions as individual PCollection objects.
PCollection<Student> students = ...;
// Split students up into 10 partitions, by percentile:
PCollectionList<Student> studentsByPercentile =
students.apply(Partition.of(10, new PartitionFn<Student>() {
public int partitionFor(Student student, int numPartitions) {
return student.getPercentile() // 0..99
* numPartitions / 100;
}}));
// You can extract each partition from the PCollectionList using the get method, as follows:
PCollection<Student> fortiethPercentile = studentsByPercentile.get(4);
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。