A First Look at Hive UDFs
1. Introduction
In the previous post we solved the problem of flattening the complex data structures in a Hive table so they could be loaded into Kylin. After flattening, however, the impression PV computed from the ad logs is inflated, because one user now maps to multiple tag rows. So, to compute impression PV correctly, we need to create a separate view.
The analysis requirements are:
- impression PV per DSP, and impression PV covered by tags;
- cumulative impression PV, and cumulative tag-covered impression PV.
This amounts to cube(dsp, tag) + measure(pv); the HiveQL is as follows:
select dsp, tag, count(*) as pv
from ad_view
where view = 'view' and day_time between '2016-04-18' and '2016-04-24'
group by dsp, tag with cube;
Now the question is: how do we turn the original table's tags array<struct<tag:string,label:string,src:string>> into a tagged (TAGED) vs. untagged (EMPTY) flag? The obvious approach is to write a UDF on the tags field that checks whether any tag is present.
2. Hands-on
Basics
User-defined functions (UDFs) come in three flavors:
- regular UDFs, which transform a single field, e.g. round(), abs(), concat();
- user-defined aggregate functions (UDAFs), e.g. sum(), avg();
- user-defined table generating functions (UDTFs), which produce multiple columns or rows, e.g. explode(), inline().
UDTFs are restricted in how they can appear in a SELECT statement; for example, a UDTF cannot be combined with other columns:
hive> SELECT name, explode(subordinates) FROM employees;
FAILED: Error in semantic analysis: UDTF's are not supported outside the SELECT clause, nor nested in expressions
Hive provides the LATERAL VIEW keyword, which wraps the UDTF so that its output can be combined with other columns:
hive> SELECT name, sub
> FROM employees
> LATERAL VIEW explode(subordinates) subView AS sub;
UDF vs. GenericUDF
org.apache.hadoop.hive.ql.exec.UDF is the base class for field-transforming functions over simple (primitive) types; a transformation is implemented by overriding evaluate() (see the minimal sketch after this list). Compared with the UDF class, org.apache.hadoop.hive.ql.udf.generic.GenericUDF supports more complex types and revolves around three methods:
- initialize(ObjectInspector[] arguments): checks the types of the input arguments and determines the return type;
- evaluate(DeferredObject[] arguments): implements the actual transformation; its return value must be of the type declared in initialize();
- getDisplayString(String[] children): supplies the debug string displayed for the Hadoop job.
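For contrast, a function built on the plain UDF base class only needs to define evaluate(); Hive resolves the argument and return types by reflection. Below is a minimal sketch under that assumption (the class name ToLowerCase and its behavior are purely illustrative, not part of this project):
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

// Illustrative simple UDF: lower-cases a string column.
public final class ToLowerCase extends UDF {
    public Text evaluate(final Text s) {
        if (s == null) {
            return null;    // pass NULL through, like the built-in functions
        }
        return new Text(s.toString().toLowerCase());
    }
}
Since the tags field here is a nested array<struct<...>> rather than a primitive, the plain UDF class is a poor fit, which is why the implementation below extends GenericUDF instead.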
The GenericUDF that decides whether a tags array<struct<tag:string,label:string,src:string>> value carries no tag (EMPTY) is implemented as follows:
package com.hive.udf;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

@Description(name = "checkTag",
    value = "_FUNC_(array<struct>) - from the input array of struct "
        + "returns TAGED or EMPTY (no tag).",
    extended = "Example:\n"
        + "  > SELECT _FUNC_(tags_array) FROM src;")
public class CheckTag extends GenericUDF {
    private ListObjectInspector listOI;

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        // Expect exactly one argument: array<struct<...>>
        if (arguments.length != 1) {
            throw new UDFArgumentLengthException("only takes 1 argument: List<T>");
        }
        ObjectInspector a = arguments[0];
        if (!(a instanceof ListObjectInspector)) {
            throw new UDFArgumentException("first argument must be a list / array");
        }
        this.listOI = (ListObjectInspector) a;
        if (!(listOI.getListElementObjectInspector() instanceof StructObjectInspector)) {
            throw new UDFArgumentException("first argument must be a list of struct");
        }
        // The function returns a plain string: "TAGED", "EMPTY" or "null_field"
        return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        Object list = arguments[0].get();
        // getListLength() returns -1 for a NULL array and 0 for an empty one
        if (list == null || listOI.getListLength(list) <= 0) {
            return "null_field";
        }
        StructObjectInspector structOI = (StructObjectInspector) listOI.getListElementObjectInspector();
        // Read the "tag" field of the first struct element
        String tag = structOI.getStructFieldData(listOI.getListElement(list, 0),
            structOI.getStructFieldRef("tag")).toString();
        if (listOI.getListLength(list) == 1 && tag.equals("EMPTY")) {
            return "EMPTY";
        }
        return "TAGED";
    }

    @Override
    public String getDisplayString(String[] children) {
        return "check tag whether is empty";
    }
}
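Before packaging the jar, the class can be exercised off-cluster by constructing standard object inspectors and calling initialize()/evaluate() by hand. The following is only a rough sketch (the CheckTagDriver class is hypothetical, not part of the original project), assuming each struct is represented as a Java List of its field values:
package com.hive.udf;

import java.util.Arrays;

import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredJavaObject;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

// Hypothetical throw-away driver for a quick local sanity check.
public class CheckTagDriver {
    public static void main(String[] args) throws Exception {
        // Inspector for struct<tag:string,label:string,src:string>
        ObjectInspector structOI = ObjectInspectorFactory.getStandardStructObjectInspector(
                Arrays.asList("tag", "label", "src"),
                Arrays.<ObjectInspector>asList(
                        PrimitiveObjectInspectorFactory.javaStringObjectInspector,
                        PrimitiveObjectInspectorFactory.javaStringObjectInspector,
                        PrimitiveObjectInspectorFactory.javaStringObjectInspector));
        // Inspector for array<struct<...>>
        ObjectInspector listOI = ObjectInspectorFactory.getStandardListObjectInspector(structOI);

        CheckTag udf = new CheckTag();
        udf.initialize(new ObjectInspector[] { listOI });

        // One struct whose tag is "EMPTY": evaluate() should return "EMPTY".
        Object tags = Arrays.asList(Arrays.asList("EMPTY", "", ""));
        DeferredObject[] arguments = { new DeferredJavaObject(tags) };
        System.out.println(udf.evaluate(arguments));
    }
}
A non-empty array with any other tag should print TAGED, and a NULL or empty array should print null_field, matching the branches in evaluate().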
The following dependencies also need to be added to the build:
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>0.14.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.5.0-cdh5.3.2</version>
<scope>provided</scope>
</dependency>
Compile and package it into a jar, put the jar on HDFS, and after add jar the UDF can be called:
add jar hdfs://path/to/udf-1.0-SNAPSHOT.jar;
create temporary function checktag as 'com.hive.udf.CheckTag';
create view if not exists yooshu_view
partitioned on (day_time)
as
select uid, dsp, view, click, checktag(tags) as tag, day_time
from ad_base;
