温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

spark union 特别注意

发布时间:2020-03-07 01:22:28 来源:网络 阅读:411 作者:大海之中 栏目:大数据

今天遇到一个很诡异的问题。

表A

userid housecode res ctime
u1 code1 1 1301

表B

userid housecode res ctime
u2 code2 0 1302

表C

userid name type time
u1 大海 0 1303

然后对表A进行处理操作

表A.createOrReplaceTempView("t1");
JavaRDD<HistoryModelExt> rdd=removeDuplicateData(t1);
t1= s.createDataFrame(rdd, HistoryModelExt.class);

然后查看t1, t1.show()

u1 code1 1 1301
.. .. .. ..

数据还在,然后 B union A 然后 join C(通过userid), 理论上应该是有结果的,感觉就像1+1=2 这么肯定,但是还真没有数据,非常诧异。

刚开始以为是自己程序哪里有问题,苦苦寻找,发现一切正常, 最后回到 union这个方法上。

为了看清楚前因后果, 我把B union A的数据打印了出来,发现了一个奇怪的事情

userid housecode res ctime
u2 code2 0 1302
1301 code1 1 u1

当时一下子就明白为什么join 没有数据了, A的schema已经与B不一致了。
原来 union函数并不是按照列名合并,而是按照位置合并。
但是在JavaRDD<HistoryModelExt> rdd=removeDuplicateData(t1); 这步之前还是一致的,为什么转成java对象后,schema就变了呢

查看源代码

  /**
   * Applies a schema to an RDD of Java Beans.
   *
   * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
   * SELECT * queries will return the columns in an undefined order.
   *
   * @since 2.0.0
   */

 def createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame = {
    val attributeSeq: Seq[AttributeReference] = getSchema(beanClass)
    val className = beanClass.getName
    val rowRdd = rdd.mapPartitions { iter =>
    // BeanInfo is not serializable so we must rediscover it remotely for each partition.
      SQLContext.beansToRows(iter, Utils.classForName(className), attributeSeq)
    }
    Dataset.ofRows(self, LogicalRDD(attributeSeq, rowRdd.setName(rdd.name))(self))
  }

看注释,fields的顺序是不保证的, 原来如此。

这样你在union前乖乖的执行

t1.select("userId","houseCode","res","ctime"); 

这样顺序就又恢复了,大数据排查问题特别麻烦,感觉是一个很大的坑,希望能帮到后来人。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI