前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >hive学习笔记之十一:UDTF

hive学习笔记之十一:UDTF

原创
作者头像
程序员欣宸
修改2021-07-15 10:52:43
8410
修改2021-07-15 10:52:43
举报
文章被收录于专栏:实战docker实战docker

欢迎访问我的GitHub

这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos

《hive学习笔记》系列导航

  1. 基本数据类型
  2. 复杂数据类型
  3. 内部表和外部表
  4. 分区表
  5. 分桶
  6. HiveQL基础
  7. 内置函数
  8. Sqoop
  9. 基础UDF
  10. 用户自定义聚合函数(UDAF)
  11. UDTF

本篇概览

  • 本文是《hive学习笔记》系列的第十一篇,截至目前,一进一出的UDF、多进一出的UDAF咱们都学习过了,最后还有一进多出的UDTF留在本篇了,这也是本篇的主要内容;
  • 一进多出的UDTF,名为用户自定义表生成函数(User-Defined Table-Generating Functions, UDTF);
  • 前面的文章中,咱们曾经体验过explode就是hive内置的UDTF:
代码语言:txt
复制
hive> select explode(address) from t3;
OK
province    guangdong
city    shenzhen
province    jiangsu
city    nanjing
Time taken: 0.081 seconds, Fetched: 4 row(s)
  • 本篇的UDTF一共有两个实例:把一列拆成多列、把一列拆成多行(每行多列);
  • 接下来开始实战;

源码下载

  • 如果您不想编码,可以在GitHub下载所有源码,地址和链接信息如下表所示:

名称

链接

备注

项目主页

该项目在GitHub上的主页

git仓库地址(https)

该项目源码的仓库地址,https协议

git仓库地址(ssh)

git@github.com:zq2599/blog_demos.git

该项目源码的仓库地址,ssh协议

  • 这个git项目中有多个文件夹,本章的应用在hiveudf文件夹下,如下图红框所示:
    在这里插入图片描述
    在这里插入图片描述

准备工作

为了验证UDTF的功能,咱们要先把表和数据都准备好:

  • 新建名为t16的表:
代码语言:txt
复制
create table t16(
person_name  string,
string_field string
)
row format delimited 
fields terminated by '|'
stored as textfile;
  • 本地新建文本文件016.txt,内容如下:
代码语言:txt
复制
tom|1:province:guangdong
jerry|2:city:shenzhen
john|3
  • 导入数据:
代码语言:txt
复制
load data 
local inpath '/home/hadoop/temp/202010/25/016.txt' 
overwrite into table t16;
  • 数据准备完毕,开始编码;

UDTF开发的关键点

  • 需要继承GenericUDTF类;
  • 重写initialize方法,该方法的入参只有一个,类型是StructObjectInspector,从这里可以取得UDTF作用了几个字段,以及字段类型;
  • initialize的返回值是StructObjectInspector类型,UDTF生成的每个列的名称和类型都设置到返回值中;
  • 重写process方法,该方法中是一进多出的逻辑代码,把每个列的数据准备好放在数组中,执行一次forward方法,就是一行记录;
  • close方法不是必须的,如果业务逻辑执行完毕,可以将释放资源的代码放在这里执行;
  • 接下来,就按照上述关键点开发UDTF;

一列拆成多列

  • 接下来要开发的UDTF,名为udf_wordsplitsinglerow,作用是将入参拆分成多个列;
  • 下图红框中是t16表的一条原始记录的string_field字段,会被udf_wordsplitsinglerow处理:
在这里插入图片描述
在这里插入图片描述
  • 上面红框中的字段被UDTF处理处理后,一列变成了三列,每一列的名称如下图黄框所示,每一列的值如红框所示:
在这里插入图片描述
在这里插入图片描述
  • 以上就是咱们马上就要开发的功能;
  • 打开前文创建的hiveudf工程,新建WordSplitSingleRow.java:
代码语言:txt
复制
package com.bolingcavalry.hiveudf.udtf;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.ArrayList;
import java.util.List;

/**
 * @Description: 把指定字段拆成多列
 * @author: willzhao E-mail: zq2599@gmail.com
 * @date: 2020/11/5 14:43
 */
public class WordSplitSingleRow extends GenericUDTF {

    private PrimitiveObjectInspector stringOI = null;

    private final static String[] EMPTY_ARRAY = {"NULL", "NULL", "NULL"};

    /**
     * 一列拆成多列的逻辑在此
     * @param args
     * @throws HiveException
     */
    @Override
    public void process(Object[] args) throws HiveException {

        String input = stringOI.getPrimitiveJavaObject(args[0]).toString();

        // 无效字符串
        if(StringUtils.isBlank(input)) {
            forward(EMPTY_ARRAY);
        } else {

            // 分割字符串
            String[] array = input.split(":");

            // 如果字符串数组不合法,就返回原始字符串和错误提示
            if(null==array || array.length<3) {
                String[] errRlt = new String[3];
                errRlt[0] = input;
                errRlt[1] = "can not split to valid array";
                errRlt[2] = "-";

                forward(errRlt);
            } else {
                forward(array);
            }
        }
    }

    /**
     * 释放资源在此执行,本例没有资源需要释放
     * @throws HiveException
     */
    @Override
    public void close() throws HiveException {

    }

    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {

        List<? extends StructField> inputFields = argOIs.getAllStructFieldRefs();

        // 当前UDTF只处理一个参数,在此判断传入的是不是一个参数
        if (1 != inputFields.size()) {
            throw new UDFArgumentLengthException("ExplodeMap takes only one argument");
        }

        // 此UDTF只处理字符串类型
        if(!Category.PRIMITIVE.equals(inputFields.get(0).getFieldObjectInspector().getCategory())) {
            throw new UDFArgumentException("ExplodeMap takes string as a parameter");
        }

        stringOI = (PrimitiveObjectInspector)inputFields.get(0).getFieldObjectInspector();

        //列名集合
        ArrayList<String> fieldNames = new ArrayList<String>();

        //列对应的value值
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();

        // 第一列的列名
        fieldNames.add("id");
        // 第一列的inspector类型为string型
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        // 第二列的列名
        fieldNames.add("key");
        // 第二列的inspector类型为string型
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        // 第三列的列名
        fieldNames.add("value");
        // 第三列的inspector类型为string型
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }
}
  • 上述代码中的重点是process方法,取得入参后用冒号分割字符串,得到数组,再调用forward方法,就生成了一行记录,该记录有三列;

验证UDTF

接下来将WordSplitSingleRow.java部署成临时函数并验证;

  • 编码完成后,在pom.xml所在目录执行命令mvn clean package -U
  • 在target目录得到文件hiveudf-1.0-SNAPSHOT.jar
  • 将jar下载到hive服务器,我这里放在此目录:/home/hadoop/udf/
  • 在hive会话模式执行以下命令添加本地jar:
代码语言:txt
复制
add jar /home/hadoop/udf/hiveudf-1.0-SNAPSHOT.jar;
  • 部署临时函数:
代码语言:txt
复制
create temporary function udf_wordsplitsinglerow as 'com.bolingcavalry.hiveudf.udtf.WordSplitSingleRow';
  • 执行以下SQL验证:
代码语言:txt
复制
select udf_wordsplitsinglerow(string_field) from t16;
  • 结果如下,可见每一行记录的string_field字段都被分割成了id、key、value三个字段:
代码语言:txt
复制
hive> select udf_wordsplitsinglerow(string_field) from t16;
OK
id  key value
1   province    guangdong
2   city    shenzhen
3   can not split to valid array    -
Time taken: 0.066 seconds, Fetched: 3 row(s)

关键点要注意

  • 值得注意的是,UDTF不能和其他字段同时出现在select语句中,例如以下的SQL会执行失败:
代码语言:txt
复制
select person_name,udf_wordsplitsinglerow(string_field) from t16;
  • 错误信息如下:
代码语言:txt
复制
hive> select person_name,udf_wordsplitsinglerow(string_field) from t16;
FAILED: SemanticException [Error 10081]: UDTF's are not supported outside the SELECT clause, nor nested in expressions
  • 如果希望得到UDTF和其他字段的结果,可以使用LATERAL VIEW语法,完整SQL如下:
代码语言:txt
复制
select t.person_name, udtf_id, udtf_key, udtf_value
from (
    select person_name, string_field 
    from  t16
) t LATERAL VIEW udf_wordsplitsinglerow(t.string_field) v as  udtf_id, udtf_key, udtf_value;
  • 查询结果如下,可见指定字段和UDTF都能显示:
代码语言:txt
复制
hive> select t.person_name, udtf_id, udtf_key, udtf_value
    > from (
    >     select person_name, string_field 
    >     from  t16
    > ) t LATERAL VIEW udf_wordsplitsinglerow(t.string_field) v as  udtf_id, udtf_key, udtf_value;
OK
t.person_name   udtf_id udtf_key    udtf_value
tom 1   province    guangdong
jerry   2   city    shenzhen
john    3   can not split to valid array    -
Time taken: 0.122 seconds, Fetched: 3 row(s)

一列拆成多行(每行多列)

  • 前面咱们试过了将string_field字段拆分成idkeyvalue三个字段,不过拆分后总行数还是不变,接下来的UDTF,是把string_field拆分成多条记录,然后每条记录都有三个字段;
  • 需要导入新的数据到t16表,新建文本文件016_multi.txt,内容如下:
代码语言:txt
复制
tom|1:province:guangdong,4:city:yangjiang
jerry|2:city:shenzhen
john|3
  • 在hive会话窗口执行以下命令,会用016_multi.txt的内容覆盖t16表已有内容:
代码语言:txt
复制
load data 
local inpath '/home/hadoop/temp/202010/25/016_multi.txt' 
overwrite into table t16;
  • 此时的数据如下图所示,红框中是一条记录的string_field字段值,咱们接下来要开发的UDTF,会先用逗号分隔,得到的就是1:province:guangdong4:city:yangjiang这两个字符串,接下来对每个字符串用冒号分隔,就会得到两条idkeyvalue这样的记录,也就是多行多列:
在这里插入图片描述
在这里插入图片描述
  • 预期中的UDTF结果如下图所示,红框和黄框这两条记录都来自一条记录的string_field字段值:
在这里插入图片描述
在这里插入图片描述
  • 接下来开始编码,新建WordSplitMultiRow.java,代码如下,可见和WordSplitSingleRow的差异仅在process方法,WordSplitMultiRow的process中执行了多次forward,因此有了多条记录:
代码语言:txt
复制
package com.bolingcavalry.hiveudf.udtf;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import java.util.ArrayList;
import java.util.List;

/**
 * @Description: 把指定字段拆成多行,每行有多列
 * @author: willzhao E-mail: zq2599@gmail.com
 * @date: 2020/11/5 14:43
 */
public class WordSplitMultiRow extends GenericUDTF {

    private PrimitiveObjectInspector stringOI = null;


    private final static String[] EMPTY_ARRAY = {"NULL", "NULL", "NULL"};

    /**
     * 一列拆成多列的逻辑在此
     * @param args
     * @throws HiveException
     */
    @Override
    public void process(Object[] args) throws HiveException {
        String input = stringOI.getPrimitiveJavaObject(args[0]).toString();

        // 无效字符串
        if(StringUtils.isBlank(input)) {
            forward(EMPTY_ARRAY);
        } else {

            // 用逗号分隔
            String[] rowArray = input.split(",");

            // 处理异常
            if(null==rowArray || rowArray.length<1) {
                String[] errRlt = new String[3];
                errRlt[0] = input;
                errRlt[1] = "can not split to valid row array";
                errRlt[2] = "-";

                forward(errRlt);
            } else {
                // rowArray的每个元素,都是"id:key:value"这样的字符串
                for(String singleRow : rowArray) {

                    // 要确保字符串有效
                    if(StringUtils.isBlank(singleRow)) {
                        forward(EMPTY_ARRAY);
                    } else {
                        // 分割字符串
                        String[] array = singleRow.split(":");

                        // 如果字符串数组不合法,就返回原始字符串和错误提示
                        if(null==array || array.length<3) {
                            String[] errRlt = new String[3];
                            errRlt[0] = input;
                            errRlt[1] = "can not split to valid array";
                            errRlt[2] = "-";

                            forward(errRlt);
                        } else {
                            forward(array);
                        }
                    }
                }

            }
        }
    }

    /**
     * 释放资源在此执行,本例没有资源需要释放
     * @throws HiveException
     */
    @Override
    public void close() throws HiveException {

    }

    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {

        List<? extends StructField> inputFields = argOIs.getAllStructFieldRefs();

        // 当前UDTF只处理一个参数,在此判断传入的是不是一个参数
        if (1 != inputFields.size()) {
            throw new UDFArgumentLengthException("ExplodeMap takes only one argument");
        }

        // 此UDTF只处理字符串类型
        if(!Category.PRIMITIVE.equals(inputFields.get(0).getFieldObjectInspector().getCategory())) {
            throw new UDFArgumentException("ExplodeMap takes string as a parameter");
        }

        stringOI = (PrimitiveObjectInspector)inputFields.get(0).getFieldObjectInspector();

        //列名集合
        ArrayList<String> fieldNames = new ArrayList<String>();

        //列对应的value值
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();

        // 第一列的列名
        fieldNames.add("id");
        // 第一列的inspector类型为string型
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        // 第二列的列名
        fieldNames.add("key");
        // 第二列的inspector类型为string型
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        // 第三列的列名
        fieldNames.add("value");
        // 第三列的inspector类型为string型
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }
}

验证UDTF

接下来将WordSplitMultiRow.java部署成临时函数并验证;

  • 编码完成后,在pom.xml所在目录执行命令mvn clean package -U
  • 在target目录得到文件hiveudf-1.0-SNAPSHOT.jar
  • 将jar下载到hive服务器,我这里放在此目录:/home/hadoop/udf/
  • 如果还在同一个hive会话模式,需要先清理掉之前的jar和函数:
代码语言:txt
复制
drop temporary function if exists udf_wordsplitsinglerow;
delete jar /home/hadoop/udf/hiveudf-1.0-SNAPSHOT.jar;
  • 在hive会话模式执行以下命令添加本地jar:
代码语言:txt
复制
add jar /home/hadoop/udf/hiveudf-1.0-SNAPSHOT.jar;
  • 部署临时函数:
代码语言:txt
复制
create temporary function udf_wordsplitmultirow as 'com.bolingcavalry.hiveudf.udtf.WordSplitMultiRow';
  • 执行以下SQL验证:
代码语言:txt
复制
select udf_wordsplitmultirow(string_field) from t16;
  • 结果如下,可见每一行记录的string_field字段都被分割成了id、key、value三个字段:
代码语言:txt
复制
hive> select udf_wordsplitmultirow(string_field) from t16;
OK
id  key value
1   province    guangdong
4   city    yangjiang
2   city    shenzhen
3   can not split to valid array    -
Time taken: 0.041 seconds, Fetched: 4 row(s)
  • LATERAL VIEW语法尝试将其他字段也查出来,SQL如下:
代码语言:txt
复制
select t.person_name, udtf_id, udtf_key, udtf_value
from (
    select person_name, string_field 
    from  t16
) t LATERAL VIEW udf_wordsplitmultirow(t.string_field) v as  udtf_id, udtf_key, udtf_value;
  • 结果如下,符合预期:
代码语言:txt
复制
hive> select t.person_name, udtf_id, udtf_key, udtf_value
    > from (
    >     select person_name, string_field 
    >     from  t16
    > ) t LATERAL VIEW udf_wordsplitmultirow(t.string_field) v as  udtf_id, udtf_key, udtf_value;
OK
t.person_name   udtf_id udtf_key    udtf_value
tom 1   province    guangdong
tom 4   city    yangjiang
jerry   2   city    shenzhen
john    3   can not split to valid array    -
Time taken: 0.056 seconds, Fetched: 4 row(s)
  • 至此,HIVE的三种用户自定义函数咱们都学习和实践完成了,希望这些内容能给您的实践带来一些参考;

关于容器和镜像的环境

如果您不想自己搭建kubernetes环境,推荐使用腾讯云容器服务TKE:无需自建,即可在腾讯云上使用稳定, 安全,高效,灵活扩展的 Kubernetes 容器平台;

如果您希望自己的镜像可以通过外网上传和下载,推荐腾讯云容器镜像服务TCR:像数据加密存储,大镜像多节点快速分发,跨地域镜像同步

你不孤单,欣宸原创一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 数据库+中间件系列
  6. DevOps系列

欢迎关注公众号:程序员欣宸

微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界...

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 欢迎访问我的GitHub
  • 《hive学习笔记》系列导航
  • 本篇概览
  • 源码下载
  • 准备工作
  • UDTF开发的关键点
  • 一列拆成多列
  • 验证UDTF
  • 关键点要注意
  • 一列拆成多行(每行多列)
  • 验证UDTF
  • 关于容器和镜像的环境
  • 你不孤单,欣宸原创一路相伴
  • 欢迎关注公众号:程序员欣宸
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com