Feb 6, 2019 - Dmytro Mykhalyk (Data Engineer)  

Spark Data Skew Problem

This article is for warehouse architects, data engineers, and Spark developers with an intermediate level of experience who want to improve data processing performance.

Data Distribution in Big Data

The performance of Big Data systems is directly linked to how uniformly the data being processed is distributed across the workers. When you read rows from a database table for processing, they should be spread evenly among all workers. If some data slices have more rows than others, the workers that hold them must work longer and need more resources to complete their jobs. These slices and the workers that manage them become a performance bottleneck for the whole data processing task. Uneven distribution of data is called skew, and an optimal data distribution has no skew.

Where Is the Problem With Data Distribution?

What is data skew

Data skew means that data distribution is uneven or asymmetric. Symmetry means that one half of the distribution is a mirror image of the other half.

A skewed distribution can be one of two types:

  • left-skewed distribution - has a long left tail. Left-skewed distributions are also called negatively skewed distributions.
  • right-skewed distribution - has a long right tail. Right-skewed distributions are also called positively skewed distributions.

[Figure: Data distributions]

Other examples of asymmetric distributions: link


How to Find The Skew Problem in Your Data?

Skewness is a measure of symmetry, or more precisely, the lack of symmetry. A distribution, or data set, is symmetric if it looks the same to the left and right of the center point. The skewness of a normal distribution is zero, and its mean, median, and mode coincide. Skewness is calculated with the formula:

[Formula: skewness]

where:
[Formula: standard deviation]
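The calculation can be sketched in a few lines of Python. This is a minimal sketch that assumes the Fisher-Pearson coefficient of skewness, that is, the mean cubed deviation from the mean divided by the cube of the population standard deviation. That choice of definition, the function name `skewness`, and the sample data are all assumptions made for illustration.

```python
import math

def skewness(values):
    """Fisher-Pearson coefficient of skewness (population form, assumed here)."""
    n = len(values)
    mean = sum(values) / n
    # Population standard deviation (divides by n, not n - 1).
    std = math.sqrt(sum((v - mean) ** 2 for v in values) / n)
    if std == 0:
        return 0.0  # all values identical: no asymmetry to measure
    third_moment = sum((v - mean) ** 3 for v in values) / n
    return third_moment / std ** 3

symmetric = [1, 2, 3, 4, 5]
right_skewed = [1, 1, 1, 2, 2, 3, 10]      # long right tail
left_skewed = [-v for v in right_skewed]   # mirror image: long left tail

print(skewness(symmetric))     # zero for a symmetric sample
print(skewness(right_skewed))  # positive value
print(skewness(left_skewed))   # negative value
```

A value near zero suggests a balanced distribution, while a large positive or negative value points to a long tail on one side.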


Data Skew in Distributed Systems

MapReduce is a parallel, distributed computation paradigm that allows computations to scale across hundreds or thousands of nodes. It provides two main operations: map and reduce. The map step takes a set of data and converts it into another set of data in which individual elements are broken down into tuples (key/value pairs). The reduce step takes the output of the map step and combines those tuples into a smaller set of tuples. The reduce job always starts after the map job.

[Figure: MapReduce workflow]
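The two operations can be imitated on a single machine. The following is only a sketch of the paradigm, not of Hadoop's API: the function names `map_phase`, `shuffle`, and `reduce_phase` and the sample orders are hypothetical.

```python
from collections import defaultdict

def map_phase(records):
    """Map: turn each input record into a (key, value) tuple."""
    for customer_id, total in records:
        yield customer_id, total

def shuffle(pairs):
    """Group all values by key, as the framework does between map and reduce."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_phase(groups):
    """Reduce: combine the values of each key into a smaller result."""
    return {key: sum(values) for key, values in groups.items()}

# (customer_id, order total) pairs
orders = [(1, 10.0), (2, 5.0), (1, 7.5), (3, 2.0), (1, 1.5)]
totals = reduce_phase(shuffle(map_phase(orders)))
print(totals)
```

Note that the grouping step collects every value of a key in one place, which is exactly where skewed keys start to hurt.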

In distributed, parallel computation systems like Hadoop MapReduce or Apache Spark, uneven data distribution may cause a longer computation time than in the case of symmetric, balanced data distribution.

For example, there are two tables for our e-store:

    customers(customer_id, name, …)
    orders(order_id, customer_id, article, total, …)

And we want to join those two tables to get a full orders list for each customer.

Note that some customers are very active (regular customers) and some are very passive (they purchase infrequently). In this example, the orders table holds the skewed data and the customers table holds the unskewed data. Joining them with Hadoop MapReduce or Apache Spark leads to long task execution times, because all orders of a very active customer are processed by the same task.
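The effect on partition sizes can be illustrated without a cluster. The sketch below assumes simple modulo hash partitioning on customer_id and made-up order counts; the names `partition_for` and `NUM_PARTITIONS` are hypothetical.

```python
from collections import Counter

NUM_PARTITIONS = 4

# Hypothetical orders: customer 7 is a regular customer with 1,000 orders,
# the other 99 customers (ids 100..198) placed 10 orders each.
orders = [(7, f"order-{i}") for i in range(1000)]
orders += [(cid, f"order-{cid}-{i}") for cid in range(100, 199) for i in range(10)]

def partition_for(customer_id, num_partitions=NUM_PARTITIONS):
    """Hash-style partitioning: the same key always lands in the same partition."""
    return customer_id % num_partitions

rows_per_partition = Counter(partition_for(cid) for cid, _ in orders)
for partition in range(NUM_PARTITIONS):
    print(partition, rows_per_partition[partition])
```

With these numbers, three partitions receive 250 rows each and the fourth receives 1,240, so the task that owns it finishes long after the others.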

The following chart from the Spark UI shows possible computation times for tasks that work with skewed data:

[Figure: Spark results]

The chart shows that the tasks take varying amounts of time to complete, which is to be expected, but the variation is quite large [1].

How to Solve Your Data Distribution Problem?

General Solutions to the Data Skew Problem

There are several ways to solve the data skew problem. First we look at a few common approaches, and then at the solutions provided by specific tools. One idea is to split the data being processed across a larger number of processors. We can also assign more partitions to overcrowded columns to reduce data access time. Below are two common solutions to the data skew problem at different system layers.

Skewed storing

We can reduce the effect of data skew at the data loading stage. The main idea is to explicitly declare the skewed data (keys) before partitioning. This allows the data to be distributed in a way that takes its unevenness into account, which reduces the impact of skew before calculations begin. Because no additional work is needed during the calculation phase, execution time is reduced.

Main concepts:

  • can be implemented before the processing phase
  • increases computation speed
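Declaring skewed keys presupposes knowing which keys are skewed. One simple way to find them, sketched here under assumptions, is to count key frequencies on the data (or a sample of it) and flag keys whose share of rows exceeds a threshold. The function `find_skewed_keys`, the 10% threshold, and the sample column are hypothetical.

```python
from collections import Counter

def find_skewed_keys(keys, threshold=0.1):
    """Return the keys whose share of all rows exceeds `threshold`."""
    counts = Counter(keys)
    total = sum(counts.values())
    return {key for key, count in counts.items() if count / total > threshold}

# Hypothetical customer_id column of the orders table:
# customer 7 has 60 rows, customer 8 has 25, and 15 customers have 1 row each.
customer_ids = [7] * 60 + [8] * 25 + list(range(100, 115))
skewed = find_skewed_keys(customer_ids)
print(sorted(skewed))
```

The resulting set of keys is what would then be declared as skewed when the data is stored.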

Repartitioning

By default, all values for a particular key go to the same reducer. If we see that some key has a disproportionately large number of values (for example, customer_id in the orders table), we can spread that key over more than one reducer [2].

Main concepts:

  • can be implemented between processing phases
  • the number of reduce tasks equals the number of partitions
  • a custom partitioning strategy can be set
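A custom partitioning strategy can be sketched as follows. This is an illustration of the idea rather than the partitioner interface of Hadoop or Spark: the class `SkewAwarePartitioner` and its round-robin policy for hot keys are assumptions.

```python
import itertools
from collections import Counter

class SkewAwarePartitioner:
    """Spreads rows of known hot keys over all partitions; hashes the rest."""

    def __init__(self, num_partitions, hot_keys):
        self.num_partitions = num_partitions
        self.hot_keys = set(hot_keys)
        self._round_robin = itertools.cycle(range(num_partitions))

    def partition(self, key):
        if key in self.hot_keys:
            # Hot key: hand out partitions in turn instead of hashing.
            return next(self._round_robin)
        return key % self.num_partitions

# Customer 7 has 1,000 rows; customers 100..199 have 10 rows each.
keys = [7] * 1000 + [cid for cid in range(100, 200) for _ in range(10)]
partitioner = SkewAwarePartitioner(num_partitions=4, hot_keys={7})
sizes = Counter(partitioner.partition(key) for key in keys)
print(sorted(sizes.items()))
```

Every partition ends up with 500 rows here. The trade-off is that rows of a hot key no longer live in one partition, so any per-key result has to be merged from partial results in a later step.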

Out-of-the-Box Solutions to the Data Skew Problem

Now we consider ready-made solutions from popular tools. We describe data skew solutions for two Apache projects, Hive and Pig, and then look at solutions for the Apache Spark framework.

Apache Hive is data warehouse software that facilitates reading, writing, and managing large datasets residing in distributed storage using SQL. A structure can be projected onto data that is already in storage.

Skewed table (Hive)

Values that appear very often (heavy skew) are split out into separate files, and the rest of the values go to another file [2]. In our example, the skewed table is the orders table.

Example:

CREATE TABLE SkewData (c1 STRING, c2 STRING) 
    SKEWED BY (c1) ON ('value');

Main concepts:

  • automatic splitting into files
  • skipping/including whole file if possible

List bucketing (Hive)

List bucketing is a special type of skewed table. The main idea is to maintain one directory per skewed key. The data corresponding to non-skewed keys goes into a separate directory [2].

Example:

CREATE TABLE SkewData (c1 STRING, c2 STRING) 
    SKEWED BY (c1) ON ('value') STORED AS DIRECTORIES;

Main concepts:

  • directory per key
  • suited to a small number of skewed keys

Apache Pig is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs.

Skewed join (Pig)

Pig’s skewed join can be used when the underlying data is sufficiently skewed and the user needs finer control over the allocation of reducers to counteract the skew. It should also be used when the data associated with a given key is too large to fit in memory.

Example:

big = LOAD 'big_data' AS (b1,b2,b3);
massive = LOAD 'massive_data' AS (m1,m2,m3);
C = JOIN big BY b1, massive BY m1 USING 'skewed';

Main concepts:

  • joins tables whose keys have too much data to fit in memory
  • does not support skewed joins of more than two tables

Apache Spark is an open-source, distributed, general-purpose cluster-computing framework. Its architectural foundation is the resilient distributed dataset (RDD), a read-only multiset of data items distributed over a cluster of machines and maintained in a fault-tolerant way.

Skewed Join (Spark)

Like Hive and Pig, Spark lets you use a skewed join. Let's look at an example using Spark SQL. To specify a skewed table, we add a hint to the SQL query. Note that the SKEW hint is a Databricks Runtime feature; it is not among the join hints of open-source Apache Spark.

Example:

SELECT /*+ SKEW('orders') */ * 
    FROM orders AS o, customers AS c 
    WHERE o.customer_id = c.customer_id;

Data replication (Spark)

In the common case, when we need to join an unskewed table with a skewed table (or vice versa), we can replicate the unskewed table's data N times and add a new key component that takes each value from 0 to N - 1. For the skewed table, we append to the existing key a random, uniformly distributed value in the same range. We are simply splitting the keys so that values associated with the same original key are now spread over N buckets [3].

Joining with data replication:

[Figure: data replication]

In the e-store example above, the unskewed data comes from the customers table and the skewed data from the orders table. We replicate the customers data N times and add the corresponding new key to each replica. The skewed orders data does not need to be replicated; instead we add a random value between 0 and N - 1 to its key. After the join, we can remove this random integer to get the original keys back if we need them.
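The replication scheme can be sketched in plain Python. This is a single-machine illustration under assumptions, not Spark code: the sample rows, the value N = 4, and the fixed random seed are hypothetical, and the random value is drawn from 0 to N - 1 so that it always matches one of the N replicas.

```python
import random
from collections import defaultdict

N = 4  # number of buckets per key (the replication factor)

customers = [(1, "Alice"), (2, "Bob")]
orders = [(1, f"order-{i}") for i in range(8)] + [(2, "order-8")]

# Unskewed side: replicate every row N times, once per salt value 0..N-1.
salted_customers = {
    (customer_id, salt): name
    for customer_id, name in customers
    for salt in range(N)
}

# Skewed side: append one random salt to each row's key; no replication.
rng = random.Random(42)
salted_orders = [((customer_id, rng.randrange(N)), order) for customer_id, order in orders]

# Join on the salted key, then drop the salt to restore the original key.
joined = [
    (customer_id, salted_customers[(customer_id, salt)], order)
    for (customer_id, salt), order in salted_orders
]

# Count rows per salted key to see how the hot key was spread out.
rows_per_bucket = defaultdict(int)
for salted_key, _ in salted_orders:
    rows_per_bucket[salted_key] += 1

print(len(joined))
print(dict(rows_per_bucket))
```

The join returns one row per order, exactly as an unsalted join would, but the orders of customer 1 are now spread over up to N salted keys instead of one.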

To replicate a small, unskewed dataset, we can use a broadcast hint in Spark SQL or a broadcast hint on the dataset itself:

SELECT /*+ BROADCASTJOIN(customers) */ o.order_id, c.customer_id 
    FROM orders AS o 
    JOIN customers AS c ON o.customer_id = c.customer_id;

or

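// `small` is assumed to be the small, unskewed Dataset
val smallHinted = small.hint("broadcast")
// the logical plan shows the recorded hint
val plan = smallHinted.queryExecution.logical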
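What a broadcast join does conceptually can be shown in a few lines of Python. This is a sketch of the idea only, not of Spark's implementation: the function `broadcast_hash_join` and the sample data are hypothetical.

```python
def broadcast_hash_join(large_partitions, small_table):
    """Join each partition of the large side against a full copy of the small side."""
    lookup = dict(small_table)  # the "broadcast" copy every worker would hold
    for partition in large_partitions:
        for customer_id, order_id in partition:
            if customer_id in lookup:  # inner join: skip unmatched keys
                yield order_id, customer_id, lookup[customer_id]

customers = [(1, "Alice"), (2, "Bob")]
# Orders stay where they are; partitions need not be grouped by customer_id.
order_partitions = [
    [(1, "o-1"), (2, "o-2")],
    [(1, "o-3"), (3, "o-4")],
]
result = list(broadcast_hash_join(order_partitions, customers))
print(result)
```

Because every partition of the large side can be joined locally, the large dataset does not have to be regrouped by key, so a hot key no longer forces all of its rows onto one worker.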

Main concepts:

  • data replication
  • schema modification
  • random bucketing

Practical experiment

The experiments were run on a 6-node Hadoop cluster with 15 GB of RAM and 3 cores per node, running Spark 2.3.

We had two datasets, one large and one small, both containing skewed data. The datasets were stored on HDFS.

Our task was to join the two datasets on one or more keys that are skewed by default. Our goal was to decrease job execution time by effectively utilizing all available cluster resources.

Running the Spark job with classical map-reduce data distribution gave us results like this.

[Figure: experiment 1 results]

Total time to complete a job - 12 min

As we can see, there is a huge data skew on one of the keys (executor 2). As a result, the total duration of the job depends entirely on the run time of one executor, and parallel execution has practically no effect.

After evenly repartitioning the large dataset across all nodes and broadcasting the small dataset to each node, we get a much better picture.

[Figure: experiment 2 results]

Total time to complete a job - 2.7 min

As we can see, all six executors now take part in the calculation, so we gain a significant benefit from parallel execution.

The next step was to maximize the utilization of cluster resources. For this purpose we slightly tuned the spark-submit command to use more than one core per executor.

[Figure: experiment 3 results]

Total time to complete a job - less than 1 min

As a result, each executor has its own level of parallelism, determined by its number of available cores, and can complete its tasks faster.

Project sources on GitHub: link

Conclusion

Data skew is a real problem for effective parallel computing. Its cause may be a poor data structure or the nature of the data itself. In the e-store example, the problem lies in the nature of the data: there is no guarantee that purchases are evenly distributed among customers. Data skew in cases like this is inevitable, so we can only compensate for its consequences, not eliminate it. As the practical experiment shows, those consequences are very unpleasant: skew effectively eliminates parallelism, which increases execution time. Thankfully, we have a few tools to overcome it.

  1. «Balancing Spark – Bin Packing to Solve Data Skew». Silverpond blog
  2. «Big data skew». Data & Analytics
  3. «Fighting the skew in Spark». Data R Us
  4. «Handling Data Skew Adaptively In Spark Using Dynamic Repartitioning». Databricks
  5. «Handling Data Skew in MapReduce Cluster by Using Partition Tuning». Journal of Healthcare Engineering, 2017
  6. «Handling Partitioning Skew in MapReduce using LEEN». Peer-to-Peer Networking and Applications
  7. «Learning from imbalanced data: open challenges and future directions». Bartosz Krawczyk, Progress in Artificial Intelligence, 2016