
ZIO Apache Parquet

A ZIO-based wrapper for the Apache Parquet Java implementation that leverages ZIO Schema to derive codecs

Overview

Installation

libraryDependencies += "me.mnedokushev" %% "zio-apache-parquet-core" % "@VERSION@"

Codecs

To write and read data to and from Parquet files, you need to define schema and value codecs for your case classes: SchemaEncoder, ValueEncoder, and ValueDecoder.

Schema

You can get the Parquet Java SDK's Type by using a SchemaEncoder generated with the SchemaEncoderDeriver.default ZIO Schema deriver:

//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.4

import zio.schema._
import me.mnedokushev.zio.apache.parquet.core.codec._

case class MyRecord(a: Int, b: String, c: Option[Long])

object MyRecord {
  implicit val schema: Schema[MyRecord]               =
    DeriveSchema.gen[MyRecord]
  implicit val schemaEncoder: SchemaEncoder[MyRecord] =
    Derive.derive[SchemaEncoder, MyRecord](SchemaEncoderDeriver.default)
}

import MyRecord._

val parquetSchema = schemaEncoder.encode(schema, "my_record", optional = false)

println(parquetSchema)
// Outputs:
// required group my_record {
//   required int32 a (INTEGER(32,true));
//   required binary b (STRING);
//   optional int64 c (INTEGER(64,true));
// }

Alternatively, you can override the schemas of individual fields in your record by defining a custom SchemaEncoder and using the SchemaEncoderDeriver.summoned deriver.

//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.4

import me.mnedokushev.zio.apache.parquet.core.Schemas
import zio.schema._
import me.mnedokushev.zio.apache.parquet.core.codec._

case class MyRecord(a: Int, b: String, c: Option[Long])

object MyRecord {
  implicit val schema: Schema[MyRecord]               =
    DeriveSchema.gen[MyRecord]
  implicit val intEncoder: SchemaEncoder[Int]         = new SchemaEncoder[Int] {
    override def encode(schema: Schema[Int], name: String, optional: Boolean) =
      Schemas.uuid.optionality(optional).named(name)
  }
  implicit val schemaEncoder: SchemaEncoder[MyRecord] =
    Derive.derive[SchemaEncoder, MyRecord](SchemaEncoderDeriver.summoned)
}

import MyRecord._

val parquetSchema = schemaEncoder.encode(schema, "my_record", optional = false)

println(parquetSchema)
// Outputs:
// required group my_record {
//   required fixed_len_byte_array(16) a (UUID);
//   required binary b (STRING);
//   optional int64 c (INTEGER(64,true));
// }

Value

There is a sealed hierarchy of Value types for interop between Scala values and Parquet readers/writers. To convert Scala values into Value and back, we need to define instances of the ValueEncoder and ValueDecoder type classes. This can be done by using the ValueEncoderDeriver.default and ValueDecoderDeriver.default ZIO Schema derivers.

//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.4

import zio.schema._
import me.mnedokushev.zio.apache.parquet.core.codec._

case class MyRecord(a: Int, b: String, c: Option[Long])

object MyRecord {
  implicit val schema: Schema[MyRecord]        =
    DeriveSchema.gen[MyRecord]
  implicit val encoder: ValueEncoder[MyRecord] =
    Derive.derive[ValueEncoder, MyRecord](ValueEncoderDeriver.default)
  implicit val decoder: ValueDecoder[MyRecord] =
    Derive.derive[ValueDecoder, MyRecord](ValueDecoderDeriver.default)
}

import MyRecord._

val value  = encoder.encode(MyRecord(3, "zio", None))
val record = decoder.decode(value)

println(value)
// Outputs:
// RecordValue(Map(a -> Int32Value(3), b -> BinaryValue(Binary{"zio"}), c -> NullValue))
println(record)
// Outputs:
// MyRecord(3,zio,None)

As with SchemaEncoder, you can override the encoding of individual fields in your record by defining custom ValueEncoder/ValueDecoder instances and using the ValueEncoderDeriver.summoned/ValueDecoderDeriver.summoned derivers, respectively.

//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.4

import me.mnedokushev.zio.apache.parquet.core.Value
import zio.schema._
import me.mnedokushev.zio.apache.parquet.core.codec._

import java.nio.charset.StandardCharsets

case class MyRecord(a: Int, b: String, c: Option[Long])

object MyRecord {
  implicit val schema: Schema[MyRecord]        =
    DeriveSchema.gen[MyRecord]
  implicit val intEncoder: ValueEncoder[Int]   = new ValueEncoder[Int] {
    override def encode(value: Int): Value =
      Value.string(value.toString)
  }
  implicit val intDecoder: ValueDecoder[Int]   = new ValueDecoder[Int] {
    override def decode(value: Value): Int =
      value match {
        case Value.PrimitiveValue.BinaryValue(v) =>
          new String(v.getBytes, StandardCharsets.UTF_8).toInt
        case other =>
          throw DecoderError(s"Wrong value: $other")
      }
  }
  implicit val encoder: ValueEncoder[MyRecord] =
    Derive.derive[ValueEncoder, MyRecord](ValueEncoderDeriver.summoned)
  implicit val decoder: ValueDecoder[MyRecord] =
    Derive.derive[ValueDecoder, MyRecord](ValueDecoderDeriver.summoned)
}

import MyRecord._

val value  = encoder.encode(MyRecord(3, "zio", None))
val record = decoder.decode(value)

println(value)
// Outputs:
// RecordValue(Map(a -> BinaryValue(Binary{"3"}), b -> BinaryValue(Binary{"zio"}), c -> NullValue))
println(record)
// Outputs:
// MyRecord(3,zio,None)

Reading/Writing files

Finally, to perform IO operations we need to initialize a ParquetWriter and a ParquetReader and use either the writeChunk/readChunk or the writeStream/readStream methods.

//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.4

import zio.schema._
import me.mnedokushev.zio.apache.parquet.core.codec._
import me.mnedokushev.zio.apache.parquet.core.hadoop.{ ParquetReader, ParquetWriter, Path }
import zio._

import java.nio.file.Files

case class MyRecord(a: Int, b: String, c: Option[Long])
object MyRecord {
  implicit val schema: Schema[MyRecord]               =
    DeriveSchema.gen[MyRecord]
  implicit val schemaEncoder: SchemaEncoder[MyRecord] =
    Derive.derive[SchemaEncoder, MyRecord](SchemaEncoderDeriver.default)
  implicit val encoder: ValueEncoder[MyRecord]        =
    Derive.derive[ValueEncoder, MyRecord](ValueEncoderDeriver.default)
  implicit val decoder: ValueDecoder[MyRecord]        =
    Derive.derive[ValueDecoder, MyRecord](ValueDecoderDeriver.default)
}

val data =
  Chunk(
    MyRecord(1, "first", Some(11)),
    MyRecord(3, "third", None)
  )

val recordsFile = Path(Files.createTempDirectory("records")) / "records.parquet"

Unsafe.unsafe { implicit unsafe =>
  Runtime.default.unsafe
    .run(
      (for {
        writer   <- ZIO.service[ParquetWriter[MyRecord]]
        reader   <- ZIO.service[ParquetReader[MyRecord]]
        _        <- writer.writeChunk(recordsFile, data)
        fromFile <- reader.readChunk(recordsFile)
        _        <- Console.printLine(fromFile)
      } yield ()).provide(
        ParquetWriter.configured[MyRecord](),
        ParquetReader.configured[MyRecord]()
      )
    )
    .getOrThrowFiberFailure()
}
// Outputs:
// Chunk(MyRecord(1,first,Some(11)),MyRecord(3,third,None))
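
For larger datasets you can use the streaming variants instead of the chunk-based methods. Below is a minimal sketch of how this could look; it assumes writeStream accepts a Path and a ZStream of records and that readStream returns a stream which is run within a scope (check the library's API for the exact signatures):

import zio.stream._

// Sketch: write a stream of records and collect them back from the file.
val streamed =
  (for {
    writer <- ZIO.service[ParquetWriter[MyRecord]]
    reader <- ZIO.service[ParquetReader[MyRecord]]
    // Assumed signature: writeStream(path, stream)
    _      <- writer.writeStream(recordsFile, ZStream.fromChunk(data))
    // Assumed to return a (possibly scoped) ZStream of decoded records
    result <- ZIO.scoped(reader.readStream(recordsFile).runCollect)
    _      <- Console.printLine(result)
  } yield ()).provide(
    ParquetWriter.configured[MyRecord](),
    ParquetReader.configured[MyRecord]()
  )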

The examples above use ParquetReader.configured[A]() to initialize a reader that takes the Parquet schema from the file itself. Such a reader will always try to read all the columns of the file.

If you need to read only a subset of the columns, use ParquetReader.projected[A](), which always uses the schema of the provided type.
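
For example, to read only the a column of the records written above, you could define a separate case class containing just the fields you need and initialize a projecting reader for it. This is a minimal sketch: the MyRecordProjection name is illustrative, and it assumes ParquetReader.projected needs the same SchemaEncoder and ValueDecoder instances as shown in the previous sections.

//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.4

import zio.schema._
import me.mnedokushev.zio.apache.parquet.core.codec._
import me.mnedokushev.zio.apache.parquet.core.hadoop.ParquetReader
import zio._

// Hypothetical projection: only the columns we want to read from the file.
case class MyRecordProjection(a: Int)
object MyRecordProjection {
  implicit val schema: Schema[MyRecordProjection]               =
    DeriveSchema.gen[MyRecordProjection]
  implicit val schemaEncoder: SchemaEncoder[MyRecordProjection] =
    Derive.derive[SchemaEncoder, MyRecordProjection](SchemaEncoderDeriver.default)
  implicit val decoder: ValueDecoder[MyRecordProjection]        =
    Derive.derive[ValueDecoder, MyRecordProjection](ValueDecoderDeriver.default)
}

// Read only the projected columns from the previously written file.
val readProjected =
  ZIO
    .serviceWithZIO[ParquetReader[MyRecordProjection]](_.readChunk(recordsFile))
    .provide(ParquetReader.projected[MyRecordProjection]())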