spark-commons


spark-commons is a library offering commonly needed routines, classes and functionality. It consists of four modules:

  • spark-commons-spark2.4
  • spark-commons-spark3.2
  • spark-commons-spark3.3
  • spark-commons-test

spark-commons-spark2.4, spark-commons-spark3.2 and spark-commons-spark3.3 all offer the same logic for their respective Spark versions, addressing the usual needs of Spark applications.

spark-commons-test brings routines that help in testing Spark applications (it is independent of the Spark version used).

Availability on Maven Central:

  • Scala 2.11: spark-commons-spark2.4, spark-commons-test
  • Scala 2.12: spark-commons-spark2.4, spark-commons-spark3.2, spark-commons-spark3.3, spark-commons-test

Spark-Commons

NonFatalQueryExecutionListenerAdapter

A trait that, when mixed into another QueryExecutionListener implementation, makes sure the latter is not called with any fatal exception.

See AbsaOSS/commons#50

val myListener = new MyQueryExecutionListener with NonFatalQueryExecutionListenerAdapter
spark.listenerManager.register(myListener)
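
The mix-in idea can be illustrated without Spark. The following is a minimal sketch, assuming a simplified, hypothetical `FailureListener` trait in place of Spark's `QueryExecutionListener`. `NonFatalAdapterSketch` and `RecordingListener` are illustrative names, not the library's implementation; the sketch only shows how a stackable trait can drop fatal throwables using `scala.util.control.NonFatal`.

```scala
import scala.util.control.NonFatal

object NonFatalSketch {
  // Hypothetical stand-in for Spark's QueryExecutionListener (failure callback only).
  trait FailureListener {
    def onFailure(funcName: String, error: Throwable): Unit
  }

  // Stackable trait: forwards only non-fatal throwables to the wrapped listener.
  trait NonFatalAdapterSketch extends FailureListener {
    abstract override def onFailure(funcName: String, error: Throwable): Unit =
      if (NonFatal(error)) super.onFailure(funcName, error)
  }

  // A concrete listener that records the names it was called with.
  class RecordingListener extends FailureListener {
    var seen: List[String] = Nil
    override def onFailure(funcName: String, error: Throwable): Unit =
      seen = seen :+ funcName
  }
}
```

Mixing the adapter in last, as in `new RecordingListener with NonFatalAdapterSketch`, places it in front of the concrete listener, so a throwable such as `InterruptedException` never reaches the wrapped `onFailure`.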

TransformAdapter

A trait that brings a Spark-version-independent implementation of the transform function.

SchemaUtils

SchemaUtils provides methods for working with schemas, including their comparison and alignment.

  1. Extracts the parent path of a field. Returns an empty string if a root level column name is provided.

      SchemaUtils.getParentPath(columnName)
  2. Extracts the field name of a fully qualified column name.

      SchemaUtils.stripParentPath(columnName)
  3. Get paths for all array subfields of this given datatype.

      SchemaUtils.getAllArraySubPaths(other)
  4. For a given list of field paths determines if any path pair is a subset of one another.

      SchemaUtils.isCommonSubPath(paths)
  5. Append a new attribute to path or empty string.

      SchemaUtils.appendPath(path, fieldName)
  6. Separates the field name components of a fully qualified column name as their hierarchy goes from root down to the deepest one.

      SchemaUtils.splitPath(columnName, keepEmptyFields = true)
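
To make the path helpers concrete, here is a minimal plain-Scala sketch of the behaviour described above, assuming column paths are dot-separated. The object name `PathSketch` and the method bodies are hypothetical; they mirror the descriptions in this list, not the library's source, and the `keepEmptyFields` parameter is left out.

```scala
object PathSketch {
  // Parent path of a dot-separated column name; "" for a root-level column.
  def getParentPath(columnName: String): String = {
    val idx = columnName.lastIndexOf('.')
    if (idx < 0) "" else columnName.substring(0, idx)
  }

  // Field name without its parent path (everything after the last dot).
  def stripParentPath(columnName: String): String =
    columnName.substring(columnName.lastIndexOf('.') + 1)

  // Appends a field to a path; an empty path yields just the field name.
  def appendPath(path: String, fieldName: String): String =
    if (path.isEmpty) fieldName else s"$path.$fieldName"

  // Splits a fully qualified name into its components, root first.
  def splitPath(columnName: String): List[String] =
    columnName.split('.').toList
}
```

For example, under these assumptions `PathSketch.getParentPath("person.address.city")` gives `"person.address"` and `PathSketch.stripParentPath("person.address.city")` gives `"city"`.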

JsonUtils

JsonUtils provides methods for working with JSON, both on input and output.

  1. Create a Spark DataFrame from a JSON document(s).

      JsonUtils.getDataFrameFromJson(json)
      JsonUtils.getDataFrameFromJson(json, schema)(implicit spark)
  2. Creates a Spark Schema from a JSON document(s).

      JsonUtils.getSchemaFromJson(json)

ColumnImplicits

ColumnImplicits provides implicit methods for transforming Spark Columns.

  1. Transforms the column into a boolean column, checking if values are negative or positive infinity

      column.isInfinite()
  2. Returns a column with the requested substring, using zero-based indexing in accordance with Scala/Java. If the provided starting position is negative, it is counted from the end of the string.

      column.zeroBasedSubstr(startPos)
  3. Returns a column with the requested substring, using zero-based indexing in accordance with Scala/Java. If the provided starting position is negative, it is counted from the end of the string. If the requested length is longer than the rest of the string, all the remaining characters are taken.

      column.zeroBasedSubstr(startPos, length)
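
The indexing rules can be shown on a plain `String`. This is a sketch only: `SubstrSketch` is a hypothetical helper that works on strings rather than Spark Columns, and clamping an out-of-range start position to the string bounds is an assumption, not documented library behaviour.

```scala
object SubstrSketch {
  // Zero-based substring; a negative startPos is counted from the end of the string.
  // Assumption: out-of-range start positions are clamped to the string bounds.
  def zeroBasedSubstr(s: String, startPos: Int, length: Int = Int.MaxValue): String = {
    val start =
      if (startPos >= 0) math.min(startPos, s.length)
      else math.max(s.length + startPos, 0)
    // Take at most `length` characters, or whatever remains if fewer are left.
    s.slice(start, start + math.min(length, s.length - start))
  }
}
```

With the input `"abcdef"`, a start of `2` selects from the third character on, a start of `-2` selects the last two characters, and a length that overshoots the end simply returns the remainder.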

StructFieldImplicits

StructFieldImplicits provides implicit methods for working with StructField objects.

Of them, metadata methods are:

  1. Gets the metadata Option[String] value given a key

      structField.metadata.getOptString(key)
  2. Gets the metadata value for a given key as a Char: if the value is a single-character String, it returns that character, otherwise None

      structField.metadata.getOptChar(key)
  3. Gets the metadata boolean value of a given key, given that it can be transformed into boolean

      structField.metadata.getStringAsBoolean(key)
  4. Checks whether the StructField's metadata contains the provided key; returns a boolean

      structField.metadata.hasKey(key)
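
The metadata accessors follow a common "enrich my library" pattern. The sketch below models string-valued metadata as a plain `Map[String, String]` instead of Spark's `Metadata`; `MetadataSketch` and `MetadataOps` are hypothetical names, and returning `Option[Boolean]` from `getStringAsBoolean` is an assumption about the result type.

```scala
object MetadataSketch {
  // Hypothetical stand-in: string-valued metadata modelled as a plain Map.
  implicit class MetadataOps(val metadata: Map[String, String]) extends AnyVal {
    // The value for the key, if present.
    def getOptString(key: String): Option[String] = metadata.get(key)

    // The value as a Char, only when it is exactly one character long.
    def getOptChar(key: String): Option[Char] =
      metadata.get(key).collect { case s if s.length == 1 => s.head }

    // The value as a Boolean, only when it parses as "true" or "false".
    def getStringAsBoolean(key: String): Option[Boolean] =
      metadata.get(key).flatMap(s => scala.util.Try(s.toBoolean).toOption)

    // Whether the key is present at all.
    def hasKey(key: String): Boolean = metadata.contains(key)
  }
}
```

After `import MetadataSketch._`, a map such as `Map("sep" -> ",")` gains these methods, so `getOptChar("sep")` yields `Some(',')` while a multi-character value yields `None`.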

ArrayTypeImplicits

ArrayTypeImplicits provides implicit methods for working with ArrayType objects.

  1. Checks if the arraytype is equivalent to another

      arrayType.isEquivalentArrayType(otherArrayType)
  2. For an array of arrays, get the final element type at the bottom of the array

      arrayType.getDeepestArrayType()

DataTypeImplicits

DataTypeImplicits provides implicit methods for working with DataType objects.

  1. Checks if the datatype is equivalent to another

      dataType.isEquivalentDataType(otherDt)
  2. Checks if a casting between types always succeeds

      dataType.doesCastAlwaysSucceed(otherDt)
  3. Checks if type is primitive

      dataType.isPrimitive()

StructTypeImplicits

StructTypeImplicits provides implicit methods for working with StructType objects.

  1. Get a field from a text path

      structType.getField(path)
  2. Get a type of a field from a text path

      structType.getFieldType(path)
  3. Checks if the specified path is an array of structs

      structType.isColumnArrayOfStruct(path)
  4. Get nullability of a field from a text path

      structType.getFieldNullability(path)
  5. Checks if a field specified by a path exists

      structType.fieldExists(path)
  6. Get paths for all array fields in the schema

      structType.getAllArrayPaths()
  7. Get a closest unique column name

      structType.getClosestUniqueName(desiredName)
  8. Checks if a field is the only field in a struct

      structType.isOnlyField(columnName)
  9. Checks if 2 structtypes are equivalent

      structType.isEquivalent(other)
  10. Returns a list of differences between one schema and the other

      structType.diffSchema(otherSchema, parent)
  11. Checks if a field is of the specified type

      structType.isOfType[ArrayType](path)
  12. Checks if the schema is a subset of the specified schema

      structType.isSubset(other)
  13. Returns a data selector that can be used to align the schema of a DataFrame.

      structType.getDataFrameSelector()

StructTypeArrayImplicits

  1. Get first array column's path out of complete path

      structType.getFirstArrayPath(path)
  2. Get all array columns' paths out of complete path.

      structType.getAllArraysInPath(path)
  3. For a given list of field paths determines the deepest common array path

      structType.getDeepestCommonArrayPath(fieldPaths)
  4. For a field path determines the deepest array path

      structType.getDeepestArrayPath(path)
  5. Checks if a field is an array that is not nested in another array

      structType.isNonNestedArray(path)

DataFrameImplicits

  1. Changes the field structure of the DataFrame to adhere to the provided schema or selector. Data types remain intact.

      dataFrame.alignSchema
  2. Persists this Dataset with the default storage level, avoiding the warning issued when it has already been cached.

      dataFrame.cacheIfNotCachedYet()
  3. Gets the string representation of the data in the format in which Dataset.show() displays it.

      dataFrame.dataAsString()
  4. Adds a column to the DataFrame if it does not exist.

      dataFrame.withColumnIfDoesNotExist(path)
  5. Casts all NullType fields of the DataFrame to their corresponding types in targetSchema.

      dataFrame.enforceTypeOnNullTypeFields(targetSchema)

Spark Version Guard

A class that checks whether the Spark version of the job is compatible with the Spark versions supported by the library.

Default mode checking

SparkVersionGuard.fromDefaultSparkCompatibilitySettings.ensureSparkVersionCompatibility(SPARK_VERSION)

Checking for 2.X versions

SparkVersionGuard.fromSpark2XCompatibilitySettings.ensureSparkVersionCompatibility(SPARK_VERSION)

Checking for 3.X versions

SparkVersionGuard.fromSpark3XCompatibilitySettings.ensureSparkVersionCompatibility(SPARK_VERSION)
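
A version guard of this kind boils down to a range check on a parsed version. The sketch below is a plain-Scala illustration, not the library's implementation: `VersionGuardSketch`, `Version` and `Guard` are hypothetical names, and the inclusive-minimum, exclusive-maximum bounds as well as the concrete 2.x and 3.x ranges are assumptions.

```scala
object VersionGuardSketch {
  // Semantic version compared by major, then minor, then patch.
  final case class Version(major: Int, minor: Int, patch: Int) extends Ordered[Version] {
    override def compare(that: Version): Int =
      Ordering[(Int, Int, Int)].compare((major, minor, patch), (that.major, that.minor, that.patch))
  }

  // Accepts "x.y.z" followed by an optional suffix such as "-SNAPSHOT".
  private val VersionPattern = """(\d+)\.(\d+)\.(\d+).*""".r

  def parse(version: String): Version = version match {
    case VersionPattern(ma, mi, pa) => Version(ma.toInt, mi.toInt, pa.toInt)
    case _ => throw new IllegalArgumentException(s"Unparseable version: $version")
  }

  // Assumed semantics: minimum is inclusive, maximum is exclusive.
  final case class Guard(minInclusive: Version, maxExclusive: Version) {
    def isCompatible(version: String): Boolean = {
      val v = parse(version)
      v >= minInclusive && v < maxExclusive
    }

    // Throws IllegalArgumentException when the version is out of range.
    def ensureCompatible(version: String): Unit =
      require(isCompatible(version), s"Spark $version is outside [$minInclusive, $maxExclusive)")
  }

  // Assumed ranges for illustration only.
  val spark2x: Guard = Guard(Version(2, 0, 0), Version(3, 0, 0))
  val spark3x: Guard = Guard(Version(3, 0, 0), Version(4, 0, 0))
}
```

Under these assumptions `VersionGuardSketch.spark2x.ensureCompatible("2.4.8")` passes silently, while the same call with `"3.2.1"` throws.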

OncePerSparkSession

An abstract class that helps attach/register UDFs and similar objects only once per Spark session.

Usage: Extend this abstract class and implement the register method. On initialization, the register method is executed only if the combination of class and Spark session has not been seen before.

This way we ensure only single registration per spark session.
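
The once-per-session mechanism can be sketched in plain Scala. In the sketch below the session is modelled as an arbitrary `AnyRef` rather than a `SparkSession`, and `OncePerSessionSketch`, `OncePerSession` and `MyUdfLibrary` are hypothetical names; it illustrates the idea described above, not the library's code.

```scala
import java.util.concurrent.ConcurrentHashMap

object OncePerSessionSketch {
  // Thread-safe set of (class, session) pairs that have already been registered.
  private val registered = ConcurrentHashMap.newKeySet[(Class[_], AnyRef)]()

  // Hypothetical base class; the session stands in for a SparkSession.
  abstract class OncePerSession(session: AnyRef) {
    protected def register(session: AnyRef): Unit

    // Runs during construction: register only for a new (class, session) pair.
    if (registered.add((getClass, session))) register(session)
  }

  // Counter used only to make the effect observable in this sketch.
  var registrations = 0

  class MyUdfLibrary(session: AnyRef) extends OncePerSession(session) {
    override protected def register(session: AnyRef): Unit = registrations += 1
  }
}
```

Because `register` runs from the base-class constructor, a subclass in this sketch should not rely on its own fields inside `register`. The set also holds strong references to the sessions, which a production implementation would need to consider.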

DataFrameImplicits

DataFrameImplicits provides methods for transformations on Dataframes

  1. Gets the data of the DataFrame as a string, formatted in the same way as the show function presents it.

      df.dataAsString()

      df.dataAsString(truncate)

      df.dataAsString(numRows, truncate)

      df.dataAsString(numRows, truncateNumber)

      df.dataAsString(numRows, truncate, vertical)
  2. Adds a column to the DataFrame if it does not exist. If it exists, the provided function is applied instead.

      df.withColumnIfDoesNotExist((df: DataFrame, _) => df)(colName, colExpression)
  3. Aligns the schema of a DataFrame to the selector, for operations where the schema order might be important (e.g. hashing whole rows and using except)

      df.alignSchema(structType)
      df.alignSchema(listColumns)

Functions

  1. Similarly to the col function, evaluates the column based on the provided column name. Here, however, the name can be a full path, even to nested fields. It also evaluates arrays and maps where the array index or map key is in brackets [].

        def col_of_path(fullColName: String): Column
  2. Provides a column of NULL values.

        def nul_coll(): Column
  3. Provides a column of NULL values, but the actual type is per specification

        def nul_coll(dataType: DataType): Column

Error Handler

A trait and a set of supporting classes and other traits that enable channeling errors between libraries and the application during Spark data processing.

  1. It has an implicit dataFrame for easier usage of the methods provided by the error handler trait.

  2. It provides four basic implementations

    • ErrorHandlerErrorMessageIntoArray - An implementation of the error handler trait that collects errors into columns of structs based on the za.co.absa.spark.commons.errorhandler.ErrorMessage case class.
    • ErrorHandlerFilteringErrorRows - An implementation of error handler that implements the functionality of filtering rows that have some error (any of the error columns is not NULL).
    • ErrorHandlerIgnoringErrors - An implementation of error handler trait that ignores the errors detected during the dataFrame error aggregation
    • ErrorHandlerThrowingException - An implementation of error handler trait that throws an exception on error detected.
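
The difference between the four implementations can be sketched on an in-memory list instead of a DataFrame. Everything in this sketch is hypothetical: `ErrorHandlerSketch`, `Row`, the `Strategy` objects and `applyStrategy` are illustrative names, and reading "filtering" as dropping the rows that carry errors is an assumption.

```scala
object ErrorHandlerSketch {
  // Hypothetical row model: a payload plus the errors detected for it.
  final case class Row(value: String, errors: List[String])

  sealed trait Strategy
  case object CollectIntoArray extends Strategy // keep rows, keep their errors
  case object FilterErrorRows extends Strategy  // assumed: drop rows that have errors
  case object IgnoreErrors extends Strategy     // keep rows, discard error information
  case object ThrowOnError extends Strategy     // fail on the first row with an error

  def applyStrategy(rows: List[Row], strategy: Strategy): List[Row] = strategy match {
    case CollectIntoArray => rows
    case FilterErrorRows  => rows.filter(_.errors.isEmpty)
    case IgnoreErrors     => rows.map(_.copy(errors = Nil))
    case ThrowOnError =>
      rows.find(_.errors.nonEmpty).foreach { r =>
        throw new IllegalStateException(s"Errors in '${r.value}': ${r.errors.mkString(", ")}")
      }
      rows
  }
}
```

The sketch is meant to show that the same detected errors can lead to four different outcomes depending on which handler the application chooses.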

Spark Commons Test

Usage:

class MyTest extends SparkTestBase {
}

By default, it will instantiate a local Spark. There is also the possibility to use it in yarn mode:

class MyTest extends SparkTestBase {
  override lazy val spark: SparkSession = initSpark(new YarnSparkConfiguration(confDir, distJarsDir))
}

How to generate Code coverage report

sbt jacoco

Code coverage will be generated on path:

{project-root}/spark-commons/target/spark{spark_version}-jvm-{scala_version}/jacoco/report/html
{project-root}/spark-commons-test/target/jvm-{scala_version}/jacoco/report/html

How to Release

Please see this file for more details.