Here are some tips to improve your ETL performance:

1. Try to drop unwanted data as early as possible in your ETL pipeline
We used to store raw data in S3 and pull all of it for processing, which bottlenecked performance.
What we improved – we removed unnecessary data from the raw data by creating data quality scripts, which reduced the data size considerably. Whenever new data arrives, we run incremental scripts to remove the unwanted records and keep only meaningful data in S3.
It’s desirable to reduce the data volume as you progress along the ETL pipeline, so consider purging unwanted data/datasets as early as possible to improve performance. Not every application/pipeline will see a significant reduction, but in most cases effective purging through data quality and data prep tasks/scripts cuts the data volume substantially, which results in a significant performance improvement.
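As a rough illustration, here is a minimal PySpark sketch of such an early pruning step; the bucket paths, column names, and quality rules are placeholders rather than our actual scripts.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("data-quality-prune").getOrCreate()

# Read the raw data, drop records that fail basic quality checks, and keep only
# the fields the downstream jobs actually use, before writing the smaller dataset back to S3.
raw = spark.read.parquet("s3a://my-bucket/raw/events/")
clean = (
    raw.dropDuplicates(["event_id"])
       .filter(F.col("event_ts").isNotNull())
       .select("event_id", "event_ts", "user_id", "amount")
)
clean.write.mode("overwrite").parquet("s3a://my-bucket/curated/events/")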

2. Avoid select *; instead fetch only the fields you need.
We used to select all the fields from a table during processing.
What we improved – we selected only the needed fields instead of the complete table. Always select only the fields you need rather than using select * on any table; it improves query time significantly. We have seen a 30-40% improvement in overall pipeline execution time.
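For Spark users, a hedged sketch of the same idea (the path and column names are placeholders, and an existing SparkSession `spark` is assumed):

orders = spark.read.parquet("s3a://my-bucket/curated/orders/")

# Avoid: orders.select("*") drags every column through the rest of the pipeline.
# Prefer: list only the fields the job actually uses; with columnar formats such as
# Parquet/ORC, Spark then reads just those columns from storage.
needed = orders.select("order_id", "order_date", "amount")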

3. Use a join/inner join when using 2 or more tables in a query.
Our case:
Below are two sample tables.
Table A (which actually contains 30 million+ records):

ID   Name   date         number
1    john   2020-01-01
2    max    2019-12-13

Table B

from_date    to_date      number
2019-12-01   2019-12-15   1
2019-12-16   2019-12-31   2
2020-01-01   2020-01-15   3
2020-01-16   2020-01-31   4
2020-02-01   2020-02-15   5
2020-02-16   2020-02-28   6

We have an update query where the date field from table A is checked against the range formed by the from_date and to_date fields of table B. If it falls within that range, A.number is updated from B.number. The query looks like this:

update A
set A.number = B.number
from B
where A.date >= B.from_date
and A.date <= B.to_date

The above query took more than 20 minutes to process the 30M+ records since it has no join/inner join predicate. Here is what we did to reduce the query time:
Since the tables don’t share any field to use as a join predicate, we created a custom field purely for joining, based on the data pattern in table B. The updated table B is as follows:

from_date    to_date      number   year_mon
2019-12-01   2019-12-15   1        2019-12
2019-12-16   2019-12-31   2        2019-12
2020-01-01   2020-01-15   3        2020-01
2020-01-16   2020-01-31   4        2020-01
2020-02-01   2020-02-15   5        2020-02
2020-02-16   2020-02-28   6        2020-02

Now the update query looks like this:

update A
set A.number = B.number
from B
where year_month(A.date) = B.year_mon
and A.date >= B.from_date
and A.date <= B.to_date

Now the above query takes just 1 minute 30 seconds to process the same 30M+ records.
Why did we see this performance gain? Because the year and month of A.date are matched against B.year_mon, the query now has a direct equality join predicate; the engine can use an efficient join strategy (e.g. a hash join) and skip all non-matching rows of B, instead of evaluating the range condition against every row, which saves a lot of query time.
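For Spark users, the same trick can be sketched as an equality join on a derived year-month key; the table and column names below mirror the example above and the code is illustrative only, not our production job (an existing SparkSession `spark` is assumed).

from pyspark.sql import functions as F

# Add the same year-month key to A so the join has an equality predicate,
# then keep the range check to pick the correct row of B within that month.
a_keyed = (
    spark.table("A")
         .withColumn("year_mon", F.date_format("date", "yyyy-MM"))
         .alias("a")
)
b = spark.table("B").alias("b")

updated = (
    a_keyed.join(b, on="year_mon")
           .where((F.col("a.date") >= F.col("b.from_date")) &
                  (F.col("a.date") <= F.col("b.to_date")))
           .select("a.ID", "a.Name", "a.date", F.col("b.number").alias("number"))
)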

4. Use a limit clause when saving a large query result into a list:
Scenario 1:
list = "select * from table"
If the table is large, fetching the full result may take a long time or may even fail with a timeout error.
Scenario 2:
list = "select * from table limit 100000"
Applying the limit clause returns the results in less time without failing.
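As a hedged illustration in Python, assuming a DB-API connection object named `connection` and a placeholder table name:

cursor = connection.cursor()

# Risky on a big table: the whole result set is materialised in memory at once.
# cursor.execute("select * from events")
# rows = cursor.fetchall()

# Safer when you only need a bounded list: cap the result size in the query itself.
cursor.execute("select * from events limit 100000")
rows = cursor.fetchall()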

5. It is best to assign a default value when creating a column:
Today, most downstream applications/processing happen in the realm of data science, and more often than not, null values are not appreciated by data science teams. Hence, it’s always a best practice to discuss and decide the default field values with your entire team – especially the teams working on the downstream applications/models.
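For example, a minimal PySpark sketch of backfilling agreed defaults before handing data downstream (the dataframe `df`, column names, and default values are assumptions for illustration):

defaults = {
    "amount": 0.0,         # numeric measure defaults to 0
    "country": "UNKNOWN",  # categorical field gets an explicit sentinel instead of null
}
df_with_defaults = df.fillna(defaults)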

6. Saving a large Spark Dataframe as a CSV file in S3:
Many data engineers save the Dataframe directly as CSV on S3 to output the intermediate/final results of their data processing jobs. If the Dataframe is large – say ~1 million rows and 250+ columns – this can easily take several hours.
It is recommended to save the Dataframe to HDFS first and then copy the files to S3 using DistCp, a tool for large inter/intra-cluster file copies. With this technique, we have often seen a 40% to 60% improvement in the time taken to store the results on S3.
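A hedged sketch of the pattern (the dataframe `result_df`, paths, and bucket names are placeholders):

# Step 1: write the result to HDFS, which stays local to the cluster and is fast.
result_df.write.mode("overwrite").option("header", "true").csv("hdfs:///tmp/job_output/")

# Step 2: copy the files from HDFS to S3 with DistCp from a shell or workflow step, e.g.
#   hadoop distcp hdfs:///tmp/job_output s3a://my-bucket/final_output
# (on EMR, s3-dist-cp is a common alternative).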

7. Saving a large Spark Dataframe into a single CSV file:
We often use `coalesce(1)` on a Dataframe to write the data into a single file.
However, this causes all the transformations that compute the Dataframe to run with a parallelism of 1. To avoid that, we first save the Dataframe without `coalesce(1)` to an intermediate location (generally on HDFS), then read that intermediate output back into a Dataframe and apply `coalesce(1)` on it to obtain the final output as a single file.
With this optimization, we observed a 50% reduction in the time taken to save the output as a single file.
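A minimal sketch of the two-step write (an existing SparkSession `spark` and dataframe `df` are assumed; paths are placeholders):

# Step 1: write with full parallelism to an intermediate location on HDFS.
df.write.mode("overwrite").parquet("hdfs:///tmp/intermediate_output/")

# Step 2: read the already-computed result back, and only then collapse it to one file.
final = spark.read.parquet("hdfs:///tmp/intermediate_output/")
final.coalesce(1).write.mode("overwrite").option("header", "true").csv("hdfs:///tmp/single_csv_output/")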

8. Spark best practices

  • Dynamic Allocation to Scale In/Out Spark Job automatically

When you’re not sure how many executors your Spark job needs and the load is uneven, use the following Spark configuration to enable dynamic resource allocation:
spark.dynamicAllocation.enabled = true
You can also bound the minimum and maximum number of executors using these configurations:
spark.dynamicAllocation.minExecutors
spark.dynamicAllocation.maxExecutors
This will help when you have multiple Spark Jobs running on your cluster and you don’t want a single job to hog all resources.
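For example, a hedged SparkSession setup with dynamic allocation enabled (the executor bounds are example values, not a recommendation for your cluster):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("etl-job")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "2")
    .config("spark.dynamicAllocation.maxExecutors", "20")
    # Depending on the Spark version/cluster manager, dynamic allocation also needs an
    # external shuffle service or spark.dynamicAllocation.shuffleTracking.enabled=true.
    .getOrCreate()
)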

  • Resource Allocation & Monitoring job

If your job is taking longer than expected, monitor it with the Spark UI:
identify the stages that take the longest and check their resource utilisation.
Then allocate more (or fewer) resources using these configurations:
spark.executor.memory
spark.executor.cores
spark.driver.memory
Don’t use `collect()` on large dataframes, as it transfers all the data from the executors to the driver/master node, leading to OutOfMemory errors.
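An illustrative configuration (the values are placeholders to be tuned against what the Spark UI shows), along with a bounded alternative to `collect()`:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("etl-job")
    .config("spark.executor.memory", "8g")
    .config("spark.executor.cores", "4")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

# Instead of df.collect() on a large dataframe, bring back only a bounded sample
# (assumes an existing dataframe `df`):
sample_rows = df.limit(1000).collect()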

  • Adding new columns in dataframe efficiently

Avoid column creation using `.withColumn()` on the dataframe when you need to add a large number of new columns. It creates a new dataframe for each new column, inflating the Spark execution DAG and slowing down the job execution. As an alternative, create a list of existing and new columns as below:
Let’s assume we have a dataframe with the following data:

full_name       city
Ross Geller     New York
Monica Geller   New York

from pyspark.sql.functions import col, split

# existing columns of the dataframe
columns = [col('full_name'), col('city')]
# add new columns to the list
columns.append(split(col('full_name'), ' ').getItem(0).alias('first_name'))
columns.append(split(col('full_name'), ' ').getItem(1).alias('last_name'))
Then pass that list as var-args to the `select()` method on the dataframe:
df.select(*columns)
This will create only one dataframe with all the columns we mentioned.

full_name       city       first_name   last_name
Ross Geller     New York   Ross         Geller
Monica Geller   New York   Monica       Geller