Version: v3

Scala Spark Client

Supported Environments

Spark integration is currently supported only in On-Prem Cluster Mode and on AWS.

Installation

You can download the Jar from the link below.

Quick Start

Setup Project

Folders

/canner-client-test
├── lib
│ └── canner-spark-client-assembly_<version>.jar
├── src/main/scala/test
│ └── QuickStart.scala
├── project
│ └── build.properties
└── build.sbt

Make sure the downloaded Jar is placed in /lib.

/build.sbt
name := "canner-client-test"

version := "0.1"

scalaVersion := "2.12.12"

mainClass in (Compile, run) := Some("test.QuickStart")

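// `from` expects the URL of the jar file itself, not of the directory that contains it
libraryDependencies += "com.canner" % "canner-spark-client" % "1.0.0" from "file:///<path-to-client-test>/canner-client-test/lib/canner-spark-client-assembly_<version>.jar"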

// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.5"

// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.5"

Count Example

If you use Canner Enterprise in distributed mode on AWS, create an AWS user with permission to access the Canner Enterprise S3 bucket, then use that user's <AWS_ACCESS_KEY_ID> and <AWS_SECRET_ACCESS_KEY> in the code.

If you use standalone mode, ask your administrator for the <AWS_ACCESS_KEY_ID> and <AWS_SECRET_ACCESS_KEY>.
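Rather than pasting the keys directly into source code, one option is to read them from environment variables at startup. The following is a minimal sketch, not part of the Canner client: the `Credentials` object, the `AwsKeys` case class, and the choice of environment variable names are all assumptions made for illustration.

```scala
// Hypothetical helper (not part of the Canner client): loads the two
// AWS-style keys from a map of environment variables.
object Credentials {
  final case class AwsKeys(accessKey: String, secretKey: String)

  // Look up a required setting; fail early with a clear message if it is
  // missing or empty.
  def required(env: Map[String, String], name: String): String =
    env.get(name).filter(_.nonEmpty).getOrElse(
      throw new IllegalStateException(s"Missing environment variable: $name")
    )

  // Defaults to the real process environment; a custom map can be passed
  // in for testing.
  def fromEnv(env: Map[String, String] = sys.env): AwsKeys =
    AwsKeys(
      required(env, "AWS_ACCESS_KEY_ID"),
      required(env, "AWS_SECRET_ACCESS_KEY")
    )
}
```

With this in place, `Credentials.fromEnv()` could supply the values passed to `spark.hadoop.fs.s3a.access.key` and `spark.hadoop.fs.s3a.secret.key`, which keeps the secrets out of version control.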

/src/main/scala/test/QuickStart.scala
package test

import org.apache.spark.sql.SparkSession
import com.canner.Canner

object QuickStart {
  val TOKEN = "<CANNER_PERSONAL_ACCESS_TOKEN>"

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      // If you are using standalone mode, uncomment the line below and set the endpoint to the object storage URL.
      // If you use SSL and hit a certificate issue, add `-Dcom.amazonaws.sdk.disableCertChecking=true` to the JVM options.
      //.config("spark.hadoop.fs.s3a.endpoint", "https://<my-endpoint-name>:9000")
      .config("spark.hadoop.fs.s3a.access.key", "<AWS_ACCESS_KEY_ID>")
      .config("spark.hadoop.fs.s3a.secret.key", "<AWS_SECRET_ACCESS_KEY>")
      .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
      .master("local[*]")
      .getOrCreate()

    // entrypoint is the URL of the Canner host
    val entrypoint = "https://test.canner.your.host.com"
    val workspace = "<workspace ID>"

    val canner = Canner(entrypoint, TOKEN, workspace, spark)

    // Execute a SQL query on Canner and get a `DataFrame` back
    val df = canner.genQuery("select * from lineitem")

    println("----------------------")
    // println (not print) so the closing separator starts on its own line
    println("df count: " + df.count())
    println("----------------------")

  }
}
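
The AWS and standalone setups differ only in whether `spark.hadoop.fs.s3a.endpoint` is set. As a rough sketch of one way to express that, the S3A settings can be collected in a plain map, with the endpoint as an optional value. The `S3aSettings` object and its `build` method are hypothetical names introduced here, not part of Spark or the Canner client; the configuration keys themselves are the ones used in the example above.

```scala
// Hypothetical helper: assembles the S3A settings from QuickStart into a map.
object S3aSettings {
  def build(
      accessKey: String,
      secretKey: String,
      endpoint: Option[String] = None // set only for standalone mode
  ): Map[String, String] = {
    val base = Map(
      "spark.hadoop.fs.s3a.access.key" -> accessKey,
      "spark.hadoop.fs.s3a.secret.key" -> secretKey,
      "spark.hadoop.fs.s3a.impl" -> "org.apache.hadoop.fs.s3a.S3AFileSystem"
    )
    // Add the endpoint entry only when one was provided.
    endpoint.fold(base)(url => base + ("spark.hadoop.fs.s3a.endpoint" -> url))
  }
}
```

The resulting map can then be applied to the builder, for example with `settings.foldLeft(SparkSession.builder())((b, kv) => b.config(kv._1, kv._2))`, before calling `.master("local[*]").getOrCreate()`.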