Вступление

Итак, мы знаем, что в Java есть потоки, о чём можно прочитать в обзоре "Thread'ом Java не испортишь : Часть I - потоки". Давайте ещё раз посмотрим на типовой код:
public static void main(String []args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
Как мы видим, код для запуска задачи довольно типовой, но на каждый новый запуск нам его придётся повторять. Одно из решений — вынести его в отдельный метод, например execute(Runnable runnable). Но разработчики Java за нас уже побеспокоились и придумали интерфейс Executor:
public static void main(String []args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
Как видно, код стал лаконичнее и позволил нам просто написать код по запуску Runnable в потоке. Здорово, не правда ли? Но это - только начало:
Как видно, у интерфейса Executor есть интерфейс-наследник ExecutorService. Как сказано в JavaDoc данного интерфейса, ExecutorService является описанием особого Executor'а, который предоставляет методы по остановке работы Executor'а и позволяет получить java.util.concurrent.Future, чтобы отслеживать процесс выполнения. Ранее, в "Thread'ом Java не испортишь : Часть IV — Callable, Future и друзья" мы уже кратко рассмотрели возможности Future. Кто забыл или не читал - советую освежить в памяти ;) Что ещё интересного в JavaDoc написано? Там написано, что у нас есть специальная фабрика java.util.concurrent.Executors, которая нам позволяет создавать доступные по умолчанию реализации ExecutorService.

ExecutorService

Ещё раз вспомним. У нас есть Executor для execute (т.е. выполнения) некой задачи в потоке, когда реализация создания потока скрыта от нас. У нас есть ExecutorService — особый Executor, который имеет набор возможностей по управлению ходом выполнения. И у нас есть фабрика Executors, которая позволяет создавать ExecutorService. Давайте теперь это проделаем сами:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
Как мы видим, мы указали фиксированный пул потоков (Fixed Thread Pool) размером 2. После чего мы поочередно отправляем в пул задачи. Каждая задача возвращает строку (String), содержащую имя потока (currentThread().getName()). Важно в самом конце выполнить shutdown для ExecutorService, потому что в противном случае наша программа не завершится. В фабрике Executors есть и другие фабричные методы. Например, мы можем создать пул всего из одного потока — newSingleThreadExecutor или пул с кэшированием newCachedThreadPool, когда потоки будут убираться из пула, если они простаивают 1 минуту. На самом деле, за этими ExecutorService прячется блокирующая очередь, в которую помещаются задачи и из которой эти задачи выполняются. Подробнее про блокирующие очереди можно посмотреть в видео "Блокирующая очередь - Collections #5 - Advanced Java". А так же можно прочитать обзор "Блокирующие очереди пакета concurrent" и ответ на вопрос "When to prefer LinkedBlockingQueue over ArrayBlockingQueue?". Супер упрощённо - BlockingQueue (блокирующая очередь) блокирует поток, в двух случаях:
  • поток пытается получить элементы из пустой очереди
  • поток пытается положить элементы в полную очередь
Если посмотреть на реализацию фабричных методов, то мы увидим, как они устроены. Например:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
или
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
Как мы видим, внутри фабричных методов создаются реализации ExecutorService. И в основном это ThreadPoolExecutor. Меняются только атрибуты, которые и влияют на работу.

ThreadPoolExecutor

Как мы ранее увидели, внутри фабричных методов в основном создаётся ThreadPoolExecutor. На функциональность влияет то, какие значения переданы в качестве максимума и минимума потоков, а так же какая очередь используется. А использоваться может любая реализация интерфейса java.util.concurrent.BlockingQueue. Говоря о ThreadPoolExecutor'ах стоит отметить интересные особенности при работе. Например, нельзя посылать задачи в ThreadPoolExecutor, если там нет места:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Данный код упадёт с ошибкой вида:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
То есть task нельзя засабмитить, т.к. SynchronousQueue устроена так, что фактически состоит из одного элемента и не позволяет положить туда больше. Как мы видим queued tasks здесь 0 и в этом нет ничего странного, т.к. это специфика SynchronousQueue — фактически эта очередь в 1 элемент, которая всегда пустая. (!) Когда один поток кладёт в очередь элемент он будет ждать, пока другой поток не заберёт элемент из очереди. Поэтому, мы можем заменить на new LinkedBlockingQueue<>(1) и в ошибке изменится то, что будет указано queued tasks = 1. Т.к. очередь всего в 1 элемент, то второй мы уже положить не можем. И упадём на этом. Продолжая тему очереди стоит отметить, что класс ThreadPoolExecutor обладает дополнительными методами по обслуживанию очереди. Например, метод threadPoolExecutor.purge() удалит из очереди все отменённые задачи, чтобы освободить место в очереди. Ещё одной интересной функцией, связанной с очередью, является обработчик непринятых задач:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Для примера обработчик просто выводит слово Rejected на каждый отказ принимать задачу в очередь. Удобно, не правда ли? Кроме того, ThreadPoolExecutor имеет интересного наследника — ScheduledThreadPoolExecutor, который является ScheduledExecutorService. Он предоставляет возможность выполнять задачу по таймеру.

ScheduledExecutorService

ExecutorService типа ScheduledExecutorService позволяют запускать задачи по расписанию (schedule). Посмотрим на пример:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Тут всё просто. Отправляются задачи, получаем "запланированную задачу" java.util.concurrent.ScheduledFuture. С расписанием могут быть полезны так же и следующий случай:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Здесь мы отправляем Runnable задачу на выполнение с фиксированной частотой (Fixed Rate) с определённой задержкой. В данном случае, через 1 секунду каждые 2 секунды начать выполнять задачу. Есть похожий вариант:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Но здесь задачи выполняются с заданным промежутком МЕЖДУ выполнением разных задач. То есть задача task будет выполнена через 1 секунды. Далее, как только она будет завершена, пройдёт 2 секунды и тогда новая задача task будет запущена. По данной теме можно прочитать следующие материалы:

WorkStealingPool

Помимо указанных выше пулов потоков есть ещё один. Можно сказать, что он немного особенный. Имя ему — Work Stealing Pool. Если кратко, то Work Stealing — это такой алгоритм работы, при котором простаивающие потоки начинают забирать задачи других потоков или задачи из общей очереди. Посмотрим на пример:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
Если мы запустим данный код, то ExecutorService нам создаст 5 потоков, т.к. каждый поток будет вставать в wait очередь по локу объекта lock. Про мониторы и локи по нему мы уже ранее разбирались в "Thread'ом Java не испортишь: Часть II — синхронизация". А теперь мы заменим Executors.newCachedThreadPool на Executors.newWorkStealingPool(). Что поменяется? Мы увидим, что наши задачи выполняются не в 5 потоков, а меньше. Помните, что cachedThreadPool создавал на каждую задачу свой поток? Потому что wait блокировал поток, а следующие задачи хотели выполнятся и в пуле для них создавались новые потоки. В случае со StealingPool потоки не будут вечно простаивать в wait, они начнут выполнять соседние задачи. Чем так отличается от остальных тредпулов этот WorkStealingPool? Тем, что внутри него живёт на самом деле волшебный Fork Join Pool:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
На самом деле есть ещё одно отличие. Потоки, которые создаются для Fork Join Pool по умолчанию являются демон потоками, в отличии от потоков, созданных через обычный ThreadPool. Вообще, стоит помнить про демон потоки, т.к. например при CompletableFuture тоже используются демон потоки, если не указать свою ThreadFactory, которая будет создавать не демон потоки. Вот такие вот сюрпризы могут ждать в неожиданном месте!)

Fork/Join Pool

В этой части мы поговорим про тот самый Fork Join Pool (его ещё называют fork/join framework), который живёт "под капотом" у WorkStealingPool. Вообще, Fork Join Framework появился ещё в Java 1.7. И пусть уже Java 11 на дворе, но вспомнить всё равно стоит. Не самая распространённая задача, но довольно интересная. На просторах сети есть хороший обзор на эту тему: "Fork/Join Framework в Java 7". Fork/Join Pool оперирует в своей работе таким понятием, как java.util.concurrent.RecursiveTask. Так же есть аналог — java.util.concurrent.RecursiveAction. RecursiveAction не возвращают результат. Таким образом RecursiveTask похож на Callable, а RecursiveAction похож на Runnable. Ну и смотря на название мы видим два ключевых метода — fork и join. Метод fork запускает асинхронно в отдельном потоке некоторую задачу. А метод join позволяет дождаться завершения выполнения работы. Существует несколько способов использования:
Данный картинка — это часть слайда доклада Алексея Шипилёва "Fork/Join: реализация, использование, производительность". Чтобы стало понятнее, стоит посмотреть его доклад на JEE CONF: "Fork Join особенности реализации":

Подведение итогов

Итак, вот мы и закончили очередную часть обзора. Мы разобрались, что сначала придумали Executor для выполнения потоков. Потом решили продолжить идею и придумали ExecutorService. ExecutorService позволяет отправлять задачи на выполнение при помощи submit и invoke, а так же управлять сервисом, выключая его. Т.к. ExecutorService'у нужны реализации, то для этого написали класс с фабричными методами и назвали его Executors. Он позволяет создавать пулы потоков ThreadPoolExecutor'ы. При этом существуют такие пулы потоков, которые позволяют ещё и указать расписание для выполнения, а за WorkStealingPool прячется ForkJoinPool. Надеюсь, Вам было не только интересно выше написанное, но и понятно ) Всегда рад предложениям и замечаниям. #Viacheslav
Что еще почитать:

Thread'ом Java не испортишь: Часть I — потоки

Thread'ом Java не испортишь: Часть II — синхронизация

Thread'ом Java не испортишь : Часть III - взаимодействие

Thread'ом Java не испортишь : Часть IV - Callable, Future и друзья

Thread'ом Java не испортишь: Часть VI - К барьеру!