maif / jooq-async   1.2.2

Apache License 2.0

Use jooq to build queries and run them in a reactive way

Scala versions: 2.13 2.12

Reactive jooq API

This API lets you use jooq with reactive clients for relational databases.

Implementations

At the moment there are two implementations:

  • a blocking JDBC implementation
  • a Vert.x reactive implementation, for PostgreSQL only

Import

Jcenter hosts this library.

Maven

<dependency>
  <groupId>fr.maif</groupId>
  <artifactId>jooq-async-jdbc</artifactId>
  <version>${version}</version>
</dependency>

OR

<dependency>
  <groupId>fr.maif</groupId>
  <artifactId>jooq-async-reactive</artifactId>
  <version>${version}</version>
</dependency>

Gradle

implementation "fr.maif:jooq-async-jdbc:${version}"

OR

implementation "fr.maif:jooq-async-reactive:${version}"

The API

Create a pool

The JDBC one:

PGSimpleDataSource dataSource = new PGSimpleDataSource();
dataSource.setUrl(url);
dataSource.setUser(user);
dataSource.setPassword(password);
PgAsyncPool jdbcPgAsyncPool = new JdbcPgAsyncPool(SQLDialect.POSTGRES, dataSource, Executors.newFixedThreadPool(5));

The reactive one:

DefaultConfiguration jooqConfig = new DefaultConfiguration();
jooqConfig.setSQLDialect(SQLDialect.POSTGRES);
PgConnectOptions options = new PgConnectOptions()
        .setPort(port)
        .setHost(host)
        .setDatabase(database)
        .setUser(user)
        .setPassword(password);
PoolOptions poolOptions = new PoolOptions().setMaxSize(10);
Vertx vertx = Vertx.vertx();
PgPool client = PgPool.pool(vertx, options, poolOptions);

PgAsyncPool reactivePgAsyncPool = new ReactivePgAsyncPool(client, jooqConfig);
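
The JDBC pool above is given an executor, presumably so that blocking JDBC calls do not run on the caller's thread. The sketch below illustrates that general technique with the JDK only. It is not the library's code: `runBlocking` and `blockingQuery` are hypothetical names, and the "query" is a simulated delay rather than a real database call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

class BlockingToAsync {

    // Runs a blocking call on the given executor and exposes the result as a CompletionStage.
    static <T> CompletionStage<T> runBlocking(Supplier<T> blockingCall, ExecutorService executor) {
        return CompletableFuture.supplyAsync(blockingCall, executor);
    }

    // Hypothetical stand-in for a blocking JDBC query; no database is involved.
    static String blockingQuery() {
        try {
            Thread.sleep(50); // simulate I/O latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "Ragnar";
    }

    // Submits the blocking call to a 5-thread pool and waits for the result.
    static String demo() {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        try {
            return runBlocking(BlockingToAsync::blockingQuery, executor)
                    .toCompletableFuture()
                    .join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With this approach the pool size bounds the number of blocking calls in flight, so it is usually sized in line with the number of database connections available.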

Perform query

The jooq DSL is used purely as a builder to write the query. The resulting query is then executed by the underlying client library (JDBC or Vert.x).

Query one:

CompletionStage<Option<String>> futureResult = reactivePgAsyncPool
        .queryOne(dsl -> dsl.select(name).from(table).where(name.eq("Ragnar")))
        // CompletionStage has no map(); thenApply transforms the result once it is available
        .thenApply(mayBeResult -> mayBeResult.map(row -> row.get(name)));

Query many:

CompletionStage<List<String>> futureResult = reactivePgAsyncPool
        .query(dsl -> dsl.select(name).from(table))
        // CompletionStage has no map(); thenApply transforms the result once it is available
        .thenApply(results -> results.map(row -> row.get(name)));
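
Since the queries above are typed as `CompletionStage`, their results are consumed with the standard JDK combinators. The following sketch shows transformation, a fallback on failure, and a blocking wait at the edge of the program. `findName` is a hypothetical stand-in for a query that returns at most one row, and `java.util.Optional` is used in place of the `Option` type shown above so that the example needs no dependency.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

class CompletionStageChaining {

    // Hypothetical stand-in for a "query one" call: looks a name up in an in-memory list.
    static CompletionStage<Optional<String>> findName(String wanted) {
        List<String> rows = List.of("Ragnar", "Lagertha");
        return CompletableFuture.supplyAsync(() -> rows.stream().filter(wanted::equals).findFirst());
    }

    static String greet(String wanted) {
        return findName(wanted)
                // transform the optional row once the stage completes
                .thenApply(maybe -> maybe.map(n -> "Hello " + n).orElse("nobody"))
                // turn a failed stage into a fallback value
                .exceptionally(err -> "query failed: " + err.getMessage())
                // block only at the edge of the program
                .toCompletableFuture()
                .join();
    }

    public static void main(String[] args) {
        System.out.println(greet("Ragnar"));
        System.out.println(greet("Bjorn"));
    }
}
```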

Stream data

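// Publisher has a single type parameter and no map(); Reactor's Flux wraps it to transform rows
Publisher<String> stream = Flux.from(reactivePgAsyncPool
                .stream(500 /* fetch size */, dsl -> dsl.select(name).from(table)))
                .map(q -> q.get(name));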

The publisher comes from the reactive streams API.
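
A reactive streams publisher only emits what its subscriber requests, which is what keeps a large result set from flooding the consumer. The sketch below shows a subscriber that requests rows in small batches. It uses the JDK's `java.util.concurrent.Flow` interfaces, which mirror the `org.reactivestreams` ones, so that it runs without any dependency; the `SubmissionPublisher` is a stand-in for the library's stream, and `CollectingSubscriber` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class BoundedSubscriberDemo {

    // Collects items, requesting them from the publisher batchSize at a time.
    static class CollectingSubscriber implements Flow.Subscriber<String> {
        private final int batchSize;
        private final List<String> received = new ArrayList<>();
        private final CompletableFuture<List<String>> done = new CompletableFuture<>();
        private Flow.Subscription subscription;
        private int pending;

        CollectingSubscriber(int batchSize) {
            this.batchSize = batchSize;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            pending = batchSize;
            s.request(batchSize); // initial demand
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            if (--pending == 0) { // batch consumed: ask for the next one
                pending = batchSize;
                subscription.request(batchSize);
            }
        }

        @Override
        public void onError(Throwable t) {
            done.completeExceptionally(t);
        }

        @Override
        public void onComplete() {
            done.complete(received);
        }

        CompletableFuture<List<String>> result() {
            return done;
        }
    }

    static List<String> demo() {
        CollectingSubscriber subscriber = new CollectingSubscriber(2);
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 0; i < 5; i++) {
                publisher.submit("name-" + i);
            }
        } // close() signals onComplete once all submitted items are delivered
        return subscriber.result().join();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

An `org.reactivestreams.Publisher` can be bridged to a `Flow.Publisher` with `FlowAdapters.toFlowPublisher` from the reactive-streams artifact, assuming a version of that artifact recent enough to include it.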

Execute statement

CompletionStage<Integer> insertResult = reactivePgAsyncPool.inTransaction(t ->
        t.execute(dsl -> dsl.insertInto(table).set(name, "test"))
);
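
When several statements must run one after another, each returning a `CompletionStage`, they can be sequenced with `thenCompose`. The sketch below shows that chaining with the JDK only. The `execute` method here is a hypothetical stand-in that returns a fixed row count, not the library's `execute`, and nothing is actually run in a transaction.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

class SequentialStatements {

    // Hypothetical stand-in for a statement: completes with the number of affected rows.
    static CompletionStage<Integer> execute(String label, int affectedRows) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("running " + label);
            return affectedRows;
        });
    }

    // Runs the second statement only after the first has completed, then sums the row counts.
    static int demo() {
        return execute("first statement", 1)
                .thenCompose(first ->
                        execute("second statement", 2).thenApply(second -> first + second))
                .toCompletableFuture()
                .join();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```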

Batch statements

This version sends the statement once, followed by every set of parameters. It is the most efficient option when a single statement is executed with many different values.

List<String> names = List.range(0, 10).map(i -> "name-" + i);
CompletionStage<Long> batchResult = reactivePgAsyncPool.executeBatch(
        dsl -> dsl.insertInto(table).columns(name).values((String) null),
        names.map(List::of)
);

This version batches a set of complete statements. Use it when the statements differ from one another.

List<String> names = List.range(0, 10).map(i -> "name-" + i);
CompletionStage<Long> batchResult = reactivePgAsyncPool.executeBatch(dsl ->
        names.map(n -> dsl.insertInto(table).set(name, n))
);

Reactor:

The jooq-async-reactive module exposes its operations through the Reactor Mono / Flux API.

PgAsyncPool pgAsyncPool = PgAsyncPool.create(client, jooqConfig);

Mono<Option<String>> result = pgAsyncPool.queryOneOne(dsl -> dsl
            .select(name)
            .from(table)
            .where(name.eq("Ragnar"))
        )
        .map(mayBeResult -> mayBeResult.map(row -> row.get(name)));