Flink Java UDF
发布日期:2025-01-04 12:00 点击次数:165
实时计算Flink版支持在Flink SQL作业中使用Java自定义函数,本文介绍Flink Java自定义函数的分类、参数传递及调用注意事项。 自定义函数注册全局自定义函数注册方法,请参见全局自定义函数。作业级自定义函数注册方法,请参见作业级自定义函数。自定义函数参数传递您可以在Flink开发控制台配置自定义函数中的参数并在UDF代码中使用。这样,后续可以直接在控制台上修改参数值,实现快速修改UDF参数值的目的。当key = k1,value = {hi,hello},则定义为'k1:{hi,hello}'。当key = k2,value = str:ing,str:ing,则定义为'k2:"str:ing,str:ing"'当key = k3,value = str"ing,str:ing,则定义为'k3:"str""ing,str:ing"'步骤2按照YAML文件的格式,形成最终的pipeline.global-job-parameters。pipeline.global-job-parameters: |
'k1:{hi,hello}',
'k2:"str:ing,str:ing"',
'k3:"str""ing,str:ing"'在自定义函数代码中,通过FunctionContext#getJobParameter获取map的各项内容。命名参数在SQL中调用函数时必须按顺序指定所有参数字段。当参数较多时,容易出现传参个数、顺序错误,而且不能省略非必填参数。通过使用命名参数,可以按需指定所需的参数,减少出错概率,使用起来也更加方便。我们通过一个自定义标量函数(ScalarFunction)的例子来介绍下命名参数的使用。// 实现一个自定义标量函数,后两个入参为可选参数(isOptional = true)
public class MyFuncWithNamedArgs extends ScalarFunction {
private static final long serialVersionUID = 1L;
public String eval(@ArgumentHint(name = "f1", isOptional = false, type = @DataTypeHint("STRING")) String f1,
@ArgumentHint(name = "f2", isOptional = true, type = @DataTypeHint("INT")) Integer i2,
@ArgumentHint(name = "f3", isOptional = true, type = @DataTypeHint("LONG")) Long l3) {
if (i2 != null) {
return "i2#" + i2;
}
if (l3 != null) {
return "l3#" + l3;
}
return "default#" + f1;
}
}在SQL中使用该自定义函数时,您可以只指定第一个必选参数,或选择性指定可选参数,代码示例如下。CREATE TEMPORARY FUNCTION MyNamedUdf AS 'com.aliyun.example.MyFuncWithNamedArgs';
CREATE temporary TABLE s1 (
a INT,
b BIGINT,
c VARCHAR,
d VARCHAR,
PRIMARY KEY(a) NOT ENFORCED
) WITH (
'connector' = 'datagen',
'rows-per-second'='1'
);
CREATE temporary TABLE sink (
a INT,
b VARCHAR,
c VARCHAR,
d VARCHAR
) WITH (
'connector' = 'print'
);
INSERT INTO sink
SELECT a,
-- 仅指定第一个必选参数
MyNamedUdf(f1 => c) arg1_res,
-- 指定第一个必选参数及第二个可选参数
MyNamedUdf(f1 => c, f2 => a) arg2_res,
-- 指定第一个必选参数及第三个可选参数
MyNamedUdf(f1 => c, f3 => d) arg3_res
FROM s1;相关文档使用UDAF实现数据排序和聚合。Java自定义函数的开发和使用demo,请参见自定义聚合函数(UDAF)、自定义标量函数(UDSF)和自定义表值函数(UDTF)。Python自定义函数的调试和调优方法,请参见概述。Python自定义函数的开发和使用demo,请参见自定义聚合函数(UDAF)、自定义标量函数(UDSF)和自定义表值函数(UDTF)。