Thread'ом Java не испортишь: Часть VI - К барьеру! - 1

Вступление

Потоки — штука интересная. В прошлых обзорах мы рассмотрели некоторые доступные средства для реализации многопоточности. Давайте посмотрим, что мы можем сделать ещё интересного. К этому моменту мы многое что знаем. Например, из "Thread'ом Java не испортишь: Часть I — потоки" мы знаем, что поток — это Thread. Мы знаем, что поток выполняет некоторую задачу. Если мы хотим, чтобы нашу задачи могли запустить (run), то мы должны указать потоку некий Runnable. Чтобы вспомнить, можем воспользоваться Tutorialspoint Java Online Compiler'ом:
public static void main(String []args){
	Runnable task = () -> {
 		Thread thread = Thread.currentThread();
		System.out.println("Hello from " + thread.getName());
	};
	Thread thread = new Thread(task);
	thread.start();
}
Так же мы знаем о том, что у нас есть такое понятие, как лок. О нем мы читали в "Thread'ом Java не испортишь: Часть II — синхронизация". Поток может занимать лок и тогда другой поток, который попытается занять лок, будет вынужден ждать освобождения лока:
import java.util.concurrent.locks.*;

public class HelloWorld{
	public static void main(String []args){
		Lock lock = new ReentrantLock();
		Runnable task = () -> {
			lock.lock();
			Thread thread = Thread.currentThread();
			System.out.println("Hello from " + thread.getName());
			lock.unlock();
		};
		Thread thread = new Thread(task);
		thread.start();
	}
}
Думаю, пора поговорить о том, что мы ещё можем интересное сделать.
Thread'ом Java не испортишь: Часть VI - К барьеру! - 2

Семафоры

Самое простое средство контроля за тем, сколько потоков могут одновременно работать — семафор. Как на железной дороге. Горит зелёный — можно. Горит красный — ждём. Что мы ждём от семафора? Разрешения. Разрешение на английском — permit. Чтобы получить разрешение — его нужно получить, что на английском будет acquire. А когда разрешение больше не нужно мы его должны отдать, то есть освободить его или избавится от него, что на английском будет release. Посмотрим, как это работает. Нам потребуется импорт класса java.util.concurrent.Semaphore. Пример:
public static void main(String[] args) throws InterruptedException {
	Semaphore semaphore = new Semaphore(0);
	Runnable task = () -> {
		try {
			semaphore.acquire();
			System.out.println("Finished");
			semaphore.release();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	};
	new Thread(task).start();
	Thread.sleep(5000);
	semaphore.release(1);
}
Как видим, запомнив английские слова, мы понимаем, как работает семафор. Интересно, что главное условие — на "счету" семафора должен быть положительное количество permit'ов. Поэтому, инициировать его можно и с минусом. И запрашивать (acquire) можно больше, чем 1.
Thread'ом Java не испортишь: Часть VI - К барьеру! - 3

CountDownLatch

Следующий механизм — CountDownLatch. CountDown на английском — это отсчёт, а Latch — задвижка или защёлка. То есть если переводить, то это защёлка с отсчётом. Тут нам понадобится соответствующий импорт класса java.util.concurrent.CountDownLatch. Это похоже на бега или гонки, когда все собираются у стартовой линии и когда все готовы — дают разрешение, и все одновременно стартуют. Пример:
public static void main(String[] args) {
	CountDownLatch countDownLatch = new CountDownLatch(3);
	Runnable task = () -> {
		try {
			countDownLatch.countDown();
			System.out.println("Countdown: " + countDownLatch.getCount());
			countDownLatch.await();
			System.out.println("Finished");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	};
	for (int i = 0; i < 3; i++) {
		new Thread(task).start();
 	}
}
await на английском — ожидать. То есть мы сначала говорим countDown. Как говорит гугл переводчик, count down — "an act of counting numerals in reverse order to zero", то есть выполнить действие по обратному отсчёту, цель которого — досчитать до нуля. А дальше говорим await — то есть ожидать, пока значение счётчика не станет ноль. Интересно, что такой счётчик — одноразовый. Как сказано в JavaDoc — "When threads must repeatedly count down in this way, instead use a CyclicBarrier", то есть если нужен многоразовый счёт — надо использовать другой вариант - который называется CyclicBarrier.
Thread'ом Java не испортишь: Часть VI - К барьеру! - 4

CyclicBarrier

Как и следует из названия, CyclicBarrier — это циклический барьер. Нам понадобится импорт класса java.util.concurrent.CyclicBarrier. Посмотрим на пример:
public static void main(String[] args) throws InterruptedException {
	Runnable action = () -> System.out.println("На старт!");
	CyclicBarrier berrier = new CyclicBarrier(3, action);
	Runnable task = () -> {
		try {
			berrier.await();
			System.out.println("Finished");
		} catch (BrokenBarrierException | InterruptedException e) {
			e.printStackTrace();
		}
	};
	System.out.println("Limit: " + berrier.getParties());
	for (int i = 0; i < 3; i++) {
		new Thread(task).start();
	}
}
Как видим, поток выполняет await, то есть ожидает. При этом уменьшается значение барьера. Барьер считается сломанным (berrier.isBroken()), когда отсчёт дошёл до нуля. Чтобы сбросить барьер, нужно вызвать berrier.reset(), чего не хватало в CountDownLatch.
Thread'ом Java не испортишь: Часть VI - К барьеру! - 5

Exchanger

Следующее средство — Exchanger. Exchange с английского переводится как обмен или обмениваться. А Exchanger — обменник, то есть то, через что обмениваются. Посмотрим на простейший пример:
public static void main(String[] args) {
	Exchanger<String> exchanger = new Exchanger<>();
	Runnable task = () -> {
		try {
			Thread thread = Thread.currentThread();
			String withThreadName = exchanger.exchange(thread.getName());
			System.out.println(thread.getName() + " обменялся с " + withThreadName);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	};
	new Thread(task).start();
	new Thread(task).start();
}
Тут мы запускаем два потока. Каждый из них выполняет метод exchange и ожидает, когда другой поток так жевыполнит метод exchange. Таким образом, потоки обменяются между собой переданными аргументами. Интересная штука. Ничего ли она вам не напоминает? А напоминает он SynchronousQueue, которая лежит в основе cachedThreadPool'а. Для наглядности — пример:
public static void main(String[] args) throws InterruptedException {
	SynchronousQueue<String> queue = new SynchronousQueue<>();
	Runnable task = () -> {
		try {
			System.out.println(queue.take());
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	};
	new Thread(task).start();
	queue.put("Message");
}
В примере видно, что запустив новый поток, данный поток уйдёт в ожидание, т.к. в очереди будет пусто. А дальше main поток положит в очередь текст "Message". При этом он сам остановится на время, которой нужно, пока не получат из очереди этот текстовый элемент. По этой теме так же можно почитать "SynchronousQueue Vs Exchanger".
Thread'ом Java не испортишь: Часть VI - К барьеру! - 6

Phaser

И напоследок самое сладкое — Phaser. Нам понадобится импорт класса java.util.concurrent.Phaser. Посмотрим на простой пример:
public static void main(String[] args) throws InterruptedException {
        Phaser phaser = new Phaser();
        // Вызывая метод register, мы регистрируем текущий поток (main) как участника
        phaser.register();
        System.out.println("Phasecount is " + phaser.getPhase());
        testPhaser(phaser);
        testPhaser(phaser);
        testPhaser(phaser);
        // Через 3 секунды прибываем к барьеру и снимаемся регистрацию. Кол-во прибывших = кол-во регистраций = пуск
        Thread.sleep(3000);
        phaser.arriveAndDeregister();
        System.out.println("Phasecount is " + phaser.getPhase());
    }

    private static void testPhaser(final Phaser phaser) {
        // Говорим, что будет +1 участник на Phaser
        phaser.register();
        // Запускаем новый поток
        new Thread(() -> {
            String name = Thread.currentThread().getName();
            System.out.println(name + " arrived");
            phaser.arriveAndAwaitAdvance(); //threads register arrival to the phaser.
            System.out.println(name + " after passing barrier");
        }).start();
    }
Из примера видно, что барьер при использовании Phaser'а прорывается, когда количество регистраций совпадает с количеством прибывших к барьеру. Подробнее можно ознакомиться с Phaser'ом в статье с хабра "Новый синхронизатор Phaser".
Thread'ом Java не испортишь: Часть VI - К барьеру! - 7

Итоги

Как видно из примеров, существуют различные способы синхронизации потоков. Ранее я постарался уже вспомнить что-то из многопоточности, надеюсь прошлые части были полезны. Говорят, что путь к многопоточности начинается с книги "Java Concurrency in Practice". Хотя она вышла в 2006 году, люди отвечают, что книга довольно фундаментальна и до сих пор держит удар. Например, можно прочитать обсуждения здесь: "Is Java Concurrency In Practice still valid?". Также полезно прочитать ссылки из обсуждения. Например, там есть ссылка на книгу "The Well-Grounded Java Developer", в которой стоит обратить на "Chapter 4. Modern concurrency". Есть ещё целый обзор на эту же тему: "Is Java cocurrency in pracitce still relevant in era of java 8". Там также есть советы по поводу того, что ещё следует почитать, чтобы действительно понять эту тему. После этого, можно присмотреться к такой замечательной книге, как "OCA OCP JavaSE 8 Programmer Practice Tests". Нас интересует вторая часть, то есть OCP. И там есть тесты в "Chapter 20 : Java Concurrency". В этой книге есть как вопросы, так и ответы со объяснением. Например:
Thread'ом Java не испортишь: Часть VI - К барьеру! - 8
Многие могут начать говорить, что это очередное заучивание методов. С одной стороны — да. С другой стороны, на этот вопрос можно дать ответ, вспомнив, что ExecutorService — это своего рода "апгрейд" Executor'а. А Executor призван просто скрыть способ создания потоков, но не основной способ их выполнения, то есть запуск в новом потоке Runnable. Поэтому execute(Callable) и нет, т.к. в ExecutorService к Executor'у просто добавили методы submit, которые умеют возвращать Future. Как видите, мы можем и заучить список методов, но куда проще догадаться, зная природу самих классов. Ну и немного дополнительных материалов по теме: #Viacheslav
Что еще почитать:

Thread'ом Java не испортишь: Часть I — потоки

Thread'ом Java не испортишь: Часть II — синхронизация

Thread'ом Java не испортишь : Часть III - взаимодействие

Thread'ом Java не испортишь : Часть IV - Callable, Future и друзья

Thread'ом Java не испортишь: Часть V - Executor, ThreadPool, Fork Join