modakanalytics / jdbc.almaren   0.0.1

Apache License 2.0 GitHub

Allow to execute any SQL query using JDBC.

Scala versions: 2.13 2.12 2.11

JDBC Connector

Build Status

JDBC Connector allow you to execute any SQL statement using Apache Spark.

To add JDBC connector dependency to your sbt build:

libraryDependencies += "com.github.music-of-the-ainur" %% "jdbc-almaren" % "0.0.6-3.4"

To run in spark-shell:

For scala-version(2.12):

spark-shell --master "local[*]" --packages "com.github.music-of-the-ainur:almaren-framework_2.12:0.9.10-3.4,com.github.music-of-the-ainur:jdbc-almaren_2.12:0.0.6-3.4"

For scala-version(2.13):

spark-shell --master "local[*]" --packages "com.github.music-of-the-ainur:almaren-framework_2.13:0.9.10-3.4,com.github.music-of-the-ainur:jdbc-almaren_2.13:0.0.6-3.4"

Connector Usage

Maven / Ivy Package Usage

The connector is also available from the Maven Central repository. It can be used using the --packages option or the spark.jars.packages configuration property. Use the following value

version Connector Artifact
Spark 3.4.x and scala 2.13 com.github.music-of-the-ainur:jdbc-almaren_2.13:0.0.6-3.4
Spark 3.4.x and scala 2.12 com.github.music-of-the-ainur:jdbc-almaren_2.12:0.0.6-3.4
Spark 3.3.x and scala 2.13 com.github.music-of-the-ainur:jdbc-almaren_2.13:0.0.6-3.3
Spark 3.3.x and scala 2.12 com.github.music-of-the-ainur:jdbc-almaren_2.12:0.0.6-3.3
Spark 3.2.x and scala 2.12 com.github.music-of-the-ainur:jdbc-almaren_2.12:0.0.6-3.2
Spark 3.1.x and scala 2.12 com.github.music-of-the-ainur:jdbc-almaren_2.12:0.0.6-3.1
Spark 2.4.x and scala 2.12 com.github.music-of-the-ainur:jdbc-almaren_2.12:0.0.6-2.4
Spark 2.4.x and scala 2.11 com.github.music-of-the-ainur:jdbc-almaren_2.11:0.0.6-2.4

JDBC Batch

Example

import com.github.music.of.the.ainur.almaren.Almaren
import com.github.music.of.the.ainur.almaren.builder.Core.Implicit
import com.github.music.of.the.ainur.almaren.jdbc.JDBC.JDBCImplicit

import spark.implicits._

 val almaren = Almaren("jdbc-almaren")

 val updateSourceDf = Seq(
    ("John", "Jones"),
    ("David", "Smith"),
    ("Michael", "Lee"),
    ("Chris", "Johnson"),
    ("Mike", "Brown")
  ).toDF("first_name", "last_name")

  val updateQuery = "UPDATE person_info set first_name = ? where last_name = ?"

  almaren.builder
    .sourceDataFrame(updateSourceDf)
    .sql("select monotonically_increasing_id() as __ID__,first_name,last_name from __TABLE__")
    .jdbcBatch("jdbc:postgresql://localhost:5432/almaren", "org.postgresql.Driver", updateQuery, 1000, Some("postgres"), Some("postgres"),Map("connectionTimeoutMillis" -> "3000","maxSize"->"10"))
    .batch
    .count

Parameters

Parameter Description Type
url The JDBC URL to connect to String
driver The class name of the JDBC driver to use to connect to this URL. String
query Query to be executed String
batchSize Number of records that will be send to the database in a single transaction Int
user Database user Option[String]
password Database password Option[String]
params Other extra parameters like connectionTimeout etc ..can be specified Map[String,String]

Special Columns

Input:

Parameters Mandatory Description
__ID__ Yes(Should be the first column) This field will be in response of jdbc.almaren component, it's useful to join data

Output:

Parameters Description
__ID__ Custom ID , This field will be useful to join data
__URL__ The JDBC URL used to connect to
__DRIVER__ The class name of the JDBC driver to used to connect to this URL
__QUERY__ Query executed
__BATCHSIZE__ Number of records that will be send to the database in a single transaction
__ELAPSED_TIME__ Query Execution time
__ERROR__ Error message if query execution fails

JDBC Query

Example

import com.github.music.of.the.ainur.almaren.Almaren
import com.github.music.of.the.ainur.almaren.builder.Core.Implicit
import com.github.music.of.the.ainur.almaren.jdbc.JDBC.JDBCImplicit

import spark.implicits._

 val almaren = Almaren("jdbc-almaren")

 val mergeQuery =
     """WITH upsert as(
       |  update
       |    public.person_info t2
       |  set
       |    first_name = t1.first_name,
       |    last_name = t1.last_name
       |  from
       |    person_info_temp t1
       |  where
       |    t2.country = t1.country RETURNING t2.*
       |)
       |insert into
       |  person_info
       |select
       |  p.first_name,
       |  p.last_name,
       |  p.country
       |from
       |  person_info_temp p
       |where
       |  p.country not in (
       |    select
       |      q.country
       |    from
       |      upsert q
       |  );""".stripMargin

   almaren.builder
        .jdbcQuery("jdbc:postgresql://localhost:5432/almaren", "org.postgresql.Driver", mergeQuery, Some("postgres"), Some("postgres"),Map("connectionTimeoutMillis" -> "3000","maxSize"->"10"))
        .batch

Parameters

Parameter Description Type
url The JDBC URL to connect to String
driver The class name of the JDBC driver to use to connect to this URL. String
query Query to be executed String
user Database user Option[String]
password Database password Option[String]
params Extra parameters like connectionTimeout etc ..can be specified Map[String,String]

Params

Parameter Description
connectionTimeoutMillis The time for which the connection gets timed out
maxSize Maximum number of connections available