跳至主要内容

上傳並管理自定義函數

Canner 提供 UDF 的框架,讓使用者可以用 Java 撰寫自己的商業邏輯,並使用在 SQL 中,以下介紹 Scalar Function 以及 Aggregation Function.

Scalar Function

UDF 在撰寫時,使用 annotations 去表示函式的相關資訊,包括 name, description, return type 以及 parameter types,以下提供 varchar 做 lowercase 的簡單範例 lowercaser(VARCHAR)

public class ExampleStringFunction
{
@ScalarFunction("lowercaser")
@Description("Converts the string to alternating case")
@SqlType(StandardTypes.VARCHAR)
public static Slice lowercaser(@SqlType(StandardTypes.VARCHAR) Slice slice)
{
String argument = slice.toStringUtf8();
return Slices.utf8Slice(argument.toLowerCase());
}
}

發佈到 Canner 上後,就可在 SQL 中做使用

-- users data
-- | firstname |
-- | David |

select lowercaser(firstname) from users;

-- result
-- | firstname |
-- | david |

Aggregation function

Aggregation functions 使用類似於 scalar function 的框架,但多出了對於狀態的管理,Canner 的 UDF 框架中定義了 AccumulatorState 作為狀態的累積所需

以下舉例 avg_double ,實作了 DOUBLE 型別的 average

@AggregationFunction("avg_double")
public class AverageAggregation
{
@InputFunction
public static void input(
LongAndDoubleState state,
@SqlType(StandardTypes.DOUBLE) double value)
{
state.setLong(state.getLong() + 1);
state.setDouble(state.getDouble() + value);
}

@CombineFunction
public static void combine(
LongAndDoubleState state,
LongAndDoubleState otherState)
{
state.setLong(state.getLong() + otherState.getLong());
state.setDouble(state.getDouble() + otherState.getDouble());
}

@OutputFunction(StandardTypes.DOUBLE)
public static void output(LongAndDoubleState state, BlockBuilder out)
{
long count = state.getLong();
if (count == 0) {
out.appendNull();
}
else {
double value = state.getDouble();
DOUBLE.writeDouble(out, value / count);
}
}
}

這個例子中使用的 LongAndDoubleState 繼承了 AccumulatorState ,並很簡單的實踐了 getter 跟 setter,讓狀態可以被存取

public interface LongAndDoubleState
extends AccumulatorState
{
long getLong();

void setLong(long value);

double getDouble();

void setDouble(double value);
}

Aggregate Function 在 Canner 的 SQL Engine 中,會透過呼叫定義的 method,在 MPP 的分散式運算架構下執行定義的商業邏輯.

我們可以更深入地看所需撰寫的三個 method

  • @InputFunction: @InputFunction annotation 定義了 input rows 會怎麼儲存在 AccumulatorState 中,累積其狀態
  • @CombineFunction: @CombineFunction annotation 定義了在分散式運算架構下,要怎麼把狀態合併 (Combine)
  • @OutputFunction: @OutputFunction 是最後運算完成後,輸出會執行的 method

運作上會是@InputFunction 在不同的 worker 上分散式執行,結果傳輸到不同 worker 上執行 @CombineFunction 把狀態合併,最後透過 @OutputFunction 輸出其運算結果

Aggregation Function 上傳至 Canner 後可使用在 SQL 中,可以使用在 select 中,也可以搭配 group by 使用

-- users data
-- | age | country |
-- | 10 | TW |
-- | 15 | TW |
-- | 20 | US |
-- | 30 | US |

select avg_double(age) from users;
-- result
-- | avg_double |
-- | 37.5 |

select country, avg_double(age) from users group by country;
-- result
-- | age | country |
-- | 12.5 | TW |
-- | 25 | US |