Skip to main content

UDF 如何使用

Beta 測試

UDF 目前正在 Beta 測試中,將會在 2022 Q1 正式上線

CannerFlow 提供 UDF 的框架,讓使用者可以用 Java 撰寫自己的商業邏輯,並使用在 SQL 中,以下介紹 Scalar Function 以及 Aggregation Function.

Scalar Function

UDF 在撰寫時,使用 annotations 去表示函式的相關資訊,包括 name, description, return type 以及 parameter types,以下提供 varchar 做 lowercase 的簡單範例 lowercaser(VARCHAR)

public class ExampleStringFunction
{
@ScalarFunction("lowercaser")
@Description("Converts the string to alternating case")
@SqlType(StandardTypes.VARCHAR)
public static Slice lowercaser(@SqlType(StandardTypes.VARCHAR) Slice slice)
{
String argument = slice.toStringUtf8();
return Slices.utf8Slice(argument.toLowerCase());
}
}

發佈到 CannerFlow 上後,就可在 SQL 中做使用

-- users data
-- | firstname |
-- | David |

select lowercaser(firstname) from users;

-- result
-- | firstname |
-- | david |

Aggregation function

Aggregation functions 使用類似於 scalar function 的框架,但多出了對於狀態的管理,CannerFlow 的 UDF 框架中定義了 AccumulatorState 作為狀態的累積所需

以下舉例 avg_double ,實作了 DOUBLE 型別的 average

@AggregationFunction("avg_double")
public class AverageAggregation
{
@InputFunction
public static void input(
LongAndDoubleState state,
@SqlType(StandardTypes.DOUBLE) double value)
{
state.setLong(state.getLong() + 1);
state.setDouble(state.getDouble() + value);
}

@CombineFunction
public static void combine(
LongAndDoubleState state,
LongAndDoubleState otherState)
{
state.setLong(state.getLong() + otherState.getLong());
state.setDouble(state.getDouble() + otherState.getDouble());
}

@OutputFunction(StandardTypes.DOUBLE)
public static void output(LongAndDoubleState state, BlockBuilder out)
{
long count = state.getLong();
if (count == 0) {
out.appendNull();
}
else {
double value = state.getDouble();
DOUBLE.writeDouble(out, value / count);
}
}
}

這個例子中使用的 LongAndDoubleState 繼承了 AccumulatorState ,並很簡單的實踐了 getter 跟 setter,讓狀態可以被存取

public interface LongAndDoubleState
extends AccumulatorState
{
long getLong();

void setLong(long value);

double getDouble();

void setDouble(double value);
}

Aggregate Function 在 CannerFlow 的 SQL Engine 中,會透過呼叫定義的 method,在 MPP 的分散式運算架構下執行定義的商業邏輯.

我們可以更深入地看所需撰寫的三個 method

  • @InputFunction: @InputFunction annotation 定義了 input rows 會怎麼儲存在 AccumulatorState 中,累積其狀態
  • @CombineFunction: @CombineFunction annotation 定義了在分散式運算架構下,要怎麼把狀態合併 (Combine)
  • @OutputFunction: @OutputFunction 是最後運算完成後,輸出會執行的 method

運作上會是@InputFunction 在不同的 worker 上分散式執行,結果傳輸到不同 worker 上執行 @CombineFunction 把狀態合併,最後透過 @OutputFunction 輸出其運算結果

Aggregation Function 上傳至 CannerFlow 後可使用在 SQL 中,可以使用在 select 中,也可以搭配 group by 使用

-- users data
-- | age | country |
-- | 10 | TW |
-- | 15 | TW |
-- | 20 | US |
-- | 30 | US |

select avg_double(age) from users;
-- result
-- | avg_double |
-- | 37.5 |

select country, avg_double(age) from users group by country;
-- result
-- | age | country |
-- | 12.5 | TW |
-- | 25 | US |