A ZIO-based wrapper for the Apache Parquet Java implementation that leverages ZIO Schema to derive codecs.
libraryDependencies += "me.mnedokushev" %% "zio-apache-parquet-core" % "@VERSION@"
To be able to write/read data to/from parquet files you need to define the following schema and value codecs for your case classes: SchemaEncoder, ValueEncoder, and ValueDecoder.
You can get the Parquet Java SDK's Type by using a SchemaEncoder generated by the SchemaEncoderDeriver.default ZIO Schema deriver:
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.4
import zio.schema._
import me.mnedokushev.zio.apache.parquet.core.codec._
case class MyRecord(a: Int, b: String, c: Option[Long])
object MyRecord {
  implicit val schema: Schema[MyRecord] =
    DeriveSchema.gen[MyRecord]
  implicit val schemaEncoder: SchemaEncoder[MyRecord] =
    Derive.derive[SchemaEncoder, MyRecord](SchemaEncoderDeriver.default)
}
import MyRecord._
val parquetSchema = schemaEncoder.encode(schema, "my_record", optional = false)
println(parquetSchema)
// Outputs:
// required group my_record {
//   required int32 a (INTEGER(32,true));
//   required binary b (STRING);
//   optional int64 c (INTEGER(64,true));
// }
Alternatively, you can override the schemas of some fields in your record by defining a custom SchemaEncoder and using the SchemaEncoderDeriver.summoned deriver:
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.4
import me.mnedokushev.zio.apache.parquet.core.Schemas
import zio.schema._
import me.mnedokushev.zio.apache.parquet.core.codec._
case class MyRecord(a: Int, b: String, c: Option[Long])
object MyRecord {
  implicit val schema: Schema[MyRecord] =
    DeriveSchema.gen[MyRecord]

  implicit val intEncoder: SchemaEncoder[Int] = new SchemaEncoder[Int] {
    override def encode(schema: Schema[Int], name: String, optional: Boolean) =
      Schemas.uuid.optionality(optional).named(name)
  }

  implicit val schemaEncoder: SchemaEncoder[MyRecord] =
    Derive.derive[SchemaEncoder, MyRecord](SchemaEncoderDeriver.summoned)
}
import MyRecord._
val parquetSchema = schemaEncoder.encode(schema, "my_record", optional = false)
println(parquetSchema)
// Outputs:
// required group my_record {
//   required fixed_len_byte_array(16) a (UUID);
//   required binary b (STRING);
//   optional int64 c (INTEGER(64,true));
// }
There is a sealed hierarchy of Value types for interop between Scala values and Parquet readers/writers. For converting Scala values into Value and back we need to define instances of the ValueEncoder and ValueDecoder type classes. This can be done by using the ValueEncoderDeriver.default and ValueDecoderDeriver.default ZIO Schema derivers.
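As a small, hedged illustration of what these Value objects look like, the sketch below constructs and inspects a single primitive value directly; it only uses the Value.string constructor and the Value.PrimitiveValue.BinaryValue case that also appear in the customization example further down, so treat it as a sketch rather than a tour of the full hierarchy.

import me.mnedokushev.zio.apache.parquet.core.Value

// Build a primitive Value by hand: Value.string wraps the UTF-8 bytes
// of the string in a BinaryValue.
val v = Value.string("zio")

v match {
  case Value.PrimitiveValue.BinaryValue(binary) =>
    println(binary) // prints the underlying Binary, e.g. Binary{"zio"}
  case other =>
    println(s"unexpected value: $other")
}

Deriving the codecs for a whole case class looks like this: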
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.4
import zio.schema._
import me.mnedokushev.zio.apache.parquet.core.codec._
case class MyRecord(a: Int, b: String, c: Option[Long])
object MyRecord {
  implicit val schema: Schema[MyRecord] =
    DeriveSchema.gen[MyRecord]
  implicit val encoder: ValueEncoder[MyRecord] =
    Derive.derive[ValueEncoder, MyRecord](ValueEncoderDeriver.default)
  implicit val decoder: ValueDecoder[MyRecord] =
    Derive.derive[ValueDecoder, MyRecord](ValueDecoderDeriver.default)
}
import MyRecord._
val value = encoder.encode(MyRecord(3, "zio", None))
val record = decoder.decode(value)
println(value)
// Outputs:
// RecordValue(Map(a -> Int32Value(3), b -> BinaryValue(Binary{"zio"}), c -> NullValue))
println(record)
// Outputs:
// MyRecord(3,zio,None)
Same as for SchemaEncoder, you can override the encoding/decoding of some fields in your record by defining custom ValueEncoder/ValueDecoder instances and using the ValueEncoderDeriver.summoned/ValueDecoderDeriver.summoned derivers accordingly:
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.4
import me.mnedokushev.zio.apache.parquet.core.Value
import zio.schema._
import me.mnedokushev.zio.apache.parquet.core.codec._
import java.nio.charset.StandardCharsets
case class MyRecord(a: Int, b: String, c: Option[Long])
object MyRecord {
  implicit val schema: Schema[MyRecord] =
    DeriveSchema.gen[MyRecord]

  implicit val intEncoder: ValueEncoder[Int] = new ValueEncoder[Int] {
    override def encode(value: Int): Value =
      Value.string(value.toString)
  }

  implicit val intDecoder: ValueDecoder[Int] = new ValueDecoder[Int] {
    override def decode(value: Value): Int =
      value match {
        case Value.PrimitiveValue.BinaryValue(v) =>
          new String(v.getBytes, StandardCharsets.UTF_8).toInt
        case other =>
          throw DecoderError(s"Wrong value: $other")
      }
  }

  implicit val encoder: ValueEncoder[MyRecord] =
    Derive.derive[ValueEncoder, MyRecord](ValueEncoderDeriver.summoned)
  implicit val decoder: ValueDecoder[MyRecord] =
    Derive.derive[ValueDecoder, MyRecord](ValueDecoderDeriver.summoned)
}
import MyRecord._
val value = encoder.encode(MyRecord(3, "zio", None))
val record = decoder.decode(value)
println(value)
// Outputs:
// RecordValue(Map(a -> BinaryValue(Binary{"3"}), b -> BinaryValue(Binary{"zio"}), c -> NullValue))
println(record)
// Outputs:
// MyRecord(3,zio,None)
Finally, to perform IO operations we need to initialize a ParquetWriter and a ParquetReader and use either the writeChunk/readChunk or the writeStream/readStream methods:
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.4
import zio.schema._
import me.mnedokushev.zio.apache.parquet.core.codec._
import me.mnedokushev.zio.apache.parquet.core.hadoop.{ ParquetReader, ParquetWriter, Path }
import zio._
import java.nio.file.Files
case class MyRecord(a: Int, b: String, c: Option[Long])
object MyRecord {
  implicit val schema: Schema[MyRecord] =
    DeriveSchema.gen[MyRecord]
  implicit val schemaEncoder: SchemaEncoder[MyRecord] =
    Derive.derive[SchemaEncoder, MyRecord](SchemaEncoderDeriver.default)
  implicit val encoder: ValueEncoder[MyRecord] =
    Derive.derive[ValueEncoder, MyRecord](ValueEncoderDeriver.default)
  implicit val decoder: ValueDecoder[MyRecord] =
    Derive.derive[ValueDecoder, MyRecord](ValueDecoderDeriver.default)
}

val data =
  Chunk(
    MyRecord(1, "first", Some(11)),
    MyRecord(3, "third", None)
  )

val recordsFile = Path(Files.createTempDirectory("records")) / "records.parquet"

Unsafe.unsafe { implicit unsafe =>
  Runtime.default.unsafe
    .run(
      (for {
        writer   <- ZIO.service[ParquetWriter[MyRecord]]
        reader   <- ZIO.service[ParquetReader[MyRecord]]
        _        <- writer.writeChunk(recordsFile, data)
        fromFile <- reader.readChunk(recordsFile)
        _        <- Console.printLine(fromFile)
      } yield ()).provide(
        ParquetWriter.configured[MyRecord](),
        ParquetReader.configured[MyRecord]()
      )
    )
    .getOrThrowFiberFailure()
}
// Outputs:
// Chunk(MyRecord(1,first,Some(11)),MyRecord(3,third,None))
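The snippet above uses the chunk-based methods. Below is a minimal, hedged sketch of the stream-based counterpart, reusing MyRecord, data, and recordsFile from that snippet; it assumes writeStream takes a Path plus a ZStream of records and that readStream returns a ZStream you can run, so verify the exact signatures against the library before relying on them.

import zio.stream._

// Hedged sketch: the same round trip via the streaming API.
// The writeStream(path, stream) and readStream(path) signatures are assumed.
val streamingApp =
  ZIO
    .scoped {
      for {
        writer   <- ZIO.service[ParquetWriter[MyRecord]]
        reader   <- ZIO.service[ParquetReader[MyRecord]]
        _        <- writer.writeStream(recordsFile, ZStream.fromChunk(data))
        fromFile <- reader.readStream(recordsFile).runCollect
        _        <- Console.printLine(fromFile)
      } yield ()
    }
    .provide(
      ParquetWriter.configured[MyRecord](),
      ParquetReader.configured[MyRecord]()
    )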
In the examples above we used ParquetReader.configured[A]() to initialize a reader that takes the parquet schema from the file itself. Such a reader will always try to read all columns of the file. In case you need to read only a subset of the columns, use ParquetReader.projected[A](), which will always use the schema of the provided type, as sketched below.
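For example, here is a hedged sketch of such a projection. MyRecordSummary and projectedApp are hypothetical names introduced only for this illustration, and it is assumed that ParquetReader.projected picks up the derived SchemaEncoder and ValueDecoder implicitly, so check the method's actual requirements in the library.

// Hedged sketch: project records.parquet (written above) onto a smaller
// case class so that only columns `a` and `b` are read.
case class MyRecordSummary(a: Int, b: String)

object MyRecordSummary {
  implicit val schema: Schema[MyRecordSummary] =
    DeriveSchema.gen[MyRecordSummary]
  implicit val schemaEncoder: SchemaEncoder[MyRecordSummary] =
    Derive.derive[SchemaEncoder, MyRecordSummary](SchemaEncoderDeriver.default)
  implicit val decoder: ValueDecoder[MyRecordSummary] =
    Derive.derive[ValueDecoder, MyRecordSummary](ValueDecoderDeriver.default)
}

val projectedApp =
  ZIO
    .serviceWithZIO[ParquetReader[MyRecordSummary]](_.readChunk(recordsFile))
    .flatMap(Console.printLine(_))
    .provide(ParquetReader.projected[MyRecordSummary]())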