撰寫自定義函數 (UDF)
Canner Enterprise 提供 UDF 的框架,讓使用者可以用 Java 撰寫自己的商業邏輯,並使用在 SQL 中,以下介紹 Scalar Function
以及 Aggregation Function
。
Scalar Function
UDF 在撰寫時,使用 annotations 去表示函式的相關資訊,包括 name, description, return type 以及 parameter types,以下提供 varchar 做 lowercase 的簡單範例 lowercaser(VARCHAR)
public class ExampleStringFunction
{
@ScalarFunction("lowercaser")
@Description("Converts the string to alternating case")
@SqlType(StandardTypes.VARCHAR)
public static Slice lowercaser(@SqlType(StandardTypes.VARCHAR) Slice slice)
{
String argument = slice.toStringUtf8();
return Slices.utf8Slice(argument.toLowerCase());
}
}
發佈到 Canner Enterprise 上後,就可在 SQL 中做使用
-- users data
-- | firstname |
-- | David |
select lowercaser(firstname) from users;
-- result
-- | firstname |
-- | david |
Aggregation function
Aggregation functions 使用類似於 scalar function 的框架,但多出了對於狀態的管理,Canner Enterprise 的 UDF 框架中定義了 AccumulatorState
作為狀態的累積所需
以下舉例 avg_double
,實作了 DOUBLE
型別的 average
@AggregationFunction("avg_double")
public class AverageAggregation
{
@InputFunction
public static void input(
LongAndDoubleState state,
@SqlType(StandardTypes.DOUBLE) double value)
{
state.setLong(state.getLong() + 1);
state.setDouble(state.getDouble() + value);
}
@CombineFunction
public static void combine(
LongAndDoubleState state,
LongAndDoubleState otherState)
{
state.setLong(state.getLong() + otherState.getLong());
state.setDouble(state.getDouble() + otherState.getDouble());
}
@OutputFunction(StandardTypes.DOUBLE)
public static void output(LongAndDoubleState state, BlockBuilder out)
{
long count = state.getLong();
if (count == 0) {
out.appendNull();
}
else {
double value = state.getDouble();
DOUBLE.writeDouble(out, value / count);
}
}
}
這個例子中使用的 LongAndDoubleState
繼承了 AccumulatorState
,並很簡單的實踐了 getter 跟 setter,讓狀態可以被存取
public interface LongAndDoubleState
extends AccumulatorState
{
long getLong();
void setLong(long value);
double getDouble();
void setDouble(double value);
}
Aggregate Function 在 Canner Enterprise 的 SQL Engine 中,會透過呼叫定義的 method,在 MPP 的分散式運算架構下執行定義的商業邏輯.
我們可以更深入地看所需撰寫的三個 method
@InputFunction
:@InputFunction
annotation 定義了 input rows 會怎麼儲存在AccumulatorState
中,累積其狀態@CombineFunction
:@CombineFunction
annotation 定義了在分散式運算架構下,要怎麼把狀態合併 (Combine)@OutputFunction
:@OutputFunction
是最後運算完成後,輸出會執行的 method
運作上會是@InputFunction
在不同的 worker 上分散式執行,結果傳輸到不同 worker 上執行 @CombineFunction
把狀態合併,最後透過 @OutputFunction
輸出其運算結果
Aggregation Function 上傳至 Canner Enterprise 後可使用在 SQL 中,可以使用在 select 中,也可以搭配 group by 使用
-- users data
-- | age | country |
-- | 10 | TW |
-- | 15 | TW |
-- | 20 | US |
-- | 30 | US |
select avg_double(age) from users;
-- result
-- | avg_double |
-- | 37.5 |
select country, avg_double(age) from users group by country;
-- result
-- | age | country |
-- | 12.5 | TW |
-- | 25 | US |