Во второй части я упомянул, что map/reduce подразумевает параллельное исполнение шага map. Stream API предоставляет очень простой механизм для выполнения операций над потоком параллельно: входной поток разбивается на части, если это возможно, и каждая такая часть обрабатывается параллельно с остальными, в раздельных нитях.
Например мне нужно узнать, сколько в каждом году снималось фильмов. Для этого я скачиваю данные imdb и анализирую их:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
public class SingleStreamMovies {
private static Pattern YEAR_PATTERN = Pattern.compile(".*\\s+(\\d{4})$");
public static void main(final String[] args) throws IOException {
List<String> data = IOUtils.readLines(SingleStreamMovies.class
.getClassLoader()
.getResourceAsStream("movies.list"));
data
.stream()
.map(YEAR_PATTERN::matcher)
.filter(Matcher::matches)
.collect(Collectors.groupingBy(m -> m.group(1), Collectors.counting()));
}
}
|
В данном случае возвращается Map<String,Integer>, в которой ключ — год, а значение — количество фильмов снятых в этом году. Для получения такого результата я использую коллектор groupingBy(), у которого есть три варианта.
Первый принимает ровно один аргумент, лямбда выражение возвращающее ключ, и создаёт Map, в котором ключом будут результаты работы этого выражения, а значениями — списки всех элементов потока с одинаковым ключом. Второй вариант, который я использую для подсчёта фильмов по годам, принимаем два аргумента. Первый аргумент точно так же служит для получения значения ключа, а второй аргумент это коллектор, который получает на вход элементы с одинаковым ключом и возвращает для них какое-либо значение. Третий вариант отличается от второго возможностью указать конкретную реализацию Map.
Параллельная обработка
Из stream pipeline видно, что операции map и filter независимы и могут быть исполнены одновременно над разными элементами потока. Группирующий коллектор тоже может вызывать коллекторы для значений параллельно. Чтобы сделать stream параллельным, надо изменить буквально одну строку:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
private static Pattern YEAR_PATTERN = Pattern.compile(".*\\s+(\\d{4})$");
public static void main(final String[] args) throws IOException {
List<String> data = IOUtils.readLines(SingleStreamMovies.class
.getClassLoader()
.getResourceAsStream("movies.list"));
data
.parallelStream()
.map(YEAR_PATTERN::matcher)
.filter(Matcher::matches)
.collect(Collectors.groupingByConcurrent(m -> m.group(1), Collectors.counting()));
}
|
Вызов parallenStream() создаёт объект Stream, операции над которым могут быть выполнены, по возможности, параллельно, давая, тем самым, прирост производительности.
Коллектор groupingByConcurrent() полностью совместим с gropingBy() коллектором, но при этом дружественнен параллельной обработке.
Какой же профит? В коде примера на github есть методы по учёту времени исполнения. Однопоточной версии требуется 3.6 секунд на обработку списка фильмов, многопоточной всего лишь 2.7 секунды. Говоря другими словами, многопоточный код исполняется на 33% быстрее. Так может быть стоит все потоки создавать параллельными?
Недостатки parallelStream()
Главная проблема с parallelStream() в том, что они используют единый для всей jvm thread pool. Говоря простыми словами — если у вас есть какое-то количество потоков, обрабатывающих запросы пользователей и в каждом потоке выполняются какие-либо операции над parallelStream(), эти потоки будут вынуждены ждать друг друга. Например, первый поток обрабатывает запрос пользователя по подсчёту чего-нибудь пользовательского и занимает все нити в thread pool. Второй поток, в который придёт второй пользователь и сделает такой-же запрос, будет ждать, пока первый поток досчитает и освободит нити. И это, конечно, не то, чего бы вы хотели и чего ожидают ваши пользователи.
Вторая проблема вырастает из самой сути многопоточности — уследить за нежелательными эффектами от влияния нитей друг на друга становится ещё сложнее. Stream api не предлагает янвых средств синхронизации нитей, предполагая, что либо весь код в pipeline будет stateless, либо автор кода знает что делает. С другой стороны, ограничения на использование локальных переменных в лямбда-выражениях, о которых я писал ранее, продиктованы именно соображениями потокобезопасности.
Третья проблема в том, что параллельность обработки вообще не гарантируется и зависит от источника данных. А прирост производительности зависит от его способности корректно разделить набор данных на независимые блоки. В дальнейшем я покажу, что объекты Stream можно создавать не только из коллекций, но сейчас важно знать, что наличие в коде вызова parallelStream() не гарантирует, что обработка действительно будет выпольняться параллельно
Код примера доступен на github. Для исполнения требуется скачать IMDB movies.list и положить его в src/resources. Время исполнения на вашей системе может отличаться от времени в тексте стати.