Unlocking the Power of Bucketing in Spark: Optimize Your Data Processing
Bucketing is the process of shuffling and sorting data on a key up front and storing it in that physical layout.
It follows that bucketing is useful wherever the data would otherwise have to be shuffled and sorted at query time. The most common such case is a join between two data frames.
When we apply a join transformation, the data needs to be shuffled on the join key and placed in sorted order so it can be merged with the other table (Sort Merge Join).
Conceptually, joining two tables or data frames resembles a nested for loop: each row's key column value is matched against the rows of the target table or data frame.
Sort Merge Join:
Below is a sample representation of a Sort Merge Join, showing how, after reading the data from storage, Spark shuffles and sorts it before processing it in the join.
Below is the query plan for the joining process without bucketing:
In it we can see the shuffle (Exchange) and sort steps of the joining process.
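As a rough illustration, a plan like this can be reproduced with a plain (unbucketed) join. This is only a sketch: the table names vehicles and states and the key State_id are assumed for the example.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("bucketing-demo").getOrCreate()

# Disable broadcast joins so Spark picks a Sort Merge Join even for small data
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

vehicles_df = spark.table("vehicles")   # assumed unbucketed tables
states_df = spark.table("states")

# The physical plan shows an Exchange (shuffle) followed by a Sort
# on BOTH sides feeding the SortMergeJoin operator
vehicles_df.join(states_df, "State_id").explain()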
Bucketing both tables involved in the join (which entirely avoids the shuffle and sort)
Below is the query plan it follows:
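A minimal sketch of how that plan arises, assuming both sides are bucketed and sorted on State_id with the same number of buckets (table names and the bucket count are illustrative):

# Write both join sides bucketed on the join key with the SAME bucket count
(vehicles_df.write
    .bucketBy(16, "State_id")
    .sortBy("State_id")
    .mode("overwrite")
    .saveAsTable("buk_vehicles"))

(states_df.write
    .bucketBy(16, "State_id")
    .sortBy("State_id")
    .mode("overwrite")
    .saveAsTable("buk_states"))

# Joining the saved tables on the bucket key: the physical plan has a
# SortMergeJoin with no Exchange (and typically no Sort) on either side
spark.table("buk_vehicles") \
    .join(spark.table("buk_states"), "State_id") \
    .explain()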
Bucketing only a single table, which avoids the shuffle and sort on one side of the join
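A sketch of the one-sided case, assuming only buk_vehicles from the previous example is bucketed; the plan then shuffles and sorts just the unbucketed side:

bucketed = spark.table("buk_vehicles")   # bucketed and sorted on State_id
plain = spark.table("states")            # ordinary, unbucketed table

# Expected plan: Exchange + Sort appear only under the unbucketed side;
# the bucketed side is read straight into the SortMergeJoin
bucketed.join(plain, "State_id").explain()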
When to Use Bucketing?
Bucketing should be done when we are sure the same data frame (table) is going to be joined with multiple tables (data frames).
We are effectively pre-performing half of the join work that would otherwise happen every time the table is joined, and persisting the result. That stored (shuffled and sorted) output can be used directly, which can significantly reduce the join cost because both the shuffle and the sort are eliminated.
Limitations of Bucketing:
- Bucketing cannot be done on Delta tables.
- Bucketing is cost effective only when the underlying data is stable; if the data in the bucketed tables changes, re-bucketing needs to be performed.
- Bucketed output can only be saved as a table (something that has metadata associated with it).
- Spark reads each bucket as a partition, so have an estimate of the size of each bucket: if one key carries a disproportionately large share of the data, its bucket becomes huge and leads to skew (see the sketch after this list).
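One simple way to estimate bucket sizes before writing is to look at the row count per key. This sketch uses the same states_vehicles_df and State_id names as the example that follows:

from pyspark.sql import functions as F

# Rows per join key: a single dominant key means one oversized bucket (skew)
key_counts = (states_vehicles_df
    .groupBy("State_id")
    .agg(F.count("*").alias("rows"))
    .orderBy(F.desc("rows")))

key_counts.show(10)   # inspect the heaviest keys before choosing a bucket count

With the distribution known, the table can then be bucketed and saved as a table, for example: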
(states_vehicles_df.write            # bucketBy lives on the DataFrameWriter
    .bucketBy(898, "State_id")       # number of buckets and the bucket key
    .sortBy("State_id")              # sort within each bucket on the same key
    .mode("overwrite")
    .option("path", "/mount/data/stata_bucket/vehicles")
    .saveAsTable("buk_vehicles"))
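Note that .write must come before .bucketBy, since bucketBy is a method of the DataFrameWriter, and bucketed output has to go through saveAsTable; writing the same data with a plain save() call fails with an AnalysisException, which is the metadata limitation noted above. The saved table can then be joined on State_id repeatedly without incurring the shuffle and sort.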