
spark-commons is a library offering commonly needed routines, classes and functionality. It consists of four modules.

  • spark-commons-spark2.4
  • spark-commons-spark3.2
  • spark-commons-spark3.3
  • spark-commons-test

The spark-commons-spark2.4, spark-commons-spark3.2 and spark-commons-spark3.3 modules all offer the same logic for their respective Spark versions, addressing the usual needs of Spark applications.

spark-commons-test brings routines that help in testing Spark applications, and it is independent of the Spark version used.

             spark-commons-spark2.4   spark-commons-spark3.2   spark-commons-spark3.3   spark-commons-test
Scala 2.11   Maven Central            -                        -                        Maven Central
Scala 2.12   Maven Central            Maven Central            Maven Central            Maven Central



NonFatalQueryExecutionListenerAdapter is a trait that, when mixed into another QueryExecutionListener implementation, makes sure the latter is not called with any fatal exception.

See AbsaOSS/commons#50

val myListener = new MyQueryExecutionListener with NonFatalQueryExecutionListenerAdapter
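
To illustrate the mix-in idea, here is a minimal, Spark-free sketch of a stackable trait that forwards only non-fatal errors. The `Listener`, `NonFatalOnly` and `RecordingListener` names are hypothetical stand-ins invented for this example, not the library's classes; the only real API used is `scala.util.control.NonFatal`.

```scala
import scala.util.control.NonFatal

object NonFatalSketch {
  // Hypothetical stand-in for a listener interface, reduced to a single failure callback.
  trait Listener {
    def onFailure(funcName: String, error: Throwable): Unit
  }

  // Stackable trait (assumption: this mirrors the idea, not the library's code).
  // It forwards the call to the wrapped listener only for non-fatal errors.
  trait NonFatalOnly extends Listener {
    abstract override def onFailure(funcName: String, error: Throwable): Unit =
      error match {
        case NonFatal(e) => super.onFailure(funcName, e)
        case _           => () // fatal errors (e.g. OutOfMemoryError) are not forwarded
      }
  }

  // Simple listener that records the names of the failed actions it was told about.
  class RecordingListener extends Listener {
    var seen: List[String] = Nil
    override def onFailure(funcName: String, error: Throwable): Unit =
      seen = seen :+ funcName
  }

  def demo(): List[String] = {
    val listener = new RecordingListener with NonFatalOnly
    listener.onFailure("save", new RuntimeException("recoverable"))
    listener.onFailure("collect", new OutOfMemoryError("fatal"))
    listener.seen
  }
}
```

In this sketch only the `RuntimeException` reaches the recording listener, because `NonFatal` does not match `OutOfMemoryError`.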


A trait that brings a Spark-version-independent implementation of the transform function.


SchemaUtils provides methods for working with schemas, their comparison and alignment.

  1. Extracts the parent path of a field. Returns an empty string if a root level column name is provided.

  2. Extracts the field name of a fully qualified column name.

  3. Gets the paths of all array subfields of the given data type.

  4. For a given list of field paths, determines whether any pair of paths is a subset of one another.

  5. Appends a new attribute to a path, which may be an empty string.

      SchemaUtils.appendPath(path, fieldName)
  6. Separates the field name components of a fully qualified column name as their hierarchy goes from root down to the deepest one.

      SchemaUtils.splitPath(columnName, keepEmptyFields = true)
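
The path handling described in items 1, 2 and 5 can be sketched on plain strings as follows. This is an illustration of the described behaviour under the assumption that paths are dot-separated, not the library's implementation; `parentPath` and `fieldName` are hypothetical names chosen for this example.

```scala
object PathSketch {
  // Parent path of a dot-separated column name; "" for a root-level column.
  def parentPath(columnName: String): String = {
    val idx = columnName.lastIndexOf('.')
    if (idx < 0) "" else columnName.substring(0, idx)
  }

  // Last component of a dot-separated column name.
  def fieldName(columnName: String): String =
    columnName.substring(columnName.lastIndexOf('.') + 1)

  // Appends a field name to a path; an empty path yields just the field name.
  def appendPath(path: String, fieldName: String): String =
    if (path.isEmpty) fieldName else s"$path.$fieldName"
}
```

For example, under these assumptions `PathSketch.parentPath("a.b.c")` gives `"a.b"` and `PathSketch.appendPath("", "x")` gives `"x"`.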


JsonUtils provides methods for working with JSON, both on input and output.

  1. Creates a Spark DataFrame from one or more JSON documents.

      JsonUtils.getDataFrameFromJson(json, schema)(implicit spark)
  2. Creates a Spark schema from one or more JSON documents.



ColumnImplicits provides implicit methods for transforming Spark Columns.

  1. Transforms the column into a boolean column, checking if values are negative or positive infinity.

  2. Returns a column with the requested substring. It shifts the substring indexing to be zero-based, in accordance with Scala/Java. If the provided starting position is negative, it is counted from the end.

  3. Returns a column with the requested substring. It shifts the substring indexing to be zero-based, in accordance with Scala/Java. If the provided starting position is negative, it is counted from the end. If the requested length is longer than the rest of the string, all the remaining characters are taken.

      column.zeroBasedSubstr(startPos, length)
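
The indexing rules above can be illustrated on a plain `String`. The library method works on Spark Columns; the sketch below only mirrors the described semantics, and the clamping of out-of-range start positions is an assumption made for this example.

```scala
object SubstrSketch {
  // Zero-based substring on a plain String, following the rules described above.
  // Assumption: out-of-range start positions are clamped to the string bounds.
  def zeroBasedSubstr(s: String, startPos: Int, length: Int): String = {
    val start =
      if (startPos >= 0) math.min(startPos, s.length)
      else math.max(s.length + startPos, 0) // negative: count from the end
    // Use Long arithmetic so that a very large length cannot overflow.
    val end = math.min(start.toLong + math.max(length, 0), s.length.toLong).toInt
    s.substring(start, end)
  }
}
```

Here `SubstrSketch.zeroBasedSubstr("abcdef", 2, 3)` yields `"cde"`, and a start position of `-2` with any sufficiently large length yields `"ef"`.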


StructFieldImplicits provides implicit methods for working with StructField objects.

Of them, metadata methods are:

  1. Gets the metadata value for a given key as an Option[String].

  2. Gets the metadata value for a given key as a Char: if the value is a single-character String, it returns the char, otherwise None.

  3. Gets the metadata Boolean value for a given key, provided that it can be converted to a Boolean.

  4. Checks whether the StructField has the provided key; returns a Boolean.



ArrayTypeImplicits provides implicit methods for working with ArrayType objects.

  1. Checks if the ArrayType is equivalent to another.

  2. For an array of arrays, gets the final element type at the bottom of the array.



DataTypeImplicits provides implicit methods for working with DataType objects.

  1. Checks if the DataType is equivalent to another.

  2. Checks if casting between the types always succeeds.

  3. Checks if the type is primitive.



StructTypeImplicits provides implicit methods for working with StructType objects.

  1. Get a field from a text path

  2. Get a type of a field from a text path

  3. Checks if the specified path is an array of structs

  4. Get nullability of a field from a text path

  5. Checks if a field specified by a path exists

  6. Get paths for all array fields in the schema

  7. Get a closest unique column name

  8. Checks if a field is the only field in a struct

  9. Checks if two StructTypes are equivalent.

  10. Returns a list of differences between one schema and the other.

      structType.diffSchema(otherSchema, parent)
  11. Checks if a field is of the specified type

  12. Checks if a field is a subset of the specified type

  13. Returns a data selector that can be used to align the schema of a DataFrame.



  1. Gets the first array column's path out of the complete path.

  2. Gets all array columns' paths out of the complete path.

  3. For a given list of field paths determines the deepest common array path

  4. For a field path determines the deepest array path

  5. Checks if a field is an array that is not nested in another array



  1. Changes the field structure of the DataFrame to adhere to the provided schema or selector. Data types remain intact.
  2. Persists this Dataset with the default storage level, avoiding the warning in case it has already been cached.
  3. Gets the string representation of the data in the same format in which the show function displays it.
  4. Adds a column to a DataFrame if it does not exist.
  5. Casts all NullType fields of the DataFrame to their corresponding types in targetSchema.

Spark Version Guard

A class which checks whether the Spark version of the job is compatible with the Spark versions supported by the library.
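
The kind of check such a guard performs can be sketched without Spark as a version-range comparison. Everything below (`Version`, `parse`, `isCompatible` and the inclusive/exclusive bounds) is a hypothetical illustration, not the library's API.

```scala
object VersionGuardSketch {
  // Only major and minor components are compared in this sketch.
  final case class Version(major: Int, minor: Int)

  private val ordering: Ordering[Version] =
    Ordering.by((v: Version) => (v.major, v.minor))

  // Parses strings such as "3.3.1" or "2.4"; anything after the minor number is ignored.
  def parse(version: String): Version = {
    val parts = version.split("\\.")
    Version(parts(0).toInt, parts(1).toInt)
  }

  // True if minInclusive <= sparkVersion < maxExclusive.
  def isCompatible(sparkVersion: String, minInclusive: String, maxExclusive: String): Boolean = {
    val v = parse(sparkVersion)
    ordering.gteq(v, parse(minInclusive)) && ordering.lt(v, parse(maxExclusive))
  }
}
```

With these assumptions, a job running on "3.3.1" passes a check for the range from "3.0" up to, but not including, "4.0", while "2.4.8" does not.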

Default mode checking


Checking for 2.X versions


Checking for 3.X versions



An abstract class that helps attach/register UDFs and similar objects only once per Spark session.

Usage: Extend this abstract class and implement the method register. On initialization, the register method gets executed only if the combination of class and Spark session is unique.

This way we ensure only a single registration per Spark session.
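
The register-once idea can be sketched in plain Scala as below. This is a simplified illustration under stated assumptions: a `String` id stands in for the Spark session, and `OncePerSession`, `MyUdfs` and `registrations` are hypothetical names, not the library's classes.

```scala
import scala.collection.mutable

object OncePerSessionSketch {
  // Remembers which (class name, session id) pairs have already been registered.
  private val registered = mutable.Set.empty[(String, String)]

  // Hypothetical base class; a String id stands in for the Spark session.
  abstract class OncePerSession(sessionId: String) {
    protected def register(): Unit

    // Runs on initialization: register() is called only for a new class + session pair.
    registered.synchronized {
      if (registered.add((getClass.getName, sessionId))) register()
    }
  }

  // Counts how many times register() actually ran.
  var registrations: Int = 0

  class MyUdfs(sessionId: String) extends OncePerSession(sessionId) {
    override protected def register(): Unit = registrations += 1
  }

  def demo(): Int = {
    new MyUdfs("session-A")
    new MyUdfs("session-A") // same class and session: register() is skipped
    new MyUdfs("session-B") // new session: register() runs again
    registrations
  }
}
```

Because `register()` is invoked from the base-class initializer, an implementation following this pattern should not rely on fields of the subclass that are initialized later.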


DataFrameImplicits provides methods for transformations on DataFrames.

  1. Gets the string representation of the DataFrame's data, in a similar fashion to how the show function presents it.

          df.dataAsString(numRows, truncate)
          df.dataAsString(numRows, truncateNumber)
          df.dataAsString(numRows, truncate, vertical)
  2. Adds a column to a DataFrame if it does not exist. If it exists, it applies the provided function.

       df.withColumnIfDoesNotExist((df: DataFrame, _) => df)(colName, colExpression)
  3. Aligns the schema of a DataFrame to the selector for operations where the schema order might be important (e.g. hashing whole rows and using except).



  1. Similarly to the col function, evaluates the column based on the provided column name. Here, however, the name can be a full path, even of nested fields. It also evaluates arrays and maps where the array index or map key is in brackets [].

        def col_of_path(fullColName: String): Column
  2. Provides a column of NULL values.

        def nul_coll(): Column
  3. Provides a column of NULL values, but the actual type is per specification

        def nul_coll(dataType: DataType): Column

Error Handler

A trait and a set of supporting classes and other traits that enable error channeling between libraries and the application during Spark data processing.

  1. It has an implicit dataFrame for easier usage of the methods provided by the error handler trait.

  2. It provides four basic implementations:

    • ErrorHandlerErrorMessageIntoArray - An implementation of the error handler trait that collects errors into columns of structs based on the ErrorMessage case class.
    • ErrorHandlerFilteringErrorRows - An implementation of the error handler trait that filters out rows that have some error (any of the error columns is not NULL).
    • ErrorHandlerIgnoringErrors - An implementation of the error handler trait that ignores the errors detected during the DataFrame error aggregation.
    • ErrorHandlerThrowingException - An implementation of the error handler trait that throws an exception when an error is detected.

Spark Commons Test


class MyTest extends SparkTestBase {
}

By default, it will instantiate a local Spark. There is also the possibility to use it in yarn mode:

class MyTest extends SparkTestBase {
  override lazy val spark: SparkSession = initSpark(new YarnSparkConfiguration(confDir, distJarsDir))
}

How to generate Code coverage report

sbt jacoco

Code coverage will be generated on path:


How to Release

Please see this file for more details.