Sometimes data is unevenly distributed across partitions, which leads to data skew. This means one partition holds more data than the others because many rows share the same or related keys.
In joins and aggregations, all data for the same key must be co-located, so it may end up being processed by a single container/executor. This can slow down the application.
Solution -
- If one of the data sets is small, it can be broadcast to every executor, so the larger data set does not need to be shuffled and the join becomes more efficient. Automatic broadcasting is governed by the property spark.sql.autoBroadcastJoinThreshold
- Check whether the join key has too many NULL values. If it does, filter them out before joining, process the records with NULL keys separately, and then union the result with the remaining data set.
- Salting -
To understand salting, let's first look at the problem with an example -
Table 1
Key
1
1
1
Table 2
Key
1
1
On joining Table 1 with Table 2,
- Since every row has the same key, all the data is shuffled to the same container (one JVM), which returns 3*2 = 6 rows, like
1 * 1
1 * 1
1 * 1
1 * 1
1 * 1
1 * 1
Note - The above is the problem with data skew. All rows for the key are processed by a single task on one executor, leading to slow performance.
Now say we add a salt to the keys of Table 1 and Table 2 so that the data is spread more evenly. This is what salting is.
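To make the skew concrete, here is a minimal plain-Python sketch (not Spark code). It assumes a simple hash partitioner, and the names `NUM_PARTITIONS` and `partition_for` are made up for illustration. It shows that every row with key 1 is assigned to the same partition.

```python
from collections import Counter

NUM_PARTITIONS = 4  # hypothetical number of shuffle partitions


def partition_for(key, num_partitions=NUM_PARTITIONS):
    # Stand-in for a hash partitioner: the same key always maps
    # to the same partition.
    return hash(key) % num_partitions


# Table 1 from the example: three rows, all with key 1.
table1_keys = [1, 1, 1]

# Count how many rows each partition receives.
rows_per_partition = Counter(partition_for(k) for k in table1_keys)
print(dict(rows_per_partition))  # one partition holds all 3 rows
```

The other partitions receive nothing, so the work cannot be spread out no matter how many executors are available.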
Table 1
Key Salt_key_T1
1 0
1 1
1 2
Table 2
Key Salt_key_T2
1 0
1 1
1 2
1 0
1 1
1 2
On joining Table 1 with Table 2 on Key + Salt_key, we still get 3*2 = 6 rows, but the join keys are now spread over more values.
The old key was just "1".
The new keys are "1 0", "1 1" and "1 2".
So the data can be processed by different executors (JVMs), leading to better data distribution and improved processing time.
K1 K2 SK_T1 SK_T2
1 1 0 0
1 1 0 0
1 1 1 1
1 1 1 1
1 1 2 2
1 1 2 2
How to do that? For example -
- Add the following SQL column to Table 1 - floor(rand(123) * 3) - This assigns a random integer from 0 to 2 to each row of Table 1
- Add the following SQL column to Table 2 - explode(array(0,1,2)) - This turns each row of Table 2 into 3 rows, one per value (0,1,2). So the data is duplicated as many times as the number of distinct salt keys we added to Table 1
- Note, the Table 1 step above generates random numbers. Instead of that, you can also use - monotonically_increasing_id() % 3
- In the join expression, use the new salt key column in addition to the original join condition.
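The steps above can be sketched in plain Python to check the logic. This is a simulation under stated assumptions, not Spark code: `salt_left`, `explode_right` and `salted_join` are hypothetical helper names, and `random.Random` only stands in for a seeded random column with three salt values (0, 1, 2).

```python
import random
from collections import Counter

NUM_SALTS = 3  # number of distinct salt values: 0, 1, 2


def salt_left(keys, num_salts=NUM_SALTS, seed=123):
    # Stand-in for a seeded random salt column on Table 1:
    # each row gets one random salt in [0, num_salts).
    rng = random.Random(seed)
    return [(k, int(rng.random() * num_salts)) for k in keys]


def explode_right(keys, num_salts=NUM_SALTS):
    # Stand-in for explode(array(0,1,2)) on Table 2:
    # each row is copied once per salt value.
    return [(k, s) for k in keys for s in range(num_salts)]


def salted_join(left, right):
    # Join on the original key AND the salt key.
    return [
        (lk, ls)
        for (lk, ls) in left
        for (rk, rs) in right
        if lk == rk and ls == rs
    ]


table1 = salt_left([1, 1, 1])   # 3 rows, one salt each
table2 = explode_right([1, 1])  # 2 rows become 6 rows
joined = salted_join(table1, table2)

print(len(joined))  # 6, the same row count as the unsalted join
print(Counter(salt for _, salt in joined))  # rows per salt value
```

Each Table 1 row matches exactly one copy of every Table 2 row, so the result size is unchanged. With only three rows the random salts may not cover all three values, which is one reason a deterministic salt such as monotonically_increasing_id() % 3 can be preferable for small examples.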