Introduction
In thread pool executions, tasks are continually fetched from the task queue and executed. If we want to achieve delayed or scheduled task execution, an important point to note is that the task queue sorts tasks based on their delay times. Tasks with shorter delays are placed at the front of the queue and executed first.
While queues generally follow a first-in-first-out (FIFO) data structure, where the earliest data to enter into the queue is popped first, there exists a particular type of queue called a priority queue. A priority queue sorts the data inserted based on priority, ensuring that higher-priority data is retrieved first, irrespective of the insertion order.
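The difference between the two orderings can be seen with the standard collections. The following is a minimal sketch; the class name QueueOrderDemo and the sample delay values are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

class QueueOrderDemo {
    // Inserts the delays 300, 100, 200 (in that order) and drains the queue.
    static List<Integer> drain(Queue<Integer> queue) {
        queue.add(300);
        queue.add(100);
        queue.add(200);
        List<Integer> out = new ArrayList<>();
        while (!queue.isEmpty()) {
            out.add(queue.poll());
        }
        return out;
    }

    public static void main(String[] args) {
        // A FIFO queue returns elements in insertion order.
        System.out.println("FIFO:     " + drain(new ArrayDeque<>()));
        // A priority queue returns the smallest element first.
        System.out.println("Priority: " + drain(new PriorityQueue<>()));
    }
}
```

With natural ordering on the delay values, the priority queue hands out the shortest delay first regardless of insertion order.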
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor extends ThreadPoolExecutor and implements the ScheduledExecutorService interface (ignore the interface for now).
Its internal data structures are essentially the same as those of ThreadPoolExecutor. On top of them, it adds the ability to schedule tasks by time, both one-shot delayed tasks and periodic tasks.
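As a rough illustration of delayed execution, the sketch below schedules two tasks on a single-threaded pool and records the order in which they run. The class name ScheduleDemo and the delay values are assumptions chosen for the example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ScheduleDemo {
    // Schedules a 300 ms task before a 50 ms task and reports the run order.
    static List<String> runOrder() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        List<String> order = new CopyOnWriteArrayList<>();
        pool.schedule(() -> { order.add("300ms"); }, 300, TimeUnit.MILLISECONDS);
        pool.schedule(() -> { order.add("50ms"); }, 50, TimeUnit.MILLISECONDS);
        // By default, delayed tasks that are already queued still run after shutdown().
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(runOrder());
    }
}
```

Even though the 300 ms task is submitted first, the task with the shorter delay is expected to run first.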
The fullest constructor of ScheduledThreadPoolExecutor accepts only three arguments:
corePoolSize - the number of threads to keep in the pool, even if they are idle.
ThreadFactory - the factory used to create new threads.
RejectedExecutionHandler - the handler invoked when a task cannot be accepted, for example after the pool has been shut down. The remaining settings use the defaults described below.
The work queue is a highly customized delayed blocking queue named DelayedWorkQueue. Its implementation principle is essentially the same as that of DelayQueue: the core data structure is a priority queue based on a binary min-heap. When the backing array is full, it expands automatically, so the offer operation never blocks. As a result, maximumPoolSize has no practical effect (the constructor sets it to Integer.MAX_VALUE), and the thread pool keeps at most corePoolSize worker threads running.
DelayedWorkQueue is a delay queue designed specifically for timed tasks. It works like DelayQueue, but the logic of the priority queue and of DelayQueue is inlined into its own methods, which lets it add calls that only timed tasks need.
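A small experiment can make the "bounded threads, unbounded queue" behaviour visible. This is a sketch only: the class name PoolSizeDemo, the pool size of 2, and the 100 far-future tasks are arbitrary choices for illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolSizeDemo {
    // Returns {current pool size, queued task count, maximum pool size}.
    static int[] observe() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(
                2, Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        // Queue far more tasks than there are threads; none is due for an hour.
        for (int i = 0; i < 100; i++) {
            pool.schedule(() -> { }, 1, TimeUnit.HOURS);
        }
        int[] result = {
            pool.getPoolSize(), pool.getQueue().size(), pool.getMaximumPoolSize()
        };
        pool.shutdownNow(); // discard the pending tasks and stop the workers
        return result;
    }

    public static void main(String[] args) {
        int[] r = observe();
        System.out.println("threads=" + r[0] + " queued=" + r[1] + " max=" + r[2]);
    }
}
```

The pool never grows beyond its two core threads, while all 100 tasks are accepted into the queue without blocking the submitter.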
Implementation Details
The reason ScheduledThreadPoolExecutor implements its own blocking work queue is that its requirements for the queue are somewhat unusual.
DelayedWorkQueue is a heap-based data structure, similar to DelayQueue and PriorityQueue. When executing timed tasks, the execution time of each task varies. So, the job of DelayedWorkQueue is to arrange tasks in ascending order of execution time, with tasks closer to the current time at the front of the queue (Note: The order here is not absolute. The heap only guarantees that a child node's next execution time is no earlier than its parent's; the order among nodes on the same level, such as leaf nodes, is not necessarily sequential).
The heap structure is illustrated in the following diagram:
As the diagram shows, the queue is organized as a min-heap (more details below).
In this structure, we can observe the following features: Assuming the "first element" in the array has an index of 0, then the relationship between the parent node and child nodes' position is as follows:
The index of the left child of the node at index i is (2*i + 1).
The index of the right child of the node at index i is (2*i + 2).
The index of the parent node of the node at index i is floor((i-1)/2).
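These index relationships translate directly into code. The helper class below is a hypothetical sketch (HeapIndex and isMinHeap are names invented here), using plain long values in place of task trigger times.

```java
class HeapIndex {
    static int left(int i) {
        return 2 * i + 1;
    }

    static int right(int i) {
        return 2 * i + 2;
    }

    // Valid for i >= 1; the root (index 0) has no parent.
    static int parent(int i) {
        return (i - 1) >>> 1;
    }

    // Checks that no element in heap[0..size) is smaller than its parent.
    static boolean isMinHeap(long[] heap, int size) {
        for (int i = 1; i < size; i++) {
            if (heap[parent(i)] > heap[i]) {
                return false;
            }
        }
        return true;
    }
}
```

For example, the array {1, 3, 2, 7, 4} is a valid min-heap even though the siblings 3 and 2 are not in ascending order, which matches the note above that the ordering is only guaranteed between parent and child.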
So, why use DelayedWorkQueue?
When executing a timed task, the task that is closest to execution needs to be fetched. Therefore, every time a task is dequeued, it must be the task with the earliest execution time in the current queue, necessitating the use of a priority queue.
Here is a snippet from the code.
In essence, it is a priority queue that ensures that each dequeued task has the earliest execution time in the current queue. Since it's based on a heap structure, the worst-case time complexity for insert and delete operations is O(logN).
Some more details related to the code structure.
// initial length of queue
private static final int INITIAL_CAPACITY = 16;
// RunnableScheduledFuture Array to store the elements in the queue.
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
// Lock guarding all access to the queue
private final ReentrantLock lock = new ReentrantLock();
// size of the elements stored in the queue
private int size = 0;
// Thread designated to wait for the task at the head of the queue (the leader)
private Thread leader = null;
// Signalled when a newer task becomes available at the head of the queue, or
// when a new thread may need to become the leader
private final Condition available = lock.newCondition();
Above, you can see the DelayedWorkQueue class diagram.
Here, DelayedWorkQueue uses an array to store the elements in the queue. The core data structure is the Min heap. When the queue is full, it will automatically expand.
The important thing to keep in mind is that a variant of the Leader-Follower pattern is used to reduce unnecessary timed waits. What does that mean?
In multi-threaded network models, every thread adopts one of three roles: leader, follower, or processor. The fundamental principle is to always have at most one leader, with all other followers waiting their turn to become the leader. When the thread pool starts, it automatically designates a leader to await network I/O events. When an event occurs, the leader thread alerts a follower thread to take over as the new leader. The original leader then starts processing the network event. After the task is complete, the thread joins the queue of followers, awaiting its turn to be the leader again. This approach enhances CPU cache efficiency and eliminates the need for dynamic memory allocation and data exchange between threads.
More can be read in the Java API source documentation.
Methods
Now let’s take a look at some of the crucial methods implemented within the class.
» Enqueue
If you look closer, DelayedWorkQueue offers three methods to insert elements: put(), add(), and offer(). All three end up in the same offer() method, because there is no "queue full" condition in DelayedWorkQueue. If the number of elements exceeds the length of the array, the array simply expands (up to Integer.MAX_VALUE elements).
» offer() Method
The offer() method inserts an element into the queue.
ScheduledThreadPoolExecutor calls DelayedWorkQueue.add() when a task is submitted, and add() (like its sibling insertion methods) delegates to offer().
offer() method breakdown:
1. It acts as the entry point for insertion and first acquires the lock.
2. It checks whether the array is full (size >= queue.length). If it is, it expands the array by calling grow().
3. It then increments size by one.
4. It checks whether the added element is the first one. If it is, the element goes straight into slot 0 and there is no need to heapify.
5. If the added element isn't the first one, it is placed in the heap using siftUp().
6. If the element at the top of the heap is the one just added, it clears the leader and signals a thread waiting in take().
7. Finally, it releases the lock.
This way, DelayedWorkQueue efficiently manages the insertion of tasks, ensuring that tasks are executed according to their set delays and priorities.
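The steps of offer() can be sketched as follows. This is a simplified illustration, not the JDK source: the class OfferSketch stores bare long trigger times instead of RunnableScheduledFuture objects, compares the head by value rather than by reference, and omits take(), so the leader field is only ever cleared here.

```java
import java.util.Arrays;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class OfferSketch {
    private long[] queue = new long[16];  // trigger times arranged as a min-heap
    private int size = 0;
    private Thread leader = null;         // would be set by take(), omitted in this sketch
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition available = lock.newCondition();

    boolean offer(long time) {
        lock.lock();                                  // acquire the lock
        try {
            int i = size;
            if (i >= queue.length) {                  // array full: grow by 50%
                queue = Arrays.copyOf(queue, queue.length + (queue.length >> 1));
            }
            size = i + 1;                             // one more element
            if (i == 0) {
                queue[0] = time;                      // first element: no heapify
            } else {
                siftUp(i, time);                      // otherwise sift up
            }
            if (queue[0] == time) {                   // new head: wake a waiting taker
                leader = null;
                available.signal();
            }
            return true;                              // insertion never fails or blocks
        } finally {
            lock.unlock();                            // release the lock
        }
    }

    private void siftUp(int k, long key) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            if (key >= queue[parent]) {
                break;
            }
            queue[k] = queue[parent];
            k = parent;
        }
        queue[k] = key;
    }

    // Smallest trigger time; assumes the queue is not empty.
    long peek() {
        lock.lock();
        try {
            return queue[0];
        } finally {
            lock.unlock();
        }
    }

    int size() {
        lock.lock();
        try {
            return size;
        } finally {
            lock.unlock();
        }
    }
}
```

Offering 30, 10, and 20 in that order leaves 10 at the head, and offering more than 16 elements simply grows the array.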
» grow() method
Grows the task array.
When the array becomes full, DelayedWorkQueue doesn't wait or block. Instead, it expands the array. The new capacity (newCapacity) is 50% larger than the old capacity (oldCapacity): it is computed as oldCapacity + (oldCapacity >> 1), where oldCapacity >> 1 is equivalent to dividing oldCapacity by 2. Finally, Arrays.copyOf creates a new array of length newCapacity and copies the old array's data into it.
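The capacity calculation is short enough to sketch in isolation. The class GrowSketch below is illustrative and works on a long[] rather than the task array; the overflow guard is included on the assumption that a capacity beyond the int range should fall back to Integer.MAX_VALUE.

```java
import java.util.Arrays;

class GrowSketch {
    // Computes the next capacity: the old capacity plus half of it.
    static int newCapacity(int oldCapacity) {
        int newCapacity = oldCapacity + (oldCapacity >> 1); // grow by 50%
        if (newCapacity < 0) {                              // int overflow
            newCapacity = Integer.MAX_VALUE;
        }
        return newCapacity;
    }

    // Returns a larger copy of the array; existing elements keep their positions.
    static long[] grow(long[] queue) {
        return Arrays.copyOf(queue, newCapacity(queue.length));
    }
}
```

Starting from the initial capacity of 16, successive expansions give 24, 36, 54, and so on.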
» siftUp() method
Heapifies upward when a new element is added.
When a new element is added, it's initially placed at the bottom of the heap. It then sifts up, comparing itself with its parent node. If it's less than the parent node, it swaps places with the parent. This comparison cycle continues until the element is no smaller than its parent or reaches the root, at which point the cycle ends. This determines the element's final position in the heap (it has been heapified).
Here's the detailed process of siftUp:
Breakdown
The core idea is that it continually compares the key node with its parent. If the key node's execution time is less than the parent's, the nodes are swapped. This ensures nodes with earlier execution times are at the front of the queue.
As you can see, every time a new node is added, it's only compared with its parent node, not affecting any sibling nodes.
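A minimal sketch of the sift-up loop, assuming plain long keys stand in for task execution times (SiftUpSketch and insertAll are names invented for this example):

```java
class SiftUpSketch {
    // Places key into the min-heap, starting at slot k and moving toward the root.
    static void siftUp(long[] heap, int k, long key) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            long e = heap[parent];
            if (key >= e) {
                break;          // parent is not larger: position found
            }
            heap[k] = e;        // move the parent down one level
            k = parent;
        }
        heap[k] = key;
    }

    // Builds a heap by appending each key at the bottom and sifting it up.
    static long[] insertAll(long... keys) {
        long[] heap = new long[keys.length];
        for (int i = 0; i < keys.length; i++) {
            siftUp(heap, i, keys[i]);
        }
        return heap;
    }
}
```

Inserting 5, 3, 8, 1 in that order produces the array {1, 3, 8, 5}: when 1 is added at index 3 it passes its parent 5 and then the root 3, while the sibling 8 at index 2 is never touched.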
» Dequeue
DelayedWorkQueue provides several methods to remove elements from the queue:
take(): Waits until the head element of the queue is due, then retrieves it.
poll(): Immediately retrieves the head element of the queue, or returns null if none is due.
poll(long timeout, TimeUnit unit): Waits up to the specified time to retrieve the head element of the queue.
» take() method
Once a worker thread has started, it continuously consumes elements from the work queue. As the keepAliveTime of ScheduledThreadPoolExecutor is 0 in JDK 8 and core threads do not time out by default, a worker fetches its next task by simply calling DelayedWorkQueue.take().
Note that the take() method blocks under two conditions:
The top heap element is null (the queue is empty).
The delay of the top heap element is greater than 0.
In ThreadPoolExecutor, the getTask() method has worker threads continually retrieve tasks from the work queue. With timed tasks, however, a task would start executing as soon as getTask() retrieved it, even if its execution time had not arrived yet. Hence, the take() method hands out a task only once its scheduled execution time has arrived.
The Role of the Leader Thread
The role of the leader thread is to reduce unnecessary timed waits. This design is a variant of the Leader-Follower pattern, aimed at avoiding unnecessary time waiting. When a take thread becomes the leader, it only needs to wait for the next delay time. Other take threads, which aren't the leader, need to wait for the leader thread to dequeue before they are awakened.
For example, if there were no leader, every thread executing take() would call available.awaitNanos(delay). Suppose the current thread has executed this call and no signal has been issued yet. A second thread then executes the same call and is blocked as well. It is futile for multiple threads to wait this way at the same time, because only one thread can return queue[0] from take() (the lock serializes them). By the time the other threads go back around the for loop to read queue[0], it is no longer the original queue[0], and they have to block again.
Therefore, to prevent multiple threads from frequently making useless timed waits, the leader is added. If the leader is not null, it means that the first node in the queue is already waiting to be dequeued. In this case, other threads will remain blocked, reducing unnecessary blocks. (Note: in the finally clause, signal() is used to wake up a thread, not signalAll().)
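The leader logic can be sketched with a small delay queue. This is an illustration under stated assumptions, not the JDK code: the class LeaderDelayQueue and its element type Timed are invented here, and a java.util.PriorityQueue stands in for the hand-written array heap so that the example can focus on the waiting strategy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class LeaderDelayQueue {
    // Hypothetical element type: a name plus an absolute trigger time.
    static final class Timed implements Comparable<Timed> {
        final String name;
        final long triggerNanos;

        Timed(String name, long delayMillis) {
            this.name = name;
            this.triggerNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        long remainingNanos() {
            return triggerNanos - System.nanoTime();
        }

        @Override
        public int compareTo(Timed other) {
            long diff = triggerNanos - other.triggerNanos;
            return diff < 0 ? -1 : (diff > 0 ? 1 : 0);
        }
    }

    private final PriorityQueue<Timed> heap = new PriorityQueue<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition available = lock.newCondition();
    private Thread leader = null;

    void offer(Timed task) {
        lock.lock();
        try {
            heap.offer(task);
            if (heap.peek() == task) {   // new head: the current leader's wait is stale
                leader = null;
                available.signal();
            }
        } finally {
            lock.unlock();
        }
    }

    Timed take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            for (;;) {
                Timed first = heap.peek();
                if (first == null) {
                    available.await();               // empty queue: wait indefinitely
                    continue;
                }
                long delay = first.remainingNanos();
                if (delay <= 0) {
                    return heap.poll();              // head is due: hand it out
                }
                if (leader != null) {
                    available.await();               // follower: untimed wait
                } else {
                    Thread self = Thread.currentThread();
                    leader = self;                   // leader: timed wait for the head
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == self) {
                            leader = null;
                        }
                    }
                }
            }
        } finally {
            // On the way out, wake exactly one waiter so it can become the next leader.
            if (leader == null && heap.peek() != null) {
                available.signal();
            }
            lock.unlock();
        }
    }

    static List<String> demo() {
        LeaderDelayQueue q = new LeaderDelayQueue();
        q.offer(new Timed("later", 120));
        q.offer(new Timed("sooner", 30));
        List<String> out = new ArrayList<>();
        try {
            out.add(q.take().name);
            out.add(q.take().name);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out;
    }
}
```

Only the leader performs a timed wait; every other taker parks without a timeout until it is signalled, which is the saving the pattern is after.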
The DelayedWorkQueue in Java ensures that tasks added to the queue are organized according to the delay time. The one with the smallest delay gets executed first. The underlying data structure for DelayedWorkQueue is a heap, which is implemented via an array.
» finishPoll() method
The finishPoll() method dequeues the top heap element once its delay has reached zero or less. Doing so involves a process known as sifting down.
The method consists of three steps:
1. Decrement the queue size.
2. Take the last element of the queue (x) as the candidate for the head position and set its old slot to null.
3. Call siftDown(0, x) to restore the heap ordering (skipped when the queue is now empty).
» siftDown() method
After dequeuing the top heap element, the heap's structure is disrupted. To rectify this, the heap's last element is moved to the top and then sifted down. This process is a loop that continues until the moved element is less than or equal to its smaller child, or until it reaches a position with no children.
The method handles two scenarios: the node has no children, or it has children (decided by comparing its index with half, which is size >>> 1). The result is not a fully sorted array, but every child's execution time is greater than or equal to its parent's. Thus, when take() or poll() is invoked, elements are dequeued in order.
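The dequeue steps and the sift-down loop can be sketched together. As before, this is an illustration with long keys in place of tasks; the class SiftDownSketch and its methods pollRoot and drain are hypothetical names, and the vacated slot is set to 0 where the real queue would store null.

```java
class SiftDownSketch {
    private final long[] queue;
    private int size;

    // Assumes the given values already satisfy the min-heap property.
    SiftDownSketch(long... heap) {
        this.queue = heap.clone();
        this.size = heap.length;
    }

    // Removes and returns the root; assumes size > 0.
    long pollRoot() {
        long root = queue[0];
        int s = --size;              // decrement the size
        long last = queue[s];        // the last element becomes the head candidate
        queue[s] = 0;                // clear the vacated slot
        if (s != 0) {
            siftDown(0, last);       // restore the heap ordering
        }
        return root;
    }

    private void siftDown(int k, long key) {
        int half = size >>> 1;       // indices >= half have no children
        while (k < half) {
            int child = (k << 1) + 1;            // left child
            long c = queue[child];
            int right = child + 1;
            if (right < size && c > queue[right]) {
                child = right;                   // right child is smaller
                c = queue[child];
            }
            if (key <= c) {
                break;                           // no larger than the smaller child
            }
            queue[k] = c;                        // move the smaller child up
            k = child;
        }
        queue[k] = key;
    }

    // Polls until empty, returning the elements in dequeue order.
    static long[] drain(long... heap) {
        SiftDownSketch q = new SiftDownSketch(heap);
        long[] out = new long[heap.length];
        for (int i = 0; i < out.length; i++) {
            out[i] = q.pollRoot();
        }
        return out;
    }
}
```

Draining the heap {1, 3, 8, 5} yields 1, 3, 5, 8: the array itself is not sorted, but repeated removal of the root produces ascending order.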
» poll() method
The poll() method immediately tries to get the head element of the queue. If the head task is null or its delay has not yet elapsed, no task can be handed out, so the method returns null right away. Otherwise, finishPoll() is called, which removes the head element and returns it.
The poll(long timeout, TimeUnit unit) method is the timed version of poll(). It waits up to the given timeout for the head of the queue to become due. If the timeout elapses and no task is available, it returns null.
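Both variants can be observed on a real executor through getQueue(). The sketch below is for experimentation only, since the Javadoc describes access to the task queue as intended primarily for debugging and monitoring; the class name PollDemo and the one-hour delay are arbitrary choices.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PollDemo {
    // True if poll() returns null while the only queued task is not yet due.
    static boolean pollReturnsNullBeforeDue() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        try {
            pool.schedule(() -> { }, 1, TimeUnit.HOURS);
            Runnable head = pool.getQueue().poll();      // never blocks
            return head == null && pool.getQueue().size() == 1;
        } finally {
            pool.shutdownNow();
        }
    }

    // True if the timed poll gives up with null after its timeout elapses.
    static boolean timedPollGivesUp() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        try {
            pool.schedule(() -> { }, 1, TimeUnit.HOURS);
            Runnable head = pool.getQueue().poll(50, TimeUnit.MILLISECONDS);
            return head == null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

In both cases the task stays in the queue, because its delay has not elapsed when the poll returns.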
» Removing elements
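The remove method is typically used when a task that is still in the blocking queue is cancelled and therefore has to be taken out of it. When the removed element isn't the last heap element, the last element is moved into the vacated slot and the heap is re-heapified from there: it is sifted down, and if it did not move, sifted up.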
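One way to see removal on cancellation is the executor's setRemoveOnCancelPolicy setting, which is off by default. The following is a sketch with made-up names (CancelDemo, queuedAfterCancel); it schedules a far-future task, cancels it, and reports how many entries remain in the queue.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CancelDemo {
    // Returns the queue size right after cancelling the only scheduled task.
    static int queuedAfterCancel(boolean removeOnCancel) {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        try {
            pool.setRemoveOnCancelPolicy(removeOnCancel);
            ScheduledFuture<?> future = pool.schedule(() -> { }, 1, TimeUnit.HOURS);
            future.cancel(false);
            return pool.getQueue().size();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("default policy:   " + queuedAfterCancel(false));
        System.out.println("remove on cancel: " + queuedAfterCancel(true));
    }
}
```

With the policy enabled, the cancelled task is removed from the heap immediately; with the default, it stays queued until its delay elapses.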
Conclusion
To conclude, DelayedWorkQueue uses a min heap to ensure that the tasks added to the queue are sorted according to their delay time.
It never blocks producers, but it can block consumers, which fits the producer-consumer model.
It also keeps a leader thread variable, a variant of the Leader-Follower pattern described above.
When a take() thread becomes the leader, it waits only for the delay of the head task, whereas the other take() threads wait until the leader has dequeued and signals them.