Apache Spark Structured Streaming
In this blog post, we're going to explore the features of Apache Spark Structured Streaming by building an application that reads structured data from Kafka and streams it into PostgreSQL.
This blog post is a follow-up to the previous post, Getting Started With Apache Spark, so please visit that post for the basic setup, as we will not repeat it here.
Note that we will use spark-operator to deploy our Spark application, so please make sure you have installed spark-operator accordingly. If you have not done so yet, take a look at it here.
Please note that the purpose of this blog post is to walk through the steps for trying out Spark Structured Streaming; it is not a comprehensive tutorial. For that, please refer to the official Apache Spark Structured Streaming programming guide.
Prerequisites #
Install PostgreSQL #
Install PostgreSQL using the Bitnami Helm chart:
helm install postgresql oci://registry-1.docker.io/bitnamicharts/postgresql
Connect to PostgreSQL:
export POSTGRES_PASSWORD=$(kubectl get secret --namespace default postgresql -o jsonpath="{.data.postgres-password}" | base64 -d)
kubectl run postgresql-client --rm --tty -i --restart='Never' --namespace default --image docker.io/bitnami/postgresql:16.2.0-debian-12-r6 --env="PGPASSWORD=$POSTGRES_PASSWORD" \
--command -- psql --host postgresql -U postgres -d postgres -p 5432;
Create a database and a user for the application:
CREATE USER spark WITH ENCRYPTED PASSWORD 'spark123';
CREATE DATABASE spark WITH OWNER spark;
\connect spark;
\dt
Create a Kubernetes Secret to store the database username and password:
kubectl create secret generic db-user-pass \
--from-literal=username=spark \
--from-literal=password='spark123'
Build the application #
Main.scala
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Dataset, Encoders, SaveMode, SparkSession}
import java.sql.Timestamp
object Main {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("hello-spark-streaming")
      .getOrCreate()

    val kafkaUser = sys.env.getOrElse("KAFKA_USER", "user1")
    val kafkaPass = sys.env.getOrElse("KAFKA_PASS", "")
    val kafkaOpts = Map[String, String](
      "kafka.bootstrap.servers" -> "kafka:9092",
      "subscribe" -> "test",
      "kafka.sasl.mechanism" -> "PLAIN",
      "kafka.security.protocol" -> "SASL_PLAINTEXT",
      "kafka.sasl.jaas.config" -> s"""org.apache.kafka.common.security.plain.PlainLoginModule required username="$kafkaUser" password="$kafkaPass";""",
    )

    import spark.implicits._

    val df = spark
      .readStream
      .format("kafka")
      .options(kafkaOpts)
      .load()
      .selectExpr("CAST(value as STRING) as value")
      .select(from_json(col("value"), Event.schema).as[Event])

    val dbName = sys.env.getOrElse("POSTGRESQL_DBNAME", "spark")
    val dbUser = sys.env.getOrElse("POSTGRESQL_USER", "spark")
    val dbPass = sys.env.getOrElse("POSTGRESQL_PASS", "")
    val postGreOpts = Map[String, String](
      "url" -> s"jdbc:postgresql://postgresql:5432/$dbName",
      "driver" -> "org.postgresql.Driver",
      "user" -> dbUser,
      "password" -> dbPass,
      "dbtable" -> "events",
    )
    // Structured Streaming has no built-in JDBC sink, so write each micro-batch
    // with the batch JDBC writer; Append keeps the rows from earlier batches.
    df.writeStream
      .foreachBatch((batch: Dataset[Event], _: Long) => {
        batch.write
          .format("jdbc")
          .options(postGreOpts)
          .mode(SaveMode.Append)
          .save()
      })
      .start()
      .awaitTermination()
  }
}
case class Event(uuid: String, typ: String, data: String, ts: Timestamp)
object Event {
  val schema: StructType = Encoders.product[Event].schema
}
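One detail worth calling out: Structured Streaming has no built-in JDBC sink, which is why the job writes each micro-batch through foreachBatch and the batch JDBC writer. Before packaging the job, it can also be handy to confirm that Event.schema actually parses your payloads. The snippet below is a minimal sanity check you could paste into a local spark-shell (use :paste so the case class and the query compile together); it is purely illustrative and not part of the deployed application:

// Quick check that the JSON-to-Event mapping works for a sample payload.
// Illustrative only; not part of the deployed job.
import java.sql.Timestamp
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions.{col, from_json}

case class Event(uuid: String, typ: String, data: String, ts: Timestamp)
val schema = Encoders.product[Event].schema

import spark.implicits._
val sample = Seq("""{"uuid":"f7a66f45-a671-4bf7-97af-cb58fffdbd54", "typ": "setup", "data":"event 1", "ts":"2024-03-11 09:09:45.847317+00"}""").toDF("value")
sample.select(from_json(col("value"), schema).as[Event]).show(truncate = false)

If the ts column comes back null, adjust the timestamp format in the payload (or pass a timestampFormat option to from_json) until it parses.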
build.sbt
ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / scalaVersion := "2.12.18"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.1"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.1"
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.5.1" % Test
libraryDependencies += "org.postgresql" % "postgresql" % "42.7.2"
Compile / run / mainClass := Some("Main")
lazy val root = (project in file("."))
  .settings(
    name := "hello-spark"
  )
Dockerfile
ARG SCALA_VERSION=2.12
ARG ARG_JAR_NAME=hello-spark_2.12-0.1.0-SNAPSHOT.jar
ARG ARG_MAIN_CLASS=Main
FROM sbtscala/scala-sbt:graalvm-ce-22.3.3-b1-java17_1.9.9_2.12.18 AS build
WORKDIR /app
COPY . .
RUN sbt package
FROM apache/spark:3.5.1-scala2.12-java17-ubuntu
ARG ARG_JAR_NAME
ARG ARG_MAIN_CLASS
ARG SCALA_VERSION
COPY --from=build /app/target/scala-${SCALA_VERSION}/${ARG_JAR_NAME} /app/work/application.jar
Build the Docker image:
docker build -t hello-spark-streaming:latest .
spark-streaming-application.yaml
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: hello-spark-streaming
  namespace: default
spec:
  type: Scala
  sparkVersion: 3.5.1
  mode: cluster
  image: hello-spark-streaming:latest
  mainClass: Main
  mainApplicationFile: local:///app/work/application.jar
  deps:
    packages:
      - org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1
      - org.postgresql:postgresql:42.7.2
  sparkConf:
    "spark.driver.extraJavaOptions": "-Divy.cache.dir=/tmp -Divy.home=/tmp"
  driver:
    memory: 512m
    labels:
      version: 3.5.1
    serviceAccount: sparksubmit
    env:
      - name: KAFKA_USER
        value: user1
      - name: KAFKA_PASS
        valueFrom:
          secretKeyRef:
            name: kafka-user-passwords
            key: client-passwords
      - name: POSTGRESQL_DBNAME
        value: spark
      - name: POSTGRESQL_USER
        valueFrom:
          secretKeyRef:
            name: db-user-pass
            key: username
      - name: POSTGRESQL_PASS
        valueFrom:
          secretKeyRef:
            name: db-user-pass
            key: password
  executor:
    memory: 512m
    instances: 3
    labels:
      version: 3.5.1
Deploy the application:
kubectl apply -f spark-streaming-application.yaml
Check status:
kubectl get pod
NAME READY STATUS RESTARTS AGE
hello-spark-streaming-86b6a38e376d2a19-exec-1 1/1 Running 0 3m33s
hello-spark-streaming-86b6a38e376d2a19-exec-2 1/1 Running 0 3m33s
hello-spark-streaming-86b6a38e376d2a19-exec-3 1/1 Running 0 3m33s
hello-spark-streaming-driver 1/1 Running 0 6m9s
kafka-client 1/1 Running 0 133m
kafka-controller-0 1/1 Running 1 (161m ago) 6h15m
kafka-controller-1 1/1 Running 1 (161m ago) 6h15m
kafka-controller-2 1/1 Running 1 (161m ago) 6h15m
postgresql-0 1/1 Running 0 84m
postgresql-client 1/1 Running 0 27m
Produce some messages:
kafka-console-producer.sh --topic test --request-required-acks all --bootstrap-server kafka:9092 --producer.config client.conf
{"uuid":"f7a66f45-a671-4bf7-97af-cb58fffdbd54", "typ": "setup", "data":"event 1", "ts":"2024-03-11 09:09:45.847317+00"}
Check the data in PostgreSQL:
select * from events;
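If you would rather verify from Spark instead of psql, the same connection details can be reused with a plain batch read over JDBC. This is a minimal sketch, assuming you run it from a spark-shell inside the cluster (so the postgresql service name resolves) started with --packages org.postgresql:postgresql:42.7.2:

// Read the events table back through Spark's JDBC source to double-check the streamed rows.
// The connection options mirror the ones used by the streaming job above.
val events = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://postgresql:5432/spark")
  .option("driver", "org.postgresql.Driver")
  .option("user", "spark")
  .option("password", "spark123")
  .option("dbtable", "events")
  .load()

events.show(truncate = false)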