温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

mapPartitions的简单介绍及使用方法

发布时间:2021-07-28 09:16:55 来源:亿速云 阅读:734 作者:chen 栏目:大数据

本篇内容介绍了“mapPartitions的简单介绍及使用方法”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!


1. mappartition简介

首先,说到mapPartitions大家肯定想到的是map和MapPartitions的对比。大家都知道mapPartition算子是使用一个函数针对分区计算的,函数参数是一个迭代器。而map只针对每条数据调用的,所以存在访问外部数据库等情况时mapParititons更加高效。  
mapPartitions函数:
  /**   * Return a new RDD by applying a function to each partition of this RDD.   *   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.   */  def mapPartitions[U: ClassTag](      f: Iterator[T] => Iterator[U],      preservesPartitioning: Boolean = false): RDD[U] = withScope {    val cleanedF = sc.clean(f)    new MapPartitionsRDD(      this,      (_: TaskContext, _: Int, iter: Iterator[T]) => cleanedF(iter),      preservesPartitioning)  }
有代码可知mapPartitions的函数参数是传入一个迭代器,返回值是另一个迭代器。
map函数:  
  /**   * Return a new RDD by applying a function to all elements of this RDD.   */  def map[U: ClassTag](f: T => U): RDD[U] = withScope {    val cleanF = sc.clean(f)    new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))  }
map函数就是将rdd的元素由T类型转化为U类型。
综上可知,map和foreach这类的是针对一个元素调用一次我们的函数,也即是我们的函数参数是单个元素,假如函数内部存在数据库链接、文件等的创建及关闭,那么会导致处理每个元素时创建一次链接或者句柄,导致性能底下,很多初学者犯过这种毛病。
而foreachpartition/mapPartitions是针对每个分区调用一次我们的函数,也即是我们函数传入的参数是整个分区数据的迭代器,这样避免了创建过多的临时链接等,提升了性能。
下面的例子都是1-20这20个数字,经过map完成a*3的转换:
val a = sc.parallelize(1 to 20, 2)
def mapTerFunc(a : Int) : Int = {a*3}
val mapResult = a.map(mapTerFunc)
println(mapResult.collect().mkString(","))
结果
  
    
  
  
  
    
      
    
    3,6,9,12,15,18,21,24,27,30,33,36,39,42,45,48,51,54,57,60
           

           

3. mappartitions低效用法


大家通常的做法都是申请一个迭代器buffer,将处理后的数据加入迭代器buffer,然后返回迭代器。如下面的demo。
val a = sc.parallelize(1 to 20, 2)  def terFunc(iter: Iterator[Int]) : Iterator[Int] = {     var res = List[Int]()      while (iter.hasNext)   {           val cur = iter.next;     res.::= (cur*3) ;   }    res.iterator}
val result = a.mapPartitions(terFunc)println(result.collect().mkString(","))
结果乱序了,因为我的list是无序的,可以使用LinkList:
30,27,24,21,18,15,12,9,6,3,60,57,54,51,48,45,42,39,36,33

4. mappartitions高效用法

注意,3中的例子,会在mappartition执行期间,在内存中定义一个数组并且将缓存所有的数据。假如数据集比较大,内存不足,会导致内存溢出,任务失败。对于这样的案例,Spark的RDD不支持像mapreduce那些有上下文的写方法。其实,浪尖有个方法是无需缓存数据的,那就是自定义一个迭代器类。如下例:  
  
    
  
  
  
    
      
    
    
class CustomIterator(iter: Iterator[Int]) extends Iterator[Int] {                       def hasNext : Boolean = {                          iter.hasNext                     }                                               def next : Int= {                           val cur = iter.next                       cur*3                     }                   }                    
                  val result = a.mapPartitions(v => new CustomIterator(v))                   println(result.collect().mkString(","))              
           
结果:
   
     
   
   
   3,6,9,12,15,18,21,24,27,30,33,36,39,42,45,48,51,54,57,60

“mapPartitions的简单介绍及使用方法”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI