这篇文章主要介绍“Spark数据集的过滤方法”,在日常操作中,相信很多人在Spark数据集的过滤方法问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Spark数据集的过滤方法”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
在实际工作中,根据某个字段,对一个Spark数据集进行过滤,是一个很常见的场景,举个例子:
一个存储公司员工信息的数据集A,有以下三个字段:
id: Integer name: String age: Integer
现在要过滤出某些员工的id,这些id在B集合(B可能是哈希表,也可能是Spark数据集)中,过滤逻辑为:
C = A.filter(A.id in B)
有四种方法可以实现,分别为:
Filter
Map
MapPartition
Inner Join
下面是详细介绍。
Spark的Filter变换,可以根据条件表达式、返回布尔值的过滤函数、条件字符串,对数据集进行过滤,使用方法如下:
// 1. 条件表达式A1 = A.filter(Column condition)// 2. 自定义过滤函数A1 = A.filter(FilterFunction<T> func)// 3. 条件字符串A1 = A.filter(String condition)
Filter 变换比较简单,逐条处理记录不论数据集大小,效率都很高,但需要能够将用来过滤的数据集B广播到所有的executor上。
Map变换,对数据集中每条记录调用一个函数,返回值可以是null,也可以是相同类型或不同类型的新记录,使用方法如下:
// encoder参数用来指定输出类型A2 = A.map(MapFunction<T,U> func, Encoder<U> encoder)
通过Map变换实现过滤的话,只需要在Map变换中,将符合条件的记录原样返回,不符合条件的记录返回null即可。
可以看到,Map变换的语义和Filter变换的语义相似,都是逐条处理记录,但Map需要提供一个额外的Encoder,故没有Filter简单和优雅,且因为输出要过滤null值,所以效率不如Filter。
MapPartitions变换,与Map变换类似,但映射函数不是在每条记录上调用,而是在分区级别调用,使用方法如下:
// func的输入和输出都是Iterator类型A3 = A.map(MapPartitionsFunction<T,U> func, Encoder<U> encoder)
MapPartitions在分区级别进行操作,而不是记录级别,因此比Filter和Map效率更高。缺点的话,首先和Map一样,需要提供一个额外的Encoder,此外,当分区过大,超过executor所能提供的内存时,任务会失败,因此可靠性不如Map和Filter。
以员工id相等为Inner Join的条件,然后只要A集合中的字段,同样可以实现过滤,使用方法:
// join表达式可能为 A("id") === B("id")A4 = A.join(Dataset<?> B, Column joinExprs)
Inner Join和Filter一样,效率和可靠性都有保证,且对B集合的类型和大小都没有偏好。
到此,关于“Spark数据集的过滤方法”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。