Kuidas kirjutada Spark ETL protsesse

Spark on võimas tööriist andmete eraldamiseks, teisenduste käitamiseks ja tulemuste laadimiseks andmehoidlasse.

Spark töötab arvutused paralleelselt, nii et täitmine on välkkiire ja klastrite suuruse suurendamiseks on vaja skaleerida. Sparki algne API ja spark-daria objekt EtlDefinition võimaldavad elegantselt määratleda ETL-loogikat.

Väljavõte

Oletame, et teil on Parketi failide andmejärv. Siin on mõned näitekoodid, mis tõmbavad andmejärve, filtreerivad andmed ja jagavad seejärel andmete alamhulga ümber.

val dataLakeDF = spark.read.parquet ("s3a: // some-bucket / foo")
val extractDF = dataLakeDF
  . kus (col ("meeleolu") === "õnnelik")
  .jaotus (10000)

DataFramesi ümberjaotamise kohta lisateabe saamiseks lugege seda ajaveebi postitust. Oleme nüüd valmis ekstraktiDF-i teisendama.

Muutke

Saame määratleda kohandatud teisendusfunktsiooni, mis võtab DataFrame'i argumendina ja tagastab DataFrame'i, et ekstraktidaDF. Kohandatud teisendusfunktsioonid on korduvkasutatavad ja hõlpsasti kontrollitavad, nii et see loob kvaliteetse koodialuse.

Määratleme paar DataFrame'i teisendust.

def withGreeting () (df: DataFrame): DataFrame = {
  df.Column ("tervitus", põlenud ("tere maailm"))
}

def withFarewell () (df: DataFrame): DataFrame = {
  df.Column ("hüvastijätt", põlenud ("hüvasti"))
}

Loome mudeli () funktsiooni, mis aheldab kohandatud teisendusi.

def mudel () (df: DataFrame): DataFrame = {
  df
    .transform (withGreeting ())
    .transform (withFarewell ())
}

Meie ekstrakti teisenduste käivitamiseks saame käivitada ekstraktiDFDF.transform (mudel ()). Lihtne hernes

Lisateavet kohandatud DataFrame'i teisenduste aheldamise kohta leiate sellest ajaveebi postitusest.

Laadige (või teatage)

Spark DataFrame'i kirjutajate abil saame määratleda üldise funktsiooni, mis kirjutab DataFrame'i S3 asukohta.

def exampleWriter () (df: DataFrame): Ühik = {
  val tee = "s3a: // mingi ämber / ekstraktid / baar"
  df.write.mode (SaveMode.Overwrite) .parkett (tee)
}

Kirjutamisfunktsioon peaks võtma argumendina DataFrame'i ja mitte midagi tagastama (ühik).

EtlDefinition

Lähtestame spl-darias määratletud EtlDefinitioni juhtumiklassi ja kasutame protsessi () meetodit ETL-koodi täitmiseks.

val etl = uus EtlDefinition (
  sourceDF = ekstraktDF,
  teisendada = mudel (),
  kirjuta = näideWriter ()
)

ETL-koodi täitmiseks toimige järgmiselt.

etl.process ()

Vau, see oli lihtne EtlDefinitioni objekti saab isegi Slack-teadete tegemiseks ümber lükata! Kaanen seda teises ajaveebipostituses.

Vaadake EtlDefinitioni argumentide meetodiallkirju ja veenduge, et mõistate, kuidas meie määratletud funktsioonid sellesse vormi sobivad.

juhtumiklass EtlDefinition (
    sourceDF: DataFrame,
    teisendamine: (DataFrame => DataFrame),
    kirjutada: (DataFrame => Ühik),
    metaandmed: scala.collection.mvable.Map [String, Any] = scala.collection.mvable.Map [String, Any] ()
) {

  def protsess (): ühik = {
    kirjuta (sourceDF.transform (teisenda))
  }

}

Võtke arvesse, et EtlDefinition objekte saab soovi korral suvalise metaandmekaardiga kiirendada.

Mitu EtlDefinitsiooni

EtlDefinitioni objektide kogumi saate korraldada muudetavas kaardis, nii et neid on lihtne tuua ja teostada.

val etls = scala.collection.mvable.Map [String, EtlDefinition] ()
etls + = ("riba" -> etl)
etls + = ("baz" -> etl2)
etls ("bar"). protsess ()

Järgmised sammud

Siin on peamised sammud Sparki hea ETL-koodi kirjutamiseks.

  • Pärast filtreerimist jagage DataFrame kindlasti uuesti
  • Kohandatud DataFrame'i teisendused tuleks osadeks jagada, neid eraldi testida ja seejärel aheldada mudel () meetodil
  • Looge EtlDefinition objektid, et korraldada oma ETL loogika ja veenduda, et kõik teie meetodi allkirjad on õiged

Kasutan käske Databricks API, AWS Lambda ja Slack Slash, et täita ETL-töid otse Slackist. Soovitan seda töövoogu väga!