cerndb / sparkplugins   0.3

Apache License 2.0 GitHub

Code and examples of how to write and deploy Apache Spark Plugins. Spark plugins allow running custom code on the executors as they are initialized. This also allows extending the Spark metrics system with user-provided monitoring probes.

Scala versions: 2.13 2.12

SparkPlugins

Use Spark Plugins to extend Apache Spark with custom metrics and executors' startup actions.

Key Features

  • Spark Plugins are a mechanism to extend Apache Spark with custom code for metrics and actions.
  • This repository provides examples of plugins that you can deploy to extend Spark with custom metrics and actions.
    • Extending Spark instrumentation with custom metrics
    • Running custom actions when the executors start up, typically useful for integrating with external systems, such as monitoring systems.
    • This repo provides code and examples of plugins applied to measuring Spark on cluster resources (YARN, K8S, Standalone), including measuring Spark I/O from cloud Filesystems, OS metrics, custom application metrics, and integrations with external systems like Pyroscope.
  • The code in this repo is for Spark 3.x. For Spark 2.x, see instead Executor Plugins for Spark 2.4

Author and contact: [email protected]

Implementation Notes:

  • Spark plugins implement the org.apache.spark.api.plugin.SparkPlugin interface. They can be written in Scala or Java and can be used to run custom code at the startup of Spark executors and driver.
  • Basic plugin configuration: --conf spark.plugins=<comma-separated list of plugin classes>
  • Plugin JARs need to be made available to Spark executors
    • you can distribute the plugin code to the executors using --jars and --packages
    • for K8S you can also consider making the jars available directly in the container image
  • Most of the Plugins described in this repo are intended to extend the Spark Metrics System
    • See the details on the Spark metrics system at Spark Monitoring documentation.
    • You can find the metrics generated by the plugins in the Spark metrics system stream under the namespace namespace=plugin.<Plugin Class Name>
  • See also: SPARK-29397, SPARK-28091, SPARK-32119.
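
As a minimal sketch of the two conventions noted above (the comma-separated spark.plugins value and the plugin.<Plugin Class Name> metrics namespace), the following Python helpers build both strings. The function names are hypothetical and are not part of this repository.

```python
def plugins_conf(plugin_classes):
    """Return the value to pass to --conf for enabling the given plugin classes."""
    # spark.plugins takes a comma-separated list of fully qualified class names
    return "spark.plugins=" + ",".join(plugin_classes)


def metrics_namespace(plugin_class):
    """Return the metrics namespace under which a plugin's metrics are reported."""
    return "plugin." + plugin_class


if __name__ == "__main__":
    print("--conf", plugins_conf(["ch.cern.HDFSMetrics", "ch.cern.CgroupMetrics"]))
    print(metrics_namespace("ch.cern.CgroupMetrics"))
```

The first helper is only string assembly; the list of classes still has to be available on the driver and executor classpath, as described above.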

Getting Started - Your First Spark Plugins

  • Deploy the Spark plugins described here from Maven Central:
    • --packages ch.cern.sparkmeasure:spark-plugins_2.12:0.3
  • Alternatively, build or download the SparkPlugins jar. For example:
    • Build from source with sbt +package
    • Or download the jar from the automatic build in GitHub Actions

Demo and Basic Plugins

  • DemoPlugin
    • --packages ch.cern.sparkmeasure:spark-plugins_2.12:0.3 --conf spark.plugins=ch.cern.DemoPlugin
    • Basic plugin, demonstrates how to write Spark plugins in Scala, for demo and testing.
  • DemoMetricsPlugin
    • --packages ch.cern.sparkmeasure:spark-plugins_2.12:0.3 --conf spark.plugins=ch.cern.DemoMetricsPlugin
    • Example plugin illustrating integration with the Spark metrics system.
    • Metrics implemented:
      • ch.cern.DemoMetricsPlugin.DriverTest42: a gauge reporting a constant integer value, for testing.
  • RunOSCommandPlugin
    • --conf spark.plugins=ch.cern.RunOSCommandPlugin
    • Example illustrating how to use plugins to run actions on the OS.
    • Action implemented: runs an OS command on the executors. By default it runs: /usr/bin/touch /tmp/plugin.txt
    • Configurable action: --conf spark.cernSparkPlugin.command="command or script you want to run"
    • Example:
      bin/spark-shell --master yarn \
        --packages ch.cern.sparkmeasure:spark-plugins_2.12:0.3 \
        --conf spark.plugins=ch.cern.RunOSCommandPlugin
      
      • You can see if the plugin has run by checking that the file /tmp/plugin.txt has been created on the executor machines.
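
To try out a command before passing it to spark.cernSparkPlugin.command, it can help to run it locally first. The sketch below is a local stand-in written in Python, not the plugin's own code: it runs a command string once and checks for a marker file. The function names are hypothetical, and how the plugin itself launches the command (for example, with or without a shell) is not covered here.

```python
import os
import shlex
import subprocess

# Default action documented for RunOSCommandPlugin
DEFAULT_COMMAND = "/usr/bin/touch /tmp/plugin.txt"


def run_startup_command(command=DEFAULT_COMMAND):
    """Run the command once and return its exit code."""
    # Split the string into an argument list rather than invoking a shell
    completed = subprocess.run(shlex.split(command), check=False)
    return completed.returncode


def marker_exists(path="/tmp/plugin.txt"):
    """Report whether the marker file left by the command is present."""
    return os.path.exists(path)
```

A zero return code followed by marker_exists() returning True mirrors the manual check on /tmp/plugin.txt described above.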

Plugins in this Repository

Plugin for integrating with Pyroscope

Grafana Pyroscope is a tool for continuous profiling and Flame Graph visualization. This plugin integrates Apache Spark with Pyroscope. For details see:
How to profile Spark with Pyroscope

The following example puts the configuration together and starts Spark on a cluster with Pyroscope Flame Graph continuous monitoring:

  1. Start Pyroscope
  2. Start Spark (spark-shell, PySpark, or spark-submit)
# Update the package versions to the latest available.
# spark.pyroscope.server must match the server and port used when starting Pyroscope.
bin/spark-shell --master yarn \
  --packages ch.cern.sparkmeasure:spark-plugins_2.12:0.3,io.pyroscope:agent:0.13.0 \
  --conf spark.plugins=ch.cern.PyroscopePlugin \
  --conf spark.pyroscope.server="http://<myhostname>:5040"

Spark configurations:
This plugin adds the following configurations:

  --conf spark.pyroscope.server -> default "http://localhost:4040", update to match the server name and port used by Pyroscope
  --conf spark.pyroscope.applicationName -> default spark.conf.get("spark.app.id")
  --conf spark.pyroscope.eventType -> default ITIMER, possible values ITIMER, CPU, WALL, ALLOC, LOCK

Example:
This is an example of how to use the configuration programmatically (using PySpark):

from pyspark.sql import SparkSession

# Get the Spark session
spark = (
    SparkSession.builder
    .appName("Instrumented app")
    .master("yarn")
    .config("spark.executor.memory", "16g")
    .config("spark.executor.cores", "4")
    .config("spark.executor.instances", 2)
    # Plugin and Pyroscope agent packages; update to the latest versions
    .config("spark.jars.packages", "ch.cern.sparkmeasure:spark-plugins_2.12:0.3,io.pyroscope:agent:0.13.0")
    .config("spark.plugins", "ch.cern.PyroscopePlugin")
    # Must match the server and port used when starting Pyroscope
    .config("spark.pyroscope.server", "http://<myhostname>:5040")
    .getOrCreate()
)
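
When several applications share the same Pyroscope settings, the three spark.pyroscope.* options listed above can be collected in one place. The following is a minimal sketch in plain Python (no Spark dependency); the helper name pyroscope_conf is hypothetical, and the defaults and allowed event types are the ones documented above.

```python
ALLOWED_EVENT_TYPES = ("ITIMER", "CPU", "WALL", "ALLOC", "LOCK")


def pyroscope_conf(server="http://localhost:4040", application_name=None, event_type="ITIMER"):
    """Return a dict of Spark settings for the Pyroscope plugin."""
    if event_type not in ALLOWED_EVENT_TYPES:
        raise ValueError(f"eventType must be one of {ALLOWED_EVENT_TYPES}, got {event_type!r}")
    conf = {
        "spark.plugins": "ch.cern.PyroscopePlugin",
        "spark.pyroscope.server": server,
        "spark.pyroscope.eventType": event_type,
    }
    # Leave applicationName unset when not given, so the documented default
    # (the value of spark.app.id) applies
    if application_name is not None:
        conf["spark.pyroscope.applicationName"] = application_name
    return conf
```

Each key and value of the returned dict can then be passed to the session builder with .config(key, value) before calling getOrCreate().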

OS metrics instrumentation with cgroups, for Spark on Kubernetes

  • CgroupMetrics

    • Configure with: --conf spark.plugins=ch.cern.CgroupMetrics

    • Optional configuration: --conf spark.cernSparkPlugin.registerOnDriver (default false)

    • Implemented using cgroup instrumentation of key system resource usage, intended mostly for Spark on Kubernetes

    • Collects metrics using cgroup stats from /sys/fs and from the /proc filesystem for CPU, memory, and network usage. See also the kernel documentation. Note: the metrics are reported for the entire cgroup to which the executor belongs. This is mostly intended for Spark running on Kubernetes. In other cases, the metrics reported may not be easily correlated with the executor's activity, as the cgroup metrics may include more processes, up to the entire system.

    • Metrics implemented (gauges), with prefix ch.cern.CgroupMetrics:

      • CPUTimeNanosec: reports the CPU time used by the processes in the cgroup.
      • MemoryRss: number of bytes of anonymous and swap cache memory.
      • MemorySwap: number of bytes of swap usage.
      • MemoryCache: number of bytes of page cache memory.
      • NetworkBytesIn: network traffic inbound.
      • NetworkBytesOut: network traffic outbound.
    • Example:

    bin/spark-shell --master k8s://https://<K8S URL>:6443 --driver-memory 1g \
      --num-executors 2 --executor-cores 2 --executor-memory 2g \
      --conf spark.kubernetes.container.image=<registry>/spark:v330 \
      --packages ch.cern.sparkmeasure:spark-plugins_2.12:0.3 \
      --conf spark.plugins=ch.cern.HDFSMetrics,ch.cern.CgroupMetrics \
      --conf "spark.metrics.conf.*.sink.graphite.class"="org.apache.spark.metrics.sink.GraphiteSink"   \
      --conf "spark.metrics.conf.*.sink.graphite.host"=mytestinstance \
      --conf "spark.metrics.conf.*.sink.graphite.port"=2003 \
      --conf "spark.metrics.conf.*.sink.graphite.period"=10 \
      --conf "spark.metrics.conf.*.sink.graphite.unit"=seconds \
      --conf "spark.metrics.conf.*.sink.graphite.prefix"="youridhere"
    
  • Visualize the metrics using the Spark dashboard, see Spark_Perf_Dashboard_v03_with_SparkPlugins
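
To give an idea of what reading cgroup and /proc statistics involves, here is a minimal Python sketch that parses the text of two such files. It is an illustration, not the plugin's implementation. It assumes the cgroup v1 memory.stat layout (one "key value" pair per line, with keys such as rss, cache, and swap) and the standard /proc/net/dev layout. The function names are hypothetical, and whether the plugin filters network interfaces is not covered here.

```python
def parse_memory_stat(text):
    """Parse 'key value' lines (cgroup v1 memory.stat layout) into a dict of ints."""
    stats = {}
    for line in text.splitlines():
        parts = line.split()
        if len(parts) == 2 and parts[1].isdigit():
            stats[parts[0]] = int(parts[1])
    return stats


def parse_net_dev(text):
    """Sum received and transmitted bytes over all interfaces in /proc/net/dev text."""
    bytes_in = bytes_out = 0
    for line in text.splitlines()[2:]:  # the first two lines are column headers
        if ":" not in line:
            continue
        fields = line.split(":", 1)[1].split()
        if len(fields) < 9:
            continue
        bytes_in += int(fields[0])   # first receive column: bytes
        bytes_out += int(fields[8])  # first transmit column: bytes
    return bytes_in, bytes_out
```

On a Linux host, the input text can be obtained by reading /proc/net/dev and the memory.stat file of the relevant cgroup; the exact cgroup path depends on the system and is left out here.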


Plugins to collect I/O storage statistics for HDFS and Hadoop Compatible Filesystems

HDFS extended storage statistics

This Plugin measures HDFS extended statistics. In particular, it provides information on read locality and erasure coding usage (for HDFS 3.x).

  • HDFSMetrics

    • Configure with: --conf spark.plugins=ch.cern.HDFSMetrics

    • Optional configuration: --conf spark.cernSparkPlugin.registerOnDriver (default true)

    • Collects extended HDFS metrics using Hadoop's GlobalStorageStatistics API, which exposes the extended statistics introduced in Hadoop 2.8.

    • Use this with Spark built with Hadoop 3.2 or higher (it does not work with Spark built with Hadoop 2.7).

    • Metrics (gauges) implemented have the prefix ch.cern.HDFSMetrics. List of metrics:

      • bytesRead
      • bytesWritten
      • readOps
      • writeOps
      • largeReadOps
      • bytesReadLocalHost
      • bytesReadDistanceOfOneOrTwo
      • bytesReadDistanceOfThreeOrFour
      • bytesReadDistanceOfFiveOrLarger
      • bytesReadErasureCoded
    • Example:

    bin/spark-shell --master yarn \
      --packages ch.cern.sparkmeasure:spark-plugins_2.12:0.3 \
      --conf spark.plugins=ch.cern.HDFSMetrics \
      --conf "spark.metrics.conf.*.sink.graphite.class"="org.apache.spark.metrics.sink.GraphiteSink"   \
      --conf "spark.metrics.conf.*.sink.graphite.host"=mytestinstance \
      --conf "spark.metrics.conf.*.sink.graphite.port"=2003 \
      --conf "spark.metrics.conf.*.sink.graphite.period"=10 \
      --conf "spark.metrics.conf.*.sink.graphite.unit"=seconds \
      --conf "spark.metrics.conf.*.sink.graphite.prefix"="youridhere"
    
  • Visualize the metrics using the Spark dashboard, see Spark_Perf_Dashboard_v03_with_SparkPlugins
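
The read-locality gauges listed above are easier to interpret as shares of the total. The following Python sketch computes each distance bucket's fraction from a snapshot of gauge values. It assumes the four distance gauges together account for all bytes read, which is an assumption rather than something stated in this document; the function name is hypothetical.

```python
DISTANCE_GAUGES = (
    "bytesReadLocalHost",
    "bytesReadDistanceOfOneOrTwo",
    "bytesReadDistanceOfThreeOrFour",
    "bytesReadDistanceOfFiveOrLarger",
)


def read_locality(gauges):
    """Return each distance bucket's share of the bytes read, as fractions of 1."""
    values = {name: gauges.get(name, 0) for name in DISTANCE_GAUGES}
    total = sum(values.values())
    if total == 0:
        # Nothing read yet: report zero shares instead of dividing by zero
        return {name: 0.0 for name in DISTANCE_GAUGES}
    return {name: value / total for name, value in values.items()}
```

A high bytesReadLocalHost share indicates that most reads were served from the same host as the executor.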


Cloud filesystem storage statistics for Hadoop Compatible Filesystems

This Plugin provides I/O statistics for Cloud Filesystem metrics (for s3a, gs, wasbs, oci, root, and any other storage system exposed as a Hadoop Compatible Filesystem).

  • Configure with:
    • --conf spark.plugins=ch.cern.CloudFSMetrics

    • --conf spark.cernSparkPlugin.cloudFsName=<name of the filesystem> (example: "s3a", "gs", "wasbs", "root", "oci", etc.)

    • Optional configuration: --conf spark.cernSparkPlugin.registerOnDriver (default true)

    • Collects I/O metrics for Hadoop-compatible filesystems using Hadoop's GlobalStorageStatistics API.

      • Note: use this with Spark built with Hadoop 3.x (requires Hadoop client version 2.8 or higher).
      • Spark can also measure filesystem metrics using --conf spark.executor.metrics.fileSystemSchemes=<filesystems to measure> (default: file,hdfs). However, in Spark (up to 3.1) this is done using the Hadoop FileSystem getAllStatistics method, which is deprecated in recent versions of Hadoop.
    • Metrics (gauges) implemented have the prefix ch.cern.CloudFSMetrics. List of metrics:

      • bytesRead
      • bytesWritten
      • readOps
      • writeOps
    • Example:

      bin/spark-shell --master k8s://https://<K8S URL>:6443 --driver-memory 1g \
       --num-executors 2 --executor-cores 2 --executor-memory 2g \
       --conf spark.kubernetes.container.image=<registry>/spark:v311 \
       --packages org.apache.hadoop:hadoop-aws:3.3.2,ch.cern.sparkmeasure:spark-plugins_2.12:0.3 \
       --conf spark.plugins=ch.cern.CloudFSMetrics,ch.cern.CgroupMetrics \
       --conf spark.cernSparkPlugin.cloudFsName="s3a" \
       --conf spark.hadoop.fs.s3a.secret.key="<SECRET KEY HERE>" \
       --conf spark.hadoop.fs.s3a.access.key="<ACCESS KEY HERE>" \
       --conf spark.hadoop.fs.s3a.endpoint="https://<S3A URL HERE>" \
       --conf spark.hadoop.fs.s3a.impl="org.apache.hadoop.fs.s3a.S3AFileSystem" \
       --conf "spark.metrics.conf.*.sink.graphite.class"="org.apache.spark.metrics.sink.GraphiteSink"   \
       --conf "spark.metrics.conf.*.sink.graphite.host"=mytestinstance \
       --conf "spark.metrics.conf.*.sink.graphite.port"=2003 \
       --conf "spark.metrics.conf.*.sink.graphite.period"=10 \
       --conf "spark.metrics.conf.*.sink.graphite.unit"=seconds \
       --conf "spark.metrics.conf.*.sink.graphite.prefix"="youridhere"
      
    • Visualize the metrics using the Spark dashboard, see Spark_Perf_Dashboard_v03_with_SparkPlugins
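
The Graphite sink options are repeated in every example in this document. As a convenience sketch, the Python helper below generates them as an argument list; the helper name is hypothetical, while the option names and example values are the ones used in the commands above.

```python
def graphite_sink_conf(host, port=2003, period=10, unit="seconds", prefix=None):
    """Return spark-submit arguments that configure the Graphite metrics sink."""
    base = "spark.metrics.conf.*.sink.graphite."
    settings = {
        "class": "org.apache.spark.metrics.sink.GraphiteSink",
        "host": host,
        "port": port,
        "period": period,
        "unit": unit,
    }
    if prefix is not None:
        settings["prefix"] = prefix
    args = []
    for key, value in settings.items():
        # Each setting becomes one "--conf key=value" pair
        args += ["--conf", f"{base}{key}={value}"]
    return args


if __name__ == "__main__":
    print(" ".join(graphite_sink_conf("mytestinstance", prefix="youridhere")))
```

Passing the result as an argument list (for example to subprocess.run) avoids shell expansion of the * in the option names; on a command line, quote the keys as in the examples above.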

A second plugin, CloudFSMetrics27, provides the same I/O statistics for Cloud Filesystem metrics (for s3a, gs, wasbs, oci, root, and any other storage system exposed as a Hadoop Compatible Filesystem). Use this for Spark built using Hadoop 2.7.

  • Configure with:
    • --conf spark.plugins=ch.cern.CloudFSMetrics27
    • --conf spark.cernSparkPlugin.cloudFsName=<name of the filesystem> (example: "s3a", "oci", "gs", "root", etc.)
    • Optional configuration: --conf spark.cernSparkPlugin.registerOnDriver (default true)
    • Collects I/O metrics for Hadoop-compatible filesystems using the Hadoop 2.7 API; use with Spark built with Hadoop 2.7.
    • The metrics are the same as for CloudFSMetrics described above, except for the prefix: ch.cern.CloudFSMetrics27.

Experimental Plugins for I/O Time Instrumentation

This section details a few experimental Spark plugins used to expose metrics for I/O-time instrumentation of Spark workloads using Hadoop-compliant filesystems.
These plugins use instrumented experimental/custom versions of the Hadoop client API for HDFS and other Hadoop-Compliant File Systems.

  • S3A Time Instrumentation

    • Instruments the Hadoop S3A client.

    • Note: this requires a custom S3A client implementation, see experimental code at: HDFS and S3A custom instrumentation

    • Spark config:

      • Use this with Spark 3.1.x (which uses Hadoop version 3.2.0)
      • --conf spark.plugins=ch.cern.experimental.S3ATimeInstrumentation
      • Custom jar needed: --jars hadoop-aws-3.2.0.jar
    • Metrics implemented (gauges), with prefix ch.cern.experimental.S3ATimeInstrumentation:

      • S3AReadTimeMuSec
      • S3ASeekTimeMuSec
      • S3ACPUTimeDuringReadMuSec
      • S3ACPUTimeDuringSeekMuSec
      • S3AReadTimeMinusCPUMuSec
      • S3ASeekTimeMinusCPUMuSec
      • S3ABytesRead
      • S3AGetObjectMetadataMuSec
      • S3AGetObjectMetadataMinusCPUMuSec
    • Example:

      bin/spark-shell --master k8s://https://<K8S URL>:6443 --driver-memory 1g \
       --num-executors 2 --executor-cores 2 --executor-memory 2g \
       --conf spark.kubernetes.container.image=<registry>/spark:v311 \
       --jars <PATH>/hadoop-aws-3.2.0.jar \
       --packages com.amazonaws:aws-java-sdk-bundle:1.11.880,ch.cern.sparkmeasure:spark-plugins_2.12:0.1 \
       --conf spark.plugins=ch.cern.experimental.S3ATimeInstrumentation \
       --conf spark.hadoop.fs.s3a.secret.key="<SECRET KEY HERE>" \
       --conf spark.hadoop.fs.s3a.access.key="<ACCESS KEY HERE>" \
       --conf spark.hadoop.fs.s3a.endpoint="https://<URL HERE>" \
       --conf spark.hadoop.fs.s3a.impl="org.apache.hadoop.fs.s3a.S3AFileSystem" \
       --conf "spark.metrics.conf.*.sink.graphite.class"="org.apache.spark.metrics.sink.GraphiteSink"   \
       --conf "spark.metrics.conf.*.sink.graphite.host"=mytestinstance \
       --conf "spark.metrics.conf.*.sink.graphite.port"=2003 \
       --conf "spark.metrics.conf.*.sink.graphite.period"=10 \
       --conf "spark.metrics.conf.*.sink.graphite.unit"=seconds \
       --conf "spark.metrics.conf.*.sink.graphite.prefix"="youridhere"
      
  • Visualize the metrics using the Spark dashboard, see Spark_Perf_Dashboard_v03_with_SparkPlugins_Experimental

  • HDFS Time Instrumentation

    • Instruments the Hadoop HDFS client.

    • Note: this requires a custom HDFS client implementation, see experimental code at: HDFS and S3A custom instrumentation

    • Spark config:

      • Use this with Spark 3.1.x (which uses Hadoop version 3.2.0)
      • --conf spark.plugins=ch.cern.experimental.HDFSTimeInstrumentation
      • --packages ch.cern.sparkmeasure:spark-plugins_2.12:0.1
      • Non-standard configuration required for using this instrumentation:
    • Metrics implemented (gauges), with prefix ch.cern.experimental.HDFSTimeInstrumentation:

      • HDFSReadTimeMuSec
      • HDFSCPUTimeDuringReadMuSec
      • HDFSReadTimeMinusCPUMuSec
      • HDFSBytesRead
      • HDFSReadCalls
    • Example:

    bin/spark-shell --master yarn --num-executors 2 --executor-cores 2 \
     --jars <PATH>/sparkplugins_2.12-0.1.jar \
     --conf spark.plugins=ch.cern.experimental.HDFSTimeInstrumentation
     # NOTE: add the spark.metrics.conf parameters here, or configure metrics.conf
    
    • Visualize the metrics with the Spark dashboard spark_perf_dashboard_spark3-0_v02_with_sparkplugins_experimental
  • Hadoop-XRootD Time Instrumentation

    • Collects metrics for the Hadoop-XRootD connector
    • Intended use is when using Spark with the CERN EOS (and CERN Box) storage system.
    • Additional details: Hadoop-XRootD connector instrumentation
    • Spark config:
      --conf spark.plugins=ch.cern.experimental.ROOTTimeInstrumentation \
      --packages ch.cern.sparkmeasure:spark-plugins_2.12:0.1 \
      --conf spark.driver.extraClassPath=<path>/hadoop-xrootd-1.0.5.jar \
      --conf spark.executor.extraClassPath=<path>/hadoop-xrootd-1.0.5.jar \
      --files <path_to_krbtgt>#krbcache \
      --conf spark.executorEnv.KRB5CCNAME="FILE:krbcache"
      
    • Metrics implemented (gauges), with prefix ch.cern.experimental.ROOTTimeInstrumentation:
      • ROOTBytesRead
      • ROOTReadOps
      • ROOTReadTimeMuSec
  • Visualize the metrics using the Spark dashboard, see Spark_Perf_Dashboard_v03_with_SparkPlugins_Experimental

  • OCITimeInstrumentation

    • Instruments the HDFS connector to Oracle OCI storage.
    • Note: this requires a custom hdfs-oci connector implementation, see experimental code at: OCI-Hadoop connector instrumentation
    • Spark config:

    • --conf spark.plugins=ch.cern.experimental.OCITimeInstrumentation
    • Hack required for testing using Spark on Kubernetes: copy oci-hdfs-connector-2.9.2.6.jar, built from this fork, into $SPARK_HOME/jars. Also copy and install the relevant oracle/oci-java-sdk release jars, version 1.17.5: oci-java-sdk-full-1.17.5.jar and related third-party jars.
    # Note: for K8S, rather add the spark-plugins package to the container image
    --packages ch.cern.sparkmeasure:spark-plugins_2.12:0.1 \
    --conf spark.hadoop.fs.oci.client.auth.pemfilepath="<PATH>/oci_api_key.pem" \
    --conf spark.hadoop.fs.oci.client.auth.tenantId=ocid1.tenancy.oc1..TENANT_KEY \
    --conf spark.hadoop.fs.oci.client.auth.userId=ocid1.user.oc1.USER_KEY \
    --conf spark.hadoop.fs.oci.client.auth.fingerprint=<fingerprint_here> \
    --conf spark.hadoop.fs.oci.client.hostname="https://objectstorage.REGION_NAME.oraclecloud.com"
    
    • Metrics implemented (gauges), with prefix ch.cern.experimental.OCITimeInstrumentation:
      • OCIReadTimeMuSec
      • OCISeekTimeMuSec
      • OCICPUTimeDuringReadMuSec
      • OCICPUTimeDuringSeekMuSec
      • OCIReadTimeMinusCPUMuSec
      • OCISeekTimeMinusCPUMuSec
      • OCIBytesRead
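
The S3A, HDFS, and OCI time-instrumentation plugins share a naming pattern for three of their gauges: <PREFIX>ReadTimeMuSec, <PREFIX>CPUTimeDuringReadMuSec, and <PREFIX>BytesRead. The Python sketch below derives a few summary figures from a snapshot of those values. It assumes that "MuSec" means microseconds and that the gauges are totals accumulated over the reads performed; both are interpretations of the metric names, not statements from this document. The function name is hypothetical.

```python
def read_summary(gauges, prefix):
    """Derive read time, throughput, and CPU share for one instrumented client.

    prefix is "S3A", "HDFS", or "OCI", matching the gauge names listed above.
    """
    # Assumption: the MuSec gauges are expressed in microseconds
    read_seconds = gauges[f"{prefix}ReadTimeMuSec"] / 1_000_000
    cpu_seconds = gauges[f"{prefix}CPUTimeDuringReadMuSec"] / 1_000_000
    if read_seconds == 0:
        # No read time recorded: avoid dividing by zero
        return {"read_seconds": 0.0, "bytes_per_second": 0.0, "cpu_fraction": 0.0}
    return {
        "read_seconds": read_seconds,
        "bytes_per_second": gauges[f"{prefix}BytesRead"] / read_seconds,
        "cpu_fraction": cpu_seconds / read_seconds,
    }
```

If read time is accumulated across concurrent tasks, bytes_per_second is throughput per unit of accumulated read time rather than wall-clock throughput. A low cpu_fraction suggests that most of the read time was spent waiting on storage rather than on CPU work.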