Systematic ETL/ELT Programming using Spark — Stage I (Wrappers)

Niroj Pattnaik
8 min read · Feb 21, 2021

Thanks to the open-source community, Spark keeps improving and helps data/ETL engineers and scientists write powerful frameworks that process big data and load it into datastores with simplicity.

We’re going to split this potentially large topic into smaller parts so that we can design it step by step. I will add links to the future topics on this page later.

In this very first topic, we discuss a critical step: writing strong yet simple wrapper shell scripts that can drive both batch and real-time processes, not only ETL.

The Wrapper …

Photo by Arturo Esparza on Unsplash

The wrapper script is a shell script that should consist of a few simple, compact, high-level commands that invoke the content-rich, low-level algorithms bundled up in an executable. It should also be generic enough to call many such executables without much fuss! Let’s break this down into a few points and discuss them one by one:

1. Effective Commenting
Photo by Sandie Clarke on Unsplash

It’s always good practice to start the wrapper script with an intro covering the process, input parameters, warranty, copyright info, authors, release dates, and expected outcomes.

It’s worth noting that the text graffiti in such a header can be created easily with many online tools. The one I used here is:
graffiti_link (courtesy: http://www.patorjk.com/)
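As a rough illustration of such an intro, here is a minimal header sketch. The script name, argument names, and placeholder fields are all hypothetical; fill them in with your own process details.

```shell
#!/usr/bin/env bash
################################################################################
# Script   : spark_etl_wrapper.sh            (hypothetical name)
# Purpose  : Validate the environment and launch the Spark ETL executable
# Usage    : spark_etl_wrapper.sh <env> <job_name>
# Inputs   : env       target environment, e.g. dev or prod
#            job_name  logical name of the job to run
# Outcome  : exit 0 on success, non-zero on failure; output in the master log
# Author   : <author>          Released : <date>
# Copyright: <owner>. Provided as-is, without warranty.
################################################################################

# Print the usage line from the header (to stderr) when arguments are missing
print_usage() {
  echo "Usage: spark_etl_wrapper.sh <env> <job_name>" >&2
}
```

Keeping the usage line in one small function means the header and the error message are less likely to drift apart.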

2. Validate & Set Environment

Setting and validating the native environment before calling the executable is the first step right after the intro comments. Typical checks: all mandatory variables are set, the required directories exist, the Kerberos connection is valid, incoming files are readable, listener ports are open, and DB credentials, S3 keys, etc. are available.

X=${X:-y} => Default-value substitution. When X is set and non-empty it keeps its environment value; otherwise it defaults to y.
This is pretty handy while testing, when we want to toggle between Spark cluster and local mode externally without changing the application program or configuration files.
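The checks and the default substitution described above could be sketched as follows. The function names require_vars and require_dir and the variable SPARK_MASTER are assumptions made for illustration, not names from the original wrapper.

```shell
#!/usr/bin/env bash
# Fail if any named variable is unset or empty; report each missing name.
require_vars() {
  local name missing=0
  for name in "$@"; do
    # ${!name} is bash indirect expansion: the value of the variable called $name
    if [ -z "${!name:-}" ]; then
      echo "ERROR: mandatory variable $name is not set" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Fail unless the path is an existing, readable directory.
require_dir() {
  [ -d "$1" ] && [ -r "$1" ] || {
    echo "ERROR: directory $1 not found or unreadable" >&2
    return 1
  }
}

# Default to local mode unless the caller exported SPARK_MASTER (hypothetical name)
SPARK_MASTER=${SPARK_MASTER:-local[*]}
```

A wrapper would typically call something like `require_vars ENV LOG_PATH || exit 1` near the top, so that a misconfigured environment fails fast before any Spark resources are requested.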

These env variables are available only to the local driver program, not to the nodes in the cluster. We need to pass them separately: on the spark-submit command line, in a property file, or in a property file built into the application jar itself. Generally, it is good practice to keep credentials encrypted and pass them as command-line settings to spark-submit, e.g. --conf "spark.yarn.appMasterEnv.ORA_PASSWD=$ORA_PASSWD"

If you pass the encrypted password, you can decrypt it at run time inside the application program. For example, we can use the OpenSSL program as below:
openssl enc -base64 -d -aes-128-cbc -in <encryptedPasswdFile> -pass file:<encryptionKeyFile>

3. Appropriate Master Logging at the driver

Photo by Mildly Useful on Unsplash

The application logger inside the user’s Spark application code (log4j) writes its logs on the local nodes inside the cluster.

Setting "spark.driver.extraJavaOptions=-Dlog4j.configuration=$LOG_FILE", or shipping the file with --files, helps Spark write the logs on the local worker nodes, where LOG_FILE needs to be passed as "spark.yarn.appMasterEnv.LOG_FILE=$LOG_FILE" through the spark-submit command.
However, your log path needs to be mounted across the nodes, or you need to pull the remote logs with the yarn application log command (if you use YARN) to gather all the information on the driver node, as given below:
yarn logs -applicationId $applicationId
Here applicationId can be retrieved from the main log, from sparkContext.applicationId, or by using *sparkHandle.getAppId (will be discussed later).
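One way to retrieve the application ID from the main log is sketched below. It assumes the log contains the usual YARN identifier of the form application_<digits>_<digits>; the function name extract_app_id is hypothetical.

```shell
#!/usr/bin/env bash
# Print the last YARN application ID mentioned in the given log file.
# Prints nothing if the log contains no such ID.
extract_app_id() {
  grep -oE 'application_[0-9]+_[0-9]+' "$1" | tail -n 1
}
```

The result can then be fed to the log command, for example `yarn logs -applicationId "$(extract_app_id "$LOG_FILE")"`.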

Once we pull the logs from the cluster, we may need to direct them to our master log. The logger function in the wrapper is written so that it matches the pattern of Java’s log4j application-side logger.

echo "$(date '+%Y-%m-%d %H:%M:%S') $severity $SCRIPT_NAME:$lineno - $mssg"
Here the Unix date command is used to follow the timestamp pattern inside the log file. $LINENO is the built-in shell variable that holds the current line number in the script; it is used to track the line where the error is reported.
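A minimal sketch of such a logger function, built around the echo pattern above. The name log_msg is an assumption (chosen to avoid shadowing the system logger command), and appending to LOG_FILE via tee is one possible design, not necessarily the original one.

```shell
#!/usr/bin/env bash
# Script name without its directory part, unless the caller already set it
SCRIPT_NAME=${SCRIPT_NAME:-${0##*/}}

# Usage: log_msg <severity> <line number> <message...>
# Writes one log4j-style line to stdout and appends it to LOG_FILE if that is set.
log_msg() {
  local severity=$1 lineno=$2
  shift 2
  echo "$(date '+%Y-%m-%d %H:%M:%S') $severity $SCRIPT_NAME:$lineno - $*" \
    | tee -a "${LOG_FILE:-/dev/null}"
}
```

A call site passes the built-in line number explicitly, for example `log_msg ERROR "$LINENO" "spark-submit failed"`, because inside the function $LINENO would refer to the function's own lines.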

4. Application tracking using yarn web history URL

Instead of pulling the log files from each node, we can also easily track the process using the history URL as below:

When we run the spark-submit command or the Spark handler program, we can either pass a fixed port number or, more appropriately, let Spark bind dynamically to an available port by increasing the number of port-binding retries (default 16) as below:
spark.port.maxRetries=100

Getting the YARN history web URL is also simple. It can be obtained from the YARN Hadoop configuration as below:

val webUrl = spark.sparkContext.hadoopConfiguration.get("yarn.resourcemanager.webapp.address", "")

val appHstryUrl = s"http://$webUrl/cluster/app/${spark.sparkContext.applicationId}"

5. Handling Abrupt exits

Photo by Simon Berger on Unsplash

“STOP EATING ANIMALS”!! Pardon me, I’m neither quoting Bill Gates nor intending to hurt any meat-lover; I’m only stressing that we should stop abusing the Big Data zoo animals playing in our application’s backyard :)

There are many scenarios where glitches in the application raise uncaught exceptions and terminate the job abruptly. Similarly, the external scheduler or the native OS can malfunction, resulting in sudden termination of the running job and leaving unfinished YARN applications with residual temp directories and files (possibly also external to Spark, such as in a DB).
For example, say our job somehow gets killed by an external kill or Ctrl+C signal. At that point, along with the JVM’s internal “finally” blocks, the wrapper script should be prepared to handle the signal and exit as cleanly as possible.

For the above, Unix’s evergreen trap command comes in handy as below:

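trap cleanExits INT TERM EXIT   # our wrapper script should have this line

function cleanExits {
  trap - INT TERM EXIT   # disarm the trap so the cleanup runs only once
  # Check the application status; assumes the report contains a line like "State : RUNNING"
  status=$(yarn application -status "$ApplicationId" 2>/dev/null | awk '/^[[:space:]]*State[[:space:]]*:/ {print $NF}')
  if [ "$status" = "RUNNING" ]; then
    yarn application -kill "$ApplicationId"
    # clean up (rm) any temporary residuals
    # DB commit checks, rollback, reconciliation, etc.
    # send alert emails & logs
    exit 10
  fi
}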

To get the exit code of a piped command, as in our case,
i.e. spark-submit … … | tee -a "$LOG_FILE", the correct variable is:
EXIT_CD=${PIPESTATUS[0]} # Here $? does not work because it holds the exit code of the last command in the pipe (tee)
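To make the PIPESTATUS behavior concrete, here is a small sketch that wraps any command, tees its output to a log file, and still returns the command's own exit code. The function name run_and_log is hypothetical, and PIPESTATUS itself is a bash feature, so this assumes the wrapper runs under bash.

```shell
#!/usr/bin/env bash
# Usage: run_and_log <log_file> <command> [args...]
# Runs the command, appends its stdout and stderr to the log file while still
# showing them on the terminal, and returns the command's exit code, not tee's.
run_and_log() {
  local log_file=$1
  shift
  "$@" 2>&1 | tee -a "$log_file"
  return "${PIPESTATUS[0]}"
}
```

In a wrapper this could be used as `run_and_log "$LOG_FILE" spark-submit … ; EXIT_CD=$?`, with the elided spark-submit arguments left to the actual job.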

6. Sending Alert Emails

Photo by Solen Feyissa on Unsplash

Even though the application already has its own emailing system, it is still better to have an external wrapper-level email to alert production support.

The wrapper still reads the Java/Scala properties file from the application, so that there is a single source of settings.
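A wrapper-level alert that reads its recipient from the application's properties file might look like the sketch below. The property key alert.email.to and the function names get_prop and send_alert are assumptions for illustration, and mailx is assumed to be the mail client available on the host.

```shell
#!/usr/bin/env bash
# Usage: get_prop <key> <properties_file>
# Prints the trimmed value of the last "key=value" line whose key matches exactly.
get_prop() {
  awk -F'=' -v k="$1" '
    {
      name = $1
      gsub(/^[ \t]+|[ \t]+$/, "", name)      # trim the key
      if (name == k) {
        sub(/^[^=]*=/, "")                   # drop everything up to the first "="
        gsub(/^[ \t]+|[ \t]+$/, "")          # trim the value
        val = $0
      }
    }
    END { print val }' "$2"
}

# Usage: send_alert <subject> <body> <properties_file>
send_alert() {
  local subject=$1 body=$2 props=$3 to
  to=$(get_prop alert.email.to "$props")     # hypothetical property key
  [ -n "$to" ] || { echo "No alert recipient configured" >&2; return 1; }
  if command -v mailx >/dev/null 2>&1; then
    printf '%s\n' "$body" | mailx -s "$subject" "$to"
  else
    echo "mailx not found; alert not sent: $subject" >&2
    return 1
  fi
}
```

Reading the recipient from the same properties file as the application avoids maintaining a second list of addresses inside the shell script.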

7. Supporting continuous executable

Photo by Leon Seibert on Unsplash

Say we are running an ETL process written in Spring Boot or Spark Streaming, we have neither OpenShift, Kubernetes, nor any other specialized container system, and we have no time to implement another layer and want to keep things simple. Then we may want to write our wrapper in such a way that it can be driven by a Unix cron job or a legacy batch scheduler. A simple 24x7 monitoring method can keep checking the health, alert on failure, and automatically restart the job when it fails.

I’m using a Spring Boot service as the example, but the same can be applied to any continuous process. Here the cron job is scheduled every 5 minutes, so __MAX_TRIAL is set to 60 with a sleep interval of 5 seconds (60 × 5 s = 5 min).
( nohup … ) & starts the service in a sub-process, while an external daemon process waits for its exit status.
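One plausible reading of this monitoring loop is sketched below: each cron run watches the service for one window of max_trials checks, and reports failure on the first failed check so the caller can alert and restart. The function name watch_window and the health-check command are assumptions, not the original script.

```shell
#!/usr/bin/env bash
# Usage: watch_window <max_trials> <sleep_secs> <health check command...>
# Runs the health check up to max_trials times, sleep_secs apart.
# Returns 0 if every check passed, 1 as soon as one check fails.
watch_window() {
  local max_trials=$1 sleep_secs=$2
  shift 2
  local trial
  for (( trial = 1; trial <= max_trials; trial++ )); do
    "$@" >/dev/null 2>&1 || return 1
    sleep "$sleep_secs"
  done
  return 0
}

# Example cron tick (every 5 minutes) with a hypothetical health URL and jar:
#   watch_window 60 5 curl -fs http://localhost:8080/actuator/health \
#     || { echo "service down, restarting"; ( nohup java -jar app.jar & ); }
```

With 60 trials and a 5-second sleep, one invocation covers roughly the 5 minutes until cron starts the next one.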

8. Elegant & generic ways to call spark executables

Photo by Tobias Tullius on Unsplash

Of course, the best way to call Spark jobs is through spark-submit, but we can also call them through a system command inside Scala, or by programming and calling the main class directly through the SparkHandler class (*to be discussed in future chapters).

For various reasons, a developer may want to do this from Scala directly instead of running spark-submit from the wrapper script, for example to list, sFTP, or manipulate local inbound files before calling Spark.

Since Scala code compiles to Java bytecode, the host system can easily run a Scala-based application without a Scala installation by using plain JVM commands.

java -DENV="$ENV" -Dscala.usejavacp=true -Xmx2048m \
-DLOG_PATH="$LOG_PATH" \
-cp "$RUN_DIR/bin/spark_etl.jar:$EXTERNAL_JARS:/usr/hdp/…/spark-library-2.11.12.jar:/usr/hdp/…/spark2-client/jars/*" \
com.niroj.office.LocalFileProcesser "$ARG1" "$ARG2" …
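For the spark-submit route, a generic wrapper can assemble the command in one place so that many executables share it. The sketch below only builds the command in an array and does not run it. The function name build_spark_submit, the array SUBMIT_CMD, and the variables SPARK_MASTER and PORT_MAX_RETRIES are assumptions.

```shell
#!/usr/bin/env bash
# Usage: build_spark_submit <main_class> <app_jar> [app args...]
# Fills the global array SUBMIT_CMD; master and retry count are overridable
# from the environment and fall back to local mode and 100 retries.
build_spark_submit() {
  local main_class=$1 app_jar=$2
  shift 2
  SUBMIT_CMD=(
    spark-submit
    --master "${SPARK_MASTER:-local[*]}"
    --class "$main_class"
    --conf "spark.port.maxRetries=${PORT_MAX_RETRIES:-100}"
    "$app_jar"
    "$@"
  )
}
```

Running it is then a matter of `"${SUBMIT_CMD[@]}"`. Keeping the command in an array preserves arguments that contain spaces.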

9. Use the distributed file system as early and as fully as possible, with a single Spark call

Photo by Jan Canty on Unsplash

When batch files come from external systems, mostly through sFTP, they land on a local file system. To process them in Spark cluster mode, the user needs to copy them in RAW format from the local file system to a distributed file system such as HDFS or S3.

Often people copy the files one by one and call spark-submit for each file separately, which incurs the unnecessary cost of initializing Spark on every start and of GC on every stop. It also reduces the scope for parallelism, because similar HDFS files can no longer be read together through a file pattern. And when there are a lot of small files, it is better to merge them all together.

Hence, it is better to copy all the incoming local files together and then call a single spark-submit for all files with a similar layout.
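Copying all matching files in a single call could be sketched like this. The function name stage_to_hdfs is hypothetical, and HDFS_CMD is an assumed override hook that lets the function be dry-run with echo on a machine without Hadoop.

```shell
#!/usr/bin/env bash
# Usage: stage_to_hdfs <local_dir> <glob_pattern> <hdfs_dir>
# Copies every regular file matching the pattern to one HDFS directory
# with a single "hdfs dfs -put" call. Returns 1 if nothing matches.
stage_to_hdfs() {
  local local_dir=$1 pattern=$2 hdfs_dir=$3
  local files=() f
  # $pattern is left unquoted on purpose so the shell expands the glob
  for f in "$local_dir"/$pattern; do
    [ -f "$f" ] && files+=("$f")
  done
  if [ "${#files[@]}" -eq 0 ]; then
    echo "No files matching $pattern in $local_dir" >&2
    return 1
  fi
  ${HDFS_CMD:-hdfs} dfs -put -f "${files[@]}" "$hdfs_dir"
}
```

After staging, one spark-submit can read the whole batch through a file pattern on the HDFS directory instead of being started once per file.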

10. Hiding secrets in debug mode

Photo by Kristina Flour on Unsplash

And lastly, it is always best practice not to accidentally expose any secret variables in our logs when running in debug mode.
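One way to avoid such leaks, assuming debug mode means bash xtrace (set -x), is to switch tracing off while a secret is being read and restore it afterwards. The function name load_secret is hypothetical.

```shell
#!/usr/bin/env bash
# Usage: load_secret <variable_name> <secret_file>
# Reads the file content into the named variable with xtrace suspended,
# so the secret value never appears in "set -x" debug output.
load_secret() {
  local var_name=$1 file=$2
  local was_on=0 rc=0
  # Remember whether xtrace was on, then switch it off
  case $- in *x*) was_on=1; set +x ;; esac
  if [ -r "$file" ]; then
    printf -v "$var_name" '%s' "$(cat "$file")"
  else
    rc=1
  fi
  # Restore the caller's debug setting
  if [ "$was_on" -eq 1 ]; then set -x; fi
  return "$rc"
}
```

The call itself, e.g. `load_secret ORA_PASSWD "$PASSWD_FILE"`, only shows the variable name and file path in the trace. Any later command that expands the secret on its command line needs the same care.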

And those are all the 10 points I wanted to highlight. Please leave your valuable feedback in the comments section and/or feel free to reach out to me at my LinkedIn address below, as usual:

LinkedIn

We will discuss ETL programming in more detail in upcoming topics.

Thanks for reading!!

Niroj Pattnaik

Hadoop Developer | BigData/ETL Engineer | Technical Architect | And a Student.