Dynamic Partition Pruning in Spark 3.0

We often hear people saying:

  1. SparkSQL is very slow after I integrate it with BI tools.
  2. Why do joins behave so weirdly and slowly?
  3. I hate Spark because I must learn how to tune memory, and I'm confused: I cannot decide how many executors, how many executor cores, and how much executor memory to use, or how to define shuffles and partitioning during map/reduce.
Spark 3.0 brings a little relief by introducing Dynamic Partition Pruning.
  • Assume we want to query logs_table with a filter: SELECT * FROM logs_table WHERE log_date = '2020-05-05'
  • Earlier we had the basic data flow: scan all the data first -> then filter.
  • As an optimization, many databases introduced filter push-down: filter first -> then scan the table.
  • One more optimization is static pruning. Data is stored as partitioned files in a columnar format, so the query first filters (on the partition column) -> then scans only the matching partitions.
  • We also use table denormalization: SELECT * FROM logs_table JOIN dates ON logs_table.log_date = dates.log_date WHERE dates.log_date = '2020-05-05'. Here static pruning is not possible. Downsides: the JOIN is expensive, data is replicated, and either table may be huge or wide. A concrete setup sketch follows below.
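As a concrete sketch of the setup above (the table names match the example; the exact columns and values are assumptions for illustration), the fact table is written partitioned on log_date while the dimension stays small and unpartitioned:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("dpp-demo").getOrCreate()

    // Fact table: large, partitioned on disk by the filter column log_date.
    spark.range(0, 1000000)
      .selectExpr("id", "concat('2020-05-0', (id % 9) + 1) AS log_date")
      .write
      .partitionBy("log_date")
      .mode("overwrite")
      .saveAsTable("logs_table")

    // Dimension table: small and unpartitioned.
    spark.range(1, 10)
      .selectExpr("concat('2020-05-0', id) AS log_date", "id AS day_number")
      .write
      .mode("overwrite")
      .saveAsTable("dates")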
Spark 3.x introduces Dynamic Partition Pruning - what is it?

The basic idea is to filter using a dynamically computed pruning condition. In static pruning, the filter values on the partition column are already known when the query is compiled. Here the filter values are not known upfront: Spark derives them at runtime (from the other side of a join) and fetches only those partitions which satisfy the filter condition.
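In Spark 3.x this behaviour is controlled by a SQL config flag and is enabled by default; a minimal sketch for checking and toggling it:

    // Dynamic partition pruning is on by default in Spark 3.x.
    spark.conf.get("spark.sql.optimizer.dynamicPartitionPruning.enabled")

    // Turn it off to compare plans and scan sizes, then back on.
    spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "false")
    spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")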

Example - take the filter from the dimension table's result, and dynamically apply it to the fact table to limit the data scanned. This logic can be implemented as a subquery on the fact table, as sketched below.
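Conceptually, the injected filter behaves like the hand-written subquery below (this is the equivalent logic, not literally what the optimizer generates):

    spark.sql("""
      SELECT *
      FROM logs_table
      WHERE log_date IN (SELECT log_date
                         FROM dates
                         WHERE log_date = '2020-05-05')
    """)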

A quick overview of how Spark SQL works -

  1. The user submits a query to Spark through the DataFrame/Dataset APIs (or SQL).
  2. Spark translates this into a digestible plan (Scala classes and objects), popularly called the Logical Plan.
  3. Spark then optimizes the Logical Plan and converts it into a Physical Plan by applying rule-based transformations.
  4. The Physical Plan is executed as RDD operations distributed across the cluster; you can inspect both plans with explain, as sketched below.
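A small self-contained sketch of viewing these stages yourself:

    // explain(true) prints the parsed, analyzed and optimized logical plans,
    // followed by the physical plan that actually runs on the cluster.
    val df = spark.range(100)
      .filter("id % 2 = 0")
      .selectExpr("id * 10 AS x")
    df.explain(true)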

Optimization in the Logical Plan as preparation for dynamic pruning -

  1. Assume we have a fact table and a dimension table.
  2. The fact table is partitioned; the dimension table is not.
  3. The user submits a query, and Spark launches scan operators for both tables.
  4. Here Spark takes the dimension table's filter that is relevant to the fact table and dynamically injects it before the scan operator on the fact table, so that only the data relevant to both the dimension and fact tables ends up in the result. You can spot this injected filter in the plan, as sketched below.
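A sketch of spotting the injected filter, reusing the hypothetical tables from earlier (the exact plan text varies by Spark version):

    // In the physical plan, the FileScan on logs_table should show a
    // PartitionFilters entry containing dynamicpruningexpression(...),
    // i.e. the runtime filter injected from the dimension side.
    spark.sql("""
      SELECT l.*
      FROM logs_table l
      JOIN dates d ON l.log_date = d.log_date
      WHERE d.day_number = 5
    """).explain()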

Optimization in the Physical Plan using dynamic pruning (the actual action) -

  1. If the dimension table is small, Spark executes the JOIN as a Broadcast Hash Join (BHJ).
  2. First, Spark builds a hash table off the dimension table - technically, the build relation.
  3. It plugs the result of the build relation into a broadcast variable so it is available to all executors, avoiding a shuffle during the JOIN.
  4. Spark then probes the hash table with the rows coming from the fact table on each worker node.
  5. The two DAG stages here are: (1) compute and broadcast/distribute the hash table; (2) probe the hash table and execute the actual join. A sketch of forcing a BHJ follows below.
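A sketch of forcing the broadcast hash join explicitly with Spark's broadcast hint (table names reuse the hypothetical ones above):

    import org.apache.spark.sql.functions.broadcast

    val logs  = spark.table("logs_table")
    val dates = spark.table("dates")

    // Mark the small dimension as the build side: Spark hashes it, ships the
    // hash table to every executor, and probes it with fact rows, avoiding a
    // shuffle of the large fact table.
    val joined = logs.join(broadcast(dates), Seq("log_date"))
    joined.explain()   // the physical plan should show BroadcastHashJoin

Without the hint, Spark auto-broadcasts tables smaller than spark.sql.autoBroadcastJoinThreshold (10 MB by default).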


The above steps are the actual Dynamic Partition Pruning (a dynamic filter), which is injected before the table scan.
