Scala Spark Client
Supported Environments
Spark integration is currently supported only in On-Prem Cluster Mode and in AWS environments.
Installation
You can download the Jar from the link below.
Quick Start
Setup Project
You can set up the project with either sbt or Maven.

sbt
Folders
/canner-client-test
├── lib
│ └── canner-spark-client-assembly_<version>.jar
├── src/main/scala/test
│ └── QuickStart.scala
├── project
│ └── build.properties
└── build.sbt
Make sure the downloaded Jar is placed under /lib.
/build.sbt
name := "canner-client-test"
version := "0.1"
scalaVersion := "2.12.12"
mainClass in (Compile, run) := Some("test.QuickStart")
libraryDependencies += "com.canner" % "canner-spark-client" % "1.0.0" from "file:///<path-to-client-test>/canner-client-test/lib"
// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.5"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.5"
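The folder tree above also lists project/build.properties, which pins the sbt version for the build. Its contents aren't shown in the original layout, so the version below is an assumption; any sbt 1.x release you have installed should do:

/project/build.properties
# assumed sbt version; change this to the sbt release you use
sbt.version=1.3.13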
maven

Folders
/canner-client-test
├── lib
│ └── canner-spark-client-assembly_<version>.jar
├── src/main/scala/test
│ └── QuickStart.scala
└── pom.xml
Make sure the downloaded Jar is placed under /lib.
Create a local repo
You can use Maven to build a local repository to host the Canner Enterprise Jar.
1. Create the folder
mkdir local-repo
2. Deploy the Jar with the Maven command
mvn deploy:deploy-file -DgroupId=com.canner -DartifactId=canner-spark-client -Dversion=1.0.0 -Durl=file:./local-repo/ -DrepositoryId=local-repo -DupdateReleaseInfo=true -Dfile=${project-root}/lib/canner-spark-client-assembly_2.12-1.0.0.jar -Dpackaging=jar
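After the deploy command finishes, the Jar (plus a generated pom and repository metadata) should land in the standard Maven repository layout, roughly as below; checksum files are omitted, and the exact file list may vary slightly by Maven version. This is the directory that the local-repo <repository> entry in pom.xml resolves against.

/local-repo
└── com/canner/canner-spark-client
    ├── 1.0.0
    │   ├── canner-spark-client-1.0.0.jar
    │   └── canner-spark-client-1.0.0.pom
    └── maven-metadata.xml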
/pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.canner</groupId>
  <artifactId>test</artifactId>
  <version>1</version>

  <properties>
    <scala.mainVersion>2.12</scala.mainVersion>
    <scala.version>${scala.mainVersion}.12</scala.version>
    <scala.compat.version>2.12</scala.compat.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.mainVersion}</artifactId>
      <version>2.4.5</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.mainVersion}</artifactId>
      <version>2.4.5</version>
    </dependency>
    <dependency>
      <groupId>com.canner</groupId>
      <artifactId>canner-spark-client</artifactId>
      <version>1.0.0</version>
    </dependency>
  </dependencies>

  <repositories>
    <repository>
      <id>local-repo</id>
      <url>file:///${project.basedir}/local-repo</url>
    </repository>
  </repositories>
</project>
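One caveat: the pom.xml above only declares dependencies, so `mvn compile` will not build the Scala sources by itself. A minimal sketch of a build section using scala-maven-plugin (the plugin version here is an assumption; pick whichever recent release you prefer):

/pom.xml (build section)
  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <plugins>
      <!-- compiles src/main/scala; 4.4.0 is an assumed recent release -->
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>4.4.0</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>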
Count Example
If you're running Canner Enterprise in distributed mode on AWS, you need to create an AWS User whose permissions allow access to Canner Enterprise's S3 bucket, then put that user's <AWS_ACCESS_KEY_ID> and <AWS_SECRET_ACCESS_KEY> into the code below.
If you're using Standalone mode, ask your administrator for the <AWS_ACCESS_KEY_ID> and <AWS_SECRET_ACCESS_KEY>.
/src/main/scala/test/QuickStart.scala
package test

import org.apache.spark.sql.SparkSession
import com.canner.Canner

object QuickStart {
  val TOKEN = "<CANNER_PERSONAL_ACCESS_TOKEN>"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      // if you're running standalone mode, point the endpoint at your object storage URL.
      // if you're using SSL and hit certificate errors, add `-Dcom.amazonaws.sdk.disableCertChecking=true` to the JVM options to work around them
      //.config("spark.hadoop.fs.s3a.endpoint", "https://<my-endpoint-name>:9000")
      .config("spark.hadoop.fs.s3a.access.key", "<AWS_ACCESS_KEY_ID>")
      .config("spark.hadoop.fs.s3a.secret.key", "<AWS_SECRET_ACCESS_KEY>")
      .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
      .master("local[*]")
      .getOrCreate()

    // entrypoint is the hostname of Canner
    val entrypoint = "https://test.canner.your.host.com"
    val workspace = "<workspace ID>"
    val canner = Canner(entrypoint, TOKEN, workspace, spark)

    // Execute a SQL query on Canner and get a `DataFrame` back
    val df = canner.genQuery("select * from lineitem")
    println("----------------------")
    println("df count: " + df.count())
    println("----------------------")

    // stop the session when done
    spark.stop()
  }
}
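Because mainClass is already set in build.sbt, the sbt project can be run from the project root with `sbt run`. And since genQuery hands back an ordinary Spark DataFrame, anything in the DataFrame API works on the result. A minimal follow-up sketch, assuming the standard TPC-H lineitem schema (l_orderkey and l_quantity are assumptions about your actual table):

// hedged sketch: treat the query result like any other DataFrame
// l_orderkey / l_quantity are assumed TPC-H lineitem column names
val totals = df.groupBy("l_orderkey").sum("l_quantity")
totals.show(10)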