温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

udf、udaf、udtf怎么用

发布时间:2021-12-16 15:05:32 来源:亿速云 阅读:232 作者:小新 栏目:云计算

小编给大家分享一下udf、udaf、udtf怎么用,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

一、UDF

1、背景:Hive是基于Hadoop中的MapReduce,提供HQL查询的数据仓库。Hive是一个很开放的系统,很多内容都支持用户定制,包括:

a)文件格式:Text File,Sequence File

b)内存中的数据格式: Java Integer/String, Hadoop IntWritable/Text

c)用户提供的 map/reduce 脚本:不管什么语言,利用 stdin/stdout 传输数据

d)用户自定义函数: Substr, Trim, 1 – 1

e)用户自定义聚合函数: Sum, Average…… n – 1

2、定义:UDF(User-Defined-Function),用户自定义函数对数据进行处理。

二、用法

1、UDF函数可以直接应用于select语句,对查询结构做格式化处理后,再输出内容。

2、编写UDF函数的时候需要注意一下几点:

a)自定义UDF需要继承org.apache.hadoop.hive.ql.UDF。

b)需要实现evaluate函。

c)evaluate函数支持重载。

3、以下是两个数求和函数的UDF。evaluate函数代表两个整型数据相加,两个浮点型数据相加,可变长数据相加。

package hive.connect;

import org.apache.hadoop.hive.ql.exec.UDF;

public final class Add extends UDF {
   public Integer evaluate(Integer a, Integer b) {
     if (null == a || null == b) {
        return null;
     }
     return a + b;
   }

   public Double evaluate(Double a, Double b) {
     if (a == null || b == null)
        return null;
     return a + b;
   }

   public Integer evaluate(Integer... a) {
     int total = 0;
     for (int i = 0; i < a.length; i++)
        if (a[i] != null)
          total += a[i];

     return total;
   }
}
4、步骤

a)把程序打包放到目标机器上去;

b)进入hive客户端,添加jar包:hive>add jar /run/jar/udf_test.jar;

c)创建临时函数:hive>CREATE TEMPORARY FUNCTION add_example AS 'hive.udf.Add';

d)查询HQL语句:

SELECT add_example(8, 9) FROM scores;

SELECT add_example(scores.math, scores.art) FROM scores;

SELECT add_example(6, 7, 8, 6.8) FROM scores;

e)销毁临时函数:hive> DROP TEMPORARY FUNCTION add_example;

5、细节在使用UDF的时候,会自动进行类型转换,例如:

SELECT add_example(8,9.1) FROM scores;

结果是17.1,UDF将类型为Int的参数转化成double。类型的饮食转换是通过UDFResolver来进行控制的。

三、UDAF

1、Hive查询数据时,有些聚类函数在HQL没有自带,需要用户自定义实现。

2、用户自定义聚合函数: Sum, Average…… n – 1

UDAF(User- Defined Aggregation Funcation)

 

四、用法

1、一下两个包是必须的import org.apache.hadoop.hive.ql.exec.UDAF和 org.apache.hadoop.hive.ql.exec.UDAFEvaluator。

2、函数类需要继承UDAF类,内部类Evaluator实UDAFEvaluator接口。

3、Evaluator需要实现 init、iterate、terminatePartial、merge、terminate这几个函数。

a)init函数实现接口UDAFEvaluator的init函数。

b)iterate接收传入的参数,并进行内部的轮转。其返回类型为boolean。

c)terminatePartial无参数,其为iterate函数轮转结束后,返回轮转数据,terminatePartial类似于hadoop的Combiner。

d)merge接收terminatePartial的返回结果,进行数据merge操作,其返回类型为boolean。

e)terminate返回最终的聚集函数结果。

4、以下为一个求平均数的UDAF:

package hive.udaf;

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

public class Avg extends UDAF {
   public static class AvgState {
     private long mCount;
     private double mSum;
   }

   public static class AvgEvaluator implements UDAFEvaluator {
     AvgState state;

     public AvgEvaluator() {
        super();
        state = new AvgState();
        init();
     }

     /**
      * init函数类似于构造函数,用于UDAF的初始化
      */
     public void init() {
        state.mSum = 0;
        state.mCount = 0;
     }

     /**
      * iterate接收传入的参数,并进行内部的轮转。其返回类型为boolean
      * 
      * @param o
      * @return
      */
     public boolean iterate(Double o) {
        if (o != null) {
          state.mSum += o;
          state.mCount++;
        }
        return true;
     }

     /**
      * terminatePartial无参数,其为iterate函数轮转结束后,返回轮转数据,
      * terminatePartial类似于hadoop的Combiner
      * 
      * @return
      */
     public AvgState terminatePartial() {// combiner
        return state.mCount == 0 ? null : state;
     }

     /**
      * merge接收terminatePartial的返回结果,进行数据merge操作,其返回类型为boolean
      * 
      * @param o
      * @return
      */
     public boolean merge(AvgState o) {
        if (o != null) {
          state.mCount += o.mCount;
          state.mSum += o.mSum;
        }
        return true;
     }

     /**
      * terminate返回最终的聚集函数结果
      * 
      * @return
      */
     public Double terminate() {
        return state.mCount == 0 ? null : Double.valueOf(state.mSum
             / state.mCount);
     }
   }
}
5、执行求平均数函数的步骤

a)将java文件编译成Avg_test.jar。

b)进入hive客户端添加jar包:

hive>add jar /run/jar/Avg_test.jar。

c)创建临时函数:

hive>create temporary function avg_test 'hive.udaf.Avg';

d)查询语句:

hive>select avg_test(scores.math) from scores;

e)销毁临时函数:

hive>drop temporary function avg_test;

五、总结

1、重载evaluate函数。

2、UDF函数中参数类型可以为Writable,也可为java中的基本数据对象。

3、UDF支持变长的参数。

4、Hive支持隐式类型转换。

5、客户端退出时,创建的临时函数自动销毁。

6、evaluate函数必须要返回类型值,空的话返回null,不能为void类型。

7、UDF是基于单条记录的列进行的计算操作,而UDFA则是用户自定义的聚类函数,是基于表的所有记录进行的计算操作。

8、UDF和UDAF都可以重载。

9、查看函数

SHOW FUNCTIONS; 
DESCRIBE FUNCTION <function_name>;

 

UDTF步骤:

1.必须继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
2.实现initialize, process, close三个方法
3.UDTF首先会
     a.调用initialize方法,此方法返回UDTF的返回行的信息(返回个数,类型)
     b.初始化完成后,会调用process方法,对传入的参数进行处理,可以通过forword()方法把结果返回
     c.最后close()方法调用,对需要清理的方法进行清理


Java代码 

public class GenericUDTFExplode extends GenericUDTF {  

  

  private ListObjectInspector listOI = null;  

  

  @Override  

  public void close() throws HiveException {  

  }  

  

  @Override  

  public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {  

    if (args.length != 1) {  

      throw new UDFArgumentException("explode() takes only one argument");  

    }  

  

    if (args[0].getCategory() != ObjectInspector.Category.LIST) {  

      throw new UDFArgumentException("explode() takes an array as a parameter");  

    }  

    listOI = (ListObjectInspector) args[0];  

  

    ArrayList<String> fieldNames = new ArrayList<String>();  

    ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();  

    fieldNames.add("col");  

    fieldOIs.add(listOI.getListElementObjectInspector());  

    return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,  

        fieldOIs);  

  }  

  

  private final Object[] forwardObj = new Object[1];  

  

  @Override  

  public void process(Object[] o) throws HiveException {  

    List<?> list = listOI.getList(o[0]);  

    if(list == null) {  

      return;  

    }  

    for (Object r : list) {  

      forwardObj[0] = r;  

      forward(forwardObj);  

    }  

  }  

  

  @Override  

  public String toString() {  

    return "explode";  

  }  

10、wiki链接:http://wiki.apache.org/hadoop/Hive/LanguageManual/UDF

Deploying Jars for User Defined Functions and User Defined SerDes

In order to start using your UDF, you first need to add the code to the classpath:

hive> add jar my_jar.jar;

Added my_jar.jar to class path

By default, it will look in the current directory. You can also specify a full path:

hive> add jar /tmp/my_jar.jar;

Added /tmp/my_jar.jar to class path

Your jar will then be on the classpath for all jobs initiated from that session. To see which jars have been added to the classpath you can use:

hive> list jars;

my_jar.jar

See Hive CLI for full syntax and more examples.

As of Hive 0.13, UDFs also have the option of being able to specify required jars in the CREATE FUNCTION statement:

CREATE FUNCTION myfunc AS 'myclass' USING JAR 'hdfs:///path/to/jar';

This will add the jar to the classpath as if ADD JAR had been called on that jar. 

以上是“udf、udaf、udtf怎么用”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注亿速云行业资讯频道!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI