JavaRush/Java блог/Random/Thread'ом Java не испортишь: Часть V — Executor, ThreadPo...
Viacheslav
3 уровень

Thread'ом Java не испортишь: Часть V — Executor, ThreadPool, Fork Join

Статья из группы Random
участников

Вступление

Итак, мы знаем, что в Java есть потоки, о чём можно прочитать в обзоре "Thread'ом Java не испортишь : Часть I - потоки". Thread'ом Java не испортишь: Часть V — Executor, ThreadPool, Fork Join - 1Давайте ещё раз посмотрим на типовой код:
public static void main(String []args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
Как мы видим, код для запуска задачи довольно типовой, но на каждый новый запуск нам его придётся повторять. Одно из решений — вынести его в отдельный метод, например, execute(Runnable runnable). Но разработчики Java за нас уже побеспокоились и придумали интерфейс Executor:
public static void main(String []args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
Как видно, код стал лаконичнее и позволил нам просто написать код по запуску Runnable в потоке. Здорово, не так ли? Но это только начало: Thread'ом Java не испортишь: Часть V — Executor, ThreadPool, Fork Join - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

Как видно, у интерфейса Executor есть интерфейс-наследник ExecutorService. В JavaDoc данного интерфейса сказано, что ExecutorService является описанием особого Executor'а, который предоставляет методы по остановке работы Executor'а и позволяет получить java.util.concurrent.Future, чтобы отслеживать процесс выполнения. Ранее, в "Thread'ом Java не испортишь : Часть IV — Callable, Future и друзья" мы уже кратко рассмотрели возможности Future. Кто забыл или не читал - советую освежить в памяти ;) Что ещё интересного в JavaDoc написано? Что у нас есть специальная фабрика java.util.concurrent.Executors, которая нам позволяет создавать доступные по умолчанию реализации ExecutorService.

ExecutorService

Ещё раз вспомним. У нас есть Executor для execute (т.е. выполнения) некой задачи в потоке, когда реализация создания потока скрыта от нас. У нас есть ExecutorService — особый Executor, который имеет набор возможностей по управлению ходом выполнения. И у нас есть фабрика Executors, которая позволяет создавать ExecutorService. Давайте теперь это проделаем сами:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
Как мы видим, мы указали фиксированный пул потоков (Fixed Thread Pool) размером 2. После чего мы поочередно отправляем в пул задачи. Каждая задача возвращает строку (String), содержащую имя потока (currentThread().getName()). Важно в самом конце выполнить shutdown для ExecutorService, потому что в противном случае наша программа не завершится. В фабрике Executors есть и другие фабричные методы. Например, мы можем создать пул всего из одного потока — newSingleThreadExecutor или пул с кэшированием newCachedThreadPool, когда потоки будут убираться из пула, если они простаивают 1 минуту. На самом деле, за этими ExecutorService прячется блокирующая очередь, в которую помещаются задачи и из которой эти задачи выполняются. Подробнее про блокирующие очереди можно посмотреть в видео "Блокирующая очередь - Collections #5 - Advanced Java". А так же можно прочитать обзор "Блокирующие очереди пакета concurrent" и ответ на вопрос "When to prefer LinkedBlockingQueue over ArrayBlockingQueue?". Супер упрощённо — BlockingQueue (блокирующая очередь) блокирует поток, в двух случаях:
  • поток пытается получить элементы из пустой очереди
  • поток пытается положить элементы в полную очередь
Если посмотреть на реализацию фабричных методов, то мы увидим, как они устроены. Например:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
или
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
Как мы видим, внутри фабричных методов создаются реализации ExecutorService. И в основном это ThreadPoolExecutor. Меняются только атрибуты, которые и влияют на работу. Thread'ом Java не испортишь: Часть V — Executor, ThreadPool, Fork Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

Как мы ранее увидели, внутри фабричных методов в основном создаётся ThreadPoolExecutor. На функциональность влияет то, какие значения переданы в качестве максимума и минимума потоков, а также какая очередь используется. А использоваться может любая реализация интерфейса java.util.concurrent.BlockingQueue. Говоря о ThreadPoolExecutor'ах, стоит отметить интересные особенности при работе. Например, нельзя посылать задачи в ThreadPoolExecutor, если там нет места:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Данный код упадёт с ошибкой вида:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
То есть task нельзя засабмитить, т.к. SynchronousQueue устроена так, что фактически состоит из одного элемента и не позволяет положить туда больше. Как мы видим, queued tasks здесь 0, и в этом нет ничего странного, т.к. это специфика SynchronousQueue — фактически это очередь в 1 элемент, которая всегда пустая. (!) Когда один поток кладёт в очередь элемент, он будет ждать, пока другой поток не заберёт элемент из очереди. Поэтому, мы можем заменить на new LinkedBlockingQueue<>(1) и в ошибке изменится то, что будет указано queued tasks = 1. Т.к. очередь всего в 1 элемент, то второй мы уже положить не можем. И упадём на этом. Продолжая тему очереди, стоит отметить, что класс ThreadPoolExecutor обладает дополнительными методами по обслуживанию очереди. Например, метод threadPoolExecutor.purge() удалит из очереди все отменённые задачи, чтобы освободить место в очереди. Ещё одной интересной функцией, связанной с очередью, является обработчик непринятых задач:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Для примера обработчик просто выводит слово Rejected на каждый отказ принимать задачу в очередь. Удобно, не правда ли? Кроме того, ThreadPoolExecutor имеет интересного наследника — ScheduledThreadPoolExecutor, который является ScheduledExecutorService. Он предоставляет возможность выполнять задачу по таймеру.

ScheduledExecutorService

ExecutorService типа ScheduledExecutorService позволяют запускать задачи по расписанию (schedule). Посмотрим на пример:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Тут всё просто. Отправляются задачи, получаем "запланированную задачу" java.util.concurrent.ScheduledFuture. С расписанием может быть полезен также и следующий случай:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Здесь мы отправляем Runnable задачу на выполнение с фиксированной частотой (Fixed Rate) с определённой задержкой. В данном случае, через 1 секунду каждые 2 секунды начать выполнять задачу. Есть похожий вариант:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Но здесь задачи выполняются с заданным промежутком МЕЖДУ выполнением разных задач. То есть задача task будет выполнена через 1 секунду. Далее, как только она будет завершена, пройдёт 2 секунды, и тогда новая задача task будет запущена. По данной теме можно прочитать следующие материалы: Thread'ом Java не испортишь: Часть V — Executor, ThreadPool, Fork Join - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

WorkStealingPool

Помимо указанных выше пулов потоков, есть ещё один. Можно сказать, что он немного особенный. Имя ему — Work Stealing Pool. Если кратко, то Work Stealing — это такой алгоритм работы, при котором простаивающие потоки начинают забирать задачи других потоков или задачи из общей очереди. Посмотрим на пример:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
Если мы запустим данный код, то ExecutorService нам создаст 5 потоков, т.к. каждый поток будет вставать в wait очередь по локу объекта lock. Про мониторы и локи по нему мы уже ранее разбирались в "Thread'ом Java не испортишь: Часть II — синхронизация". А теперь мы заменим Executors.newCachedThreadPool на Executors.newWorkStealingPool(). Что поменяется? Мы увидим, что наши задачи выполняются не в 5 потоков, а меньше. Помните, что cachedThreadPool создавал на каждую задачу свой поток? Потому что wait блокировал поток, а следующие задачи хотели выполнятся и в пуле для них создавались новые потоки. В случае со StealingPool потоки не будут вечно простаивать в wait, они начнут выполнять соседние задачи. Чем так отличается от остальных тредпулов этот WorkStealingPool? Тем, что внутри него живёт на самом деле волшебный ForkJoinPool:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
На самом деле есть ещё одно отличие. Потоки, которые создаются для ForkJoinPool по умолчанию являются демон потоками, в отличие от потоков, созданных через обычный ThreadPool. Вообще, стоит помнить про демон-потоки, т.к. например при CompletableFuture тоже используются демон-потоки, если не указать свою ThreadFactory, которая будет создавать не демон-потоки. Вот такие вот сюрпризы могут ждать в неожиданном месте!)

Fork/Join Pool

В этой части мы поговорим про тот самый ForkJoinPool (его ещё называют fork/join framework), который живёт "под капотом" у WorkStealingPool. Вообще, Fork Join Framework появился ещё в Java 1.7. И пусть уже Java 11 на дворе, но вспомнить всё равно стоит. Не самая распространённая задача, но довольно интересная. На просторах сети есть хороший обзор на эту тему: "Fork/Join Framework в Java 7". Fork/JoinPool оперирует в своей работе таким понятием как java.util.concurrent.RecursiveTask. Также есть аналог — java.util.concurrent.RecursiveAction. RecursiveAction не возвращают результат. Таким образом RecursiveTask похож на Callable, а RecursiveAction похож на Runnable. Ну и смотря на название мы видим два ключевых метода — fork и join. Метод fork запускает асинхронно в отдельном потоке некоторую задачу. А метод join позволяет дождаться завершения выполнения работы. Существует несколько способов использования: Thread'ом Java не испортишь: Часть V — Executor, ThreadPool, Fork Join - 5Данный картинка — это часть слайда доклада Алексея Шипилёва "Fork/Join: реализация, использование, производительность". Чтобы стало понятнее, стоит посмотреть его доклад на JEE CONF: "Fork Join особенности реализации".

Подведение итогов

Итак, вот мы и закончили очередную часть обзора. Мы разобрались, что сначала придумали Executor для выполнения потоков. Потом решили продолжить идею и придумали ExecutorService. ExecutorService позволяет отправлять задачи на выполнение при помощи submit и invoke, а также управлять сервисом, выключая его. Т.к. ExecutorService'у нужны реализации, написали класс с фабричными методами и назвали его Executors. Он позволяет создавать пулы потоков ThreadPoolExecutor'ы. При этом существуют такие пулы потоков, которые позволяют ещё и указать расписание для выполнения, а за WorkStealingPool прячется ForkJoinPool. Надеюсь, Вам было не только интересно выше написанное, но и понятно ) Всегда рад предложениям и замечаниям. #Viacheslav
Комментарии (39)
  • популярные
  • новые
  • старые
Для того, чтобы оставить комментарий Вы должны авторизоваться
27 августа 2023, 17:23
Брайан Гоетц java concurrency на практике, для меня понятнее материал изложен там.
Юлия
Уровень 41
8 мая 2023, 07:26
обо всем и ни о чем
NacWay
Уровень 37
12 апреля 2023, 08:57
Забавно, что следующий уровень начинается с темы "switch, case, default" 😄
Александр
Уровень 40
2 июня 2023, 13:54
в голос поржал)
Vlad
Уровень 30
7 августа 2023, 13:03
😆
mgg
Уровень 47
4 апреля 2023, 23:46
не знаю чем руководствовался автор статей, когда писал их для аудитории, которая пришла сюда с нуля изучать язык. Подход не системный, не логичный, с нуля что-то по серии данных статей изучить крайне сложно. Автор очень любит отсылки, и мне тоже хотелось написать в этом же ключе, чтобы автор ознакомился с подачей материала Java Syntax Zero и сделал рефакторинг, и тогда возможно статьи станут действительно ценным источником для обучения. А пока это просто что-то вроде пометок на полях в конспекте.
milniy87
Уровень 41
8 февраля 2023, 15:52
Странная последовательность изложения материала. Вначале рассказать про CompletableFuture, а потом про ExecutorService. Хотя в каком то смысле CompletableFuture это логичное продолжение ExecutorService. Ну опять же чтоб познать путь самурая нужно выполнить все побочные квесты.
CyberBoar
Уровень 39
23 июля 2022, 11:43
Чисто как китайская грамота. Отдельные слова понятные, некоторые предложения. А всё в месте понимания ни на грамм не прибавляет. Как это использовать на практике, в каких целях - а хрен его знает. Выглядит как конспект, который человек скорее для себя делал, как шпаргалку на случай, если что забудет. В любом случае спасибо, может в дальнейшем пригодится.
milniy87
Уровень 41
8 февраля 2023, 15:37
Неплохая статья, но для ее понимания нужно выполнить все побочные квесты прочитать все материалы по всем ссылкам что давалось как дополнение. И как следствие тема недостаточно разжёвана в статье тема раскрыта не полностью.
Алексей С
Уровень 33
19 июня 2022, 20:24
Думаю надо пару видео посмотреть чтобы в это вникнуть. От этих новых терминов голова идет кругом.
Amikuto
Уровень 11
29 марта 2022, 21:36
Данный материал скорее подходит для мидлов, которые все это уже давно изучали, но немного подзабыли и решили "освежить" память. Но точно никак не для тех, кто пришел сюда с 0 знаниями о экзекютерах, пулах и форкджоинах
Жора Нет
Уровень 39
12 марта 2022, 10:45
Еле дочитал, сей "доклад". Нет последовательности подачи информации. Все смешалось в кучу. Неужели нельзя было сделать последовательность ссылок? Сначала прочтите про функциональные интерфейсы, потом про лямбды, потом про стримы и т.д. А то дали кучу ссылок - читайте! Заходишь туда, ага стримы, начинаешь читать, а там лямбды, о которых ты только слышал, но не понимаешь как работают! В общем куча всего подряд...
Петрович
Уровень 36
4 января 2023, 14:12
я кстати начал понимать лямбды, как начал параллельно учить JavaScript...и как работает reduce в стримах
hidden #2595317
Уровень 45
17 октября 2021, 11:17
Данный картинка.))
Hamlet simonyan
Уровень 34
14 февраля 2022, 17:07
Я тоже заметил)