Parallel Merge Sort in Java
Merge Sort
Merge sort is a sorting algorithm invented in 1945 by John von Neumann. From a time complexity perspective, merge sort is known as an efficient sorting solution, as it runs in O(n log n).
The algorithm is part of the divide and conquer family: recursively breaking down a problem into two or more sub-problems.
The overall idea is the following:
mergesort(int[] array, int low, int high) {
    if (low < high) {
        // Calculate the middle position
        int middle = (low + high) / 2
        // Apply mergesort on the first half
        mergesort(array, low, middle)
        // Apply mergesort on the second half
        mergesort(array, middle + 1, high)
        // Merge the two halves to get an array sorted from low to high
        merge(array, low, middle, high)
    }
}
The goal of this post is not to describe the algorithm in detail, as that has already been done many times. For the scope of our parallel implementation, we simply need to understand that each call triggers two sub-calls to mergesort(): one to sort the first half of the array and another one for the second half.
Obviously, in this implementation, nothing is parallelized yet and one thread is going to perform the whole sorting job.
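For reference, a minimal sequential sketch in Java could look like this, assuming a straightforward merge() helper (the helper and the overflow-safe midpoint are our own choices):

public class SequentialMergeSort {

    public static void mergesort(int[] array, int low, int high) {
        if (low < high) {
            int middle = low + (high - low) / 2; // overflow-safe midpoint
            mergesort(array, low, middle);
            mergesort(array, middle + 1, high);
            merge(array, low, middle, high);
        }
    }

    // Merges the sorted halves [low..middle] and [middle+1..high] in place.
    static void merge(int[] array, int low, int middle, int high) {
        int[] copy = java.util.Arrays.copyOfRange(array, low, high + 1);
        int i = 0;                // cursor in the left half of the copy
        int j = middle - low + 1; // cursor in the right half of the copy
        for (int k = low; k <= high; k++) {
            if (i > middle - low) {
                array[k] = copy[j++]; // left half exhausted
            } else if (j > high - low) {
                array[k] = copy[i++]; // right half exhausted
            } else if (copy[i] <= copy[j]) {
                array[k] = copy[i++];
            } else {
                array[k] = copy[j++];
            }
        }
    }
}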
To implement a parallel solution, we first have to limit the number of threads. As merge sort is purely CPU-bound, the optimal number of threads is close to the number of CPU cores. Hence, we need a thread pool to manage the execution.
ForkJoinPool
In Java 5, the ExecutorService was introduced to deal with this challenge.
As an example, let’s create a pool of two threads and submit several tasks:
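A minimal sketch, with illustrative task bodies:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorExample {
    public static void main(String[] args) {
        // A fixed pool of two threads sharing a single work queue
        ExecutorService executor = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 5; i++) {
            final int taskId = i;
            executor.submit(() ->
                System.out.println("Task " + taskId + " run by "
                        + Thread.currentThread().getName()));
        }
        executor.shutdown(); // stop accepting new tasks, let submitted ones finish
    }
}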
Yet, in Java 7 we got a new toy to manage thread pools: the so-called fork/join framework and its core class ForkJoinPool.
The idea of the fork/join pattern is also based on the divide and conquer principle: forking a big task into smaller chunks and aggregating the results.
Despite being based on AbstractExecutorService (just like the classic ThreadPoolExecutor behind ExecutorService), there is an important difference between the two: their internal scheduling algorithm.
A classic ExecutorService is based on a work-sharing algorithm. Simply put, a single queue is shared by all the threads of the pool. Each time a thread is idle, it looks into this queue to retrieve a task to be done.
On the other hand, ForkJoinPool is based on a work-stealing algorithm. In a work-stealing world, each thread also has its own local queue; an idle thread can query the queues of the other threads of the pool and steal a task from them.
What is the link, then, between this concept and our initial problem? The overall idea is to change the way we execute the sub-calls to mergesort().
Each sub-call could be changed to spawn a new sub-task. The created sub-tasks will be available to other idle threads because of work-stealing. This would result in an equitable distribution of the workload between the different threads of the pool.
In pseudo-code:
mergesort(int[] array, int low, int high) {
    if (low < high) {
        int middle = (low + high) / 2
        // Create two sub-tasks instead of plain recursive calls
        createTask -> mergesort(array, low, middle)
        createTask -> mergesort(array, middle + 1, high)
        // Wait for both sub-tasks to complete, then merge
        merge(array, low, middle, high)
    }
}
In Java, we need to extend RecursiveAction and to implement the core logic in a compute() method:
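A sketch of such a task (class and field names are our own; it reuses the merge() helper from the sequential version above):

import java.util.concurrent.RecursiveAction;

public class MergeSortTask extends RecursiveAction {
    private final int[] array;
    private final int low;
    private final int high;

    public MergeSortTask(int[] array, int low, int high) {
        this.array = array;
        this.low = low;
        this.high = high;
    }

    @Override
    protected void compute() {
        if (low < high) {
            int middle = low + (high - low) / 2;
            MergeSortTask left = new MergeSortTask(array, low, middle);
            MergeSortTask right = new MergeSortTask(array, middle + 1, high);
            // invokeAll() forks both sub-tasks and waits for their completion
            invokeAll(left, right);
            SequentialMergeSort.merge(array, low, middle, high);
        }
    }
}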
As we can see, we have created two tasks, left and right, and we ask the ForkJoinPool to execute both. Thanks to work-stealing, these tasks become available to other threads.
Then, we need to create a custom ForkJoinPool and ask it to execute our parallel implementation:
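For instance (numberOfAvailableCores is a name introduced here for illustration):

import java.util.concurrent.ForkJoinPool;

public class ParallelMergeSort {
    public static void main(String[] args) {
        int[] array = {5, 3, 8, 1, 9, 2, 7, 4};
        int numberOfAvailableCores = Runtime.getRuntime().availableProcessors();
        ForkJoinPool pool = new ForkJoinPool(numberOfAvailableCores - 1);
        pool.invoke(new MergeSortTask(array, 0, array.length - 1)); // blocks until done
        System.out.println(java.util.Arrays.toString(array));
    }
}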
In the previous example, the ForkJoinPool was created with numberOfAvailableCores - 1 threads. This value is also the default parallelism of the common pool used by the parallel stream API. Why minus one? Because the thread that submits the task also takes part in the fork/join work, so one core is kept available for it.
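This default is easy to verify with a quick check:

// On an 8-core machine this typically prints 8 and then 7
System.out.println(Runtime.getRuntime().availableProcessors());
System.out.println(java.util.concurrent.ForkJoinPool.commonPool().getParallelism());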
Let’s run a benchmark with an 8-core machine on an array composed of 100k elements to see how good we are:
benchSequential  avgt    13.476 ms/op
benchParallel    avgt  3775.617 ms/op
Pretty disappointing, isn’t it? The parallel implementation is about 280 times slower than the sequential one.
What did we miss?
We remember that the algorithm divides the array into two halves each time. If the array is composed of 1024 elements, the first call handles 1024 elements, the next level 512, then 256, and so on, until the last calls handle only two elements each.
It means that we will have a bunch of tasks consisting of simply sorting two integers. Those tasks are made available for work-stealing, which causes a huge performance penalty.
Indeed, it is far faster to have one thread perform a tiny job twice than to have two threads synchronize with each other to split and share that job.
Thus, the idea is to set a threshold on the number of elements to handle. Above this threshold, we trigger the parallel execution; below it, we simply run the sequential method.
Something like this:
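A sketch of the revised compute() method, where MAX is the threshold and SequentialMergeSort.mergesort() is the single-threaded version shown earlier:

private static final int MAX = 8192; // below this size, sort sequentially

@Override
protected void compute() {
    if (high - low < MAX) {
        // Small chunk: a plain sequential sort avoids task-management overhead
        SequentialMergeSort.mergesort(array, low, high);
    } else {
        int middle = low + (high - low) / 2;
        invokeAll(new MergeSortTask(array, low, middle),
                  new MergeSortTask(array, middle + 1, high));
        SequentialMergeSort.merge(array, low, middle, high);
    }
}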
What is the best value for MAX? The JDK’s Arrays class sets this threshold for parallelSort() to 1 << 13, which equals 8192.
Let’s run another benchmark with the same conditions:
benchSequential         avgt    13.476 ms/op
benchParallel           avgt  3775.617 ms/op
benchParallelWithLimit  avgt     8.75 ms/op
This time, we are roughly 35% faster than the sequential implementation. And the more elements we sort, the bigger the gain should be.
Therefore, ForkJoinPool is a powerful tool for implementing recursive, parallel algorithms but, as usual, it has to be used cautiously.