FutureTask

The Future interface and the FutureTask class that implements the Future interface represent the result of asynchronous computations. The implementation of FutureTask is based on AbstractQueuedSynchronizer (hereinafter referred to as AQS). Many of the blockable classes in java.util.concurrent, such as ReentrantLock, are implemented based on AQS. AQS is a synchronization framework that provides a common mechanism for atomically managing synchronization state, blocking and waking threads, and maintaining queues of blocked threads. AQS is widely used in JDK 6, and synchronizers based on AQS implementation include: ReentrantLock, Semaphore, ReentrantReadWriteLock, CountDownLatch, and FutureTask. Each synchronizer based on an AQS implementation will contain two types of operations, as follows.

  • At least one acquire operation. This operation blocks the calling thread unless /until the state of AQS allows the thread to continue execution. FutureTask’s acquire operation is a get()/get(long timeout, TimeUnit unit) method call.
  • At least one release operation. This operation changes the state of AQS, which allows one or more blocking threads to be unblocked. FutureTask’s release operations include the run() method and cancel(… method. Based on the principle of “composite takes precedence over inheritance”, FutureTask declares an internally private subclass that inherits from AQS, Sync, to which all calls to futureTask’s public methods are delegated. Sync is an internal private class of FutureTask that inherits from AQS. When you create a FutureTask, an internally private member object, Sync, is created, and all of FutureTask’s public methods are delegated directly to the internally private Sync. The FutureTask.get() method calls the AQS.acquireSharedInterruptibly(int arg) method, which is executed as follows.

1) Call the AQS.acquireSharedInterruptibly(int arg) method, which first calls back the tryAcquireShared() method implemented in the subclass Sync to determine whether the acquire operation can succeed. The condition that an acquire operation can succeed is that state is executed in a finish state RAN or a canceled state CANCELLED, and runner is not null.

2) If successful, the get() method returns immediately. If it fails, go to the thread wait queue to wait for other threads to perform the release operation.

3) When other threads perform release operations (such as FutureTask.run() or FutureTask.cancel(…) )) After waking up the current thread, the current thread performing tryAcquireShared() again will return a positive 1, and the current thread will leave the thread waiting for the queue and wake up its successor thread (which will have a cascading wake-up effect here, which will be described later).

4) Finally return the result of the calculation or throw an exception.

The execution process of FutureTask.run() is as follows.

1) Perform the task specified in the constructor (Callable.call()).

2) Update the synchronization state atomically (call AQS.compareAndSetState(int expect, int update), set state to execute completion state RAN). If the atomic operation is successful, set the value of the variable result representing the result of the calculation to the return value of Callable.call(), and then call AQS.releaseShared(int arg).

3) AQS.releaseShared(int arg) will first call back to tryReleaseShared(arg) implemented in the subclass Sync to perform the release operation (setting the thread running the task runner to null, which will return true); AQS.releaseShared(int arg), then wakes up the thread waiting for the first thread in the queue.

4) Call FutureTask.done(). When executing the FutureTask.get() method, if FutureTask is not in the execution completion state RAN or canceled, the current thread of execution will wait in the thread wait queue of AQS. When a thread executes the FutureTask.run() method or FutureTask.cancel(… ) method, the thread is woken up waiting for the first thread in the queue

When thread E executes the run() method, it wakes up the first thread A in the queue. After thread A is woken up, it first removes itself from the queue, then wakes up its successor, thread B, and finally thread A returns from the get() method. Threads B, C, and D repeat the processing flow of thread A. Eventually, all threads waiting in the queue are cascaded to wake up and returned from the get() method.